mirror of
https://github.com/guezoloic/millesima-ai-engine.git
synced 2026-03-28 18:03:47 +00:00
modif(main.py): restructuration de la classe
This commit is contained in:
156
main.py
156
main.py
@@ -1,90 +1,94 @@
|
|||||||
import requests
|
from sys import stderr
|
||||||
from typing import Any, Dict
|
from typing import cast
|
||||||
from bs4 import BeautifulSoup
|
from requests import Response, Session
|
||||||
import json
|
from bs4 import BeautifulSoup, Tag
|
||||||
|
from json import JSONDecodeError, loads
|
||||||
|
|
||||||
|
|
||||||
class Scraper:
|
class Scraper:
|
||||||
"""
|
def __init__(self, subdir: str = "") -> None:
|
||||||
Scraper est une classe qui permet de gerer
|
self._url: str = "https://www.millemisa.fr/"
|
||||||
de façon dynamique des requetes uniquement
|
self._session: Session = Session()
|
||||||
sur le serveur https de Millesina
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, subdir: str = None):
|
self._latest_request: tuple[(str, Response | None)] = ("", None)
|
||||||
"""
|
|
||||||
Initialise la session de scraping et récupère la page d'accueil.
|
|
||||||
"""
|
|
||||||
# Très utile pour éviter de renvoyer toujours les mêmes handshake
|
|
||||||
# TCP et d'avoir toujours une connexion constante avec le server
|
|
||||||
self._session: requests.Session = requests.Session()
|
|
||||||
self._url: str = "https://www.millesima.fr/"
|
|
||||||
self._soup = self.getsoup(subdir)
|
|
||||||
|
|
||||||
def _request(
|
def _request(self, subdir: str) -> Response:
|
||||||
self, subdir: str, use_cache: bool = True
|
target_url: str = self._url + subdir.lstrip("/")
|
||||||
) -> requests.Response | requests.HTTPError:
|
response: Response = self._session.get(url=target_url, timeout=10)
|
||||||
"""
|
response.raise_for_status()
|
||||||
Effectue une requête GET sur le serveur Millesima.
|
return response
|
||||||
:param subdir: Le sous-répertoire ou chemin de l'URL (ex: "/vins").
|
|
||||||
:param use_cache: Si True, retourne la réponse précédente si l'URL est
|
|
||||||
identique.
|
|
||||||
:return: requests.Response: L'objet réponse de la requête.
|
|
||||||
:rtype: requests.HTTPError: Si le serveur renvoie un code d'erreur
|
|
||||||
(4xx, 5xx).
|
|
||||||
"""
|
|
||||||
|
|
||||||
target_url: str = f"{self._url}{subdir.lstrip('/')}" if subdir is \
|
def getresponse(self, subdir: str = "") -> Response:
|
||||||
not None else self._url
|
rq_subdir, rq_response = self._latest_request
|
||||||
# Éviter un max possible de faire des requetes au servers même
|
|
||||||
# en ayant un tunnel tcp avec le paramètre `use_cache` que si
|
|
||||||
# activer, va comparer l'url avec l'url précédant
|
|
||||||
if use_cache and hasattr(self, "_response") \
|
|
||||||
and self._response is not None:
|
|
||||||
if self._response.url == target_url:
|
|
||||||
return self._response
|
|
||||||
|
|
||||||
self._response: requests.Response = self._session.get(
|
if rq_response is None or subdir != rq_subdir:
|
||||||
target_url, timeout=10)
|
request: Response = self._request(subdir)
|
||||||
self._response.raise_for_status()
|
self._latest_request = (subdir, request)
|
||||||
|
return request
|
||||||
|
|
||||||
return self._response
|
return rq_response
|
||||||
|
|
||||||
def getsoup(self, subdir: str = None
|
def getsoup(self, subdir: str = "") -> BeautifulSoup:
|
||||||
) -> BeautifulSoup | requests.HTTPError:
|
markup: str = self.getresponse(subdir).text
|
||||||
"""
|
return BeautifulSoup(markup, features="html.parser")
|
||||||
Récupère le contenu HTML d'une page et le transforme en objet
|
|
||||||
BeautifulSoup.
|
|
||||||
|
|
||||||
:param subdir: Le chemin de la page. Si None, retourne la soupe
|
# def getjsondata(self, subdir: str = "", id: str = "__NEXT_DATA__") -> dict[str, object]:
|
||||||
actuelle.
|
# soup: BeautifulSoup = self.getsoup(subdir)
|
||||||
:return: BeautifulSoup: L'objet parsé pour extraction de données.
|
# # On s'assure que c'est bien un Tag pour avoir accès à .string
|
||||||
:rtype: BeautifulSoup
|
# script = soup.find("script", id=id)
|
||||||
"""
|
|
||||||
if not hasattr(self, "_soup") or subdir is not None:
|
|
||||||
self._request(subdir)
|
|
||||||
self._soup = BeautifulSoup(self._response.text, "html.parser")
|
|
||||||
return self._soup
|
|
||||||
|
|
||||||
def get_json_data(self) -> Dict[str, Any]:
|
# if isinstance(script, Tag) and script.string:
|
||||||
"""
|
# try:
|
||||||
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du
|
# # On commence avec le dictionnaire complet
|
||||||
site.
|
# current_data: Any = loads(script.string)
|
||||||
Beaucoup de sites modernes (Next.js) stockent leur état initial dans
|
|
||||||
une balise <script> pour l'hydratation côté client.
|
|
||||||
|
|
||||||
:return Dict[str, Any]: Un dictionnaire contenant les props de la page,
|
# # Parcours de la structure imbriquée
|
||||||
ou un dictionnaire vide en cas d'erreur ou
|
# keys = ['props', 'pageProps', 'initialReduxState', 'product', 'content']
|
||||||
d'absence.
|
# for key in keys:
|
||||||
"""
|
# if isinstance(current_data, dict) and key in current_data:
|
||||||
script = self._soup.find("script", id="__NEXT_DATA__")
|
# current_data = current_data[key]
|
||||||
if script and script.string:
|
# else:
|
||||||
|
# # Si une clé manque, on lève une erreur explicite
|
||||||
|
# raise ValueError(f"Clé manquante dans le JSON : {key}")
|
||||||
|
|
||||||
|
# # On garantit à Pyright que le résultat final est un dictionnaire
|
||||||
|
# if isinstance(current_data, dict):
|
||||||
|
# return cast(dict[str, object], current_data)
|
||||||
|
|
||||||
|
# except (decoder.JSONDecodeError, ValueError) as e:
|
||||||
|
# print(f"Erreur lors de l'extraction JSON : {e}", file=stderr)
|
||||||
|
|
||||||
|
# return {}
|
||||||
|
|
||||||
|
def getjsondata(
|
||||||
|
self, subdir: str = "", id: str = "__NEXT_DATA__"
|
||||||
|
) -> dict[str, object]:
|
||||||
|
soup: BeautifulSoup = self.getsoup(subdir)
|
||||||
|
script: Tag | None = soup.find("script", id=id)
|
||||||
|
|
||||||
|
if isinstance(script, Tag) and script.string:
|
||||||
try:
|
try:
|
||||||
data: dict[str, Any] = json.loads(script.string)
|
current_data: object = loads(script.string)
|
||||||
for element in ['props', 'pageProps', 'initialReduxState',
|
# tout le chemin à parcourir pour arriver au données
|
||||||
'product', 'content']:
|
# (plein d'information inutile)
|
||||||
data = data[element]
|
keys: list[str] = [
|
||||||
return data
|
"props",
|
||||||
except json.decoder.JSONDecodeError:
|
"pageProps",
|
||||||
pass
|
"initialReduxState",
|
||||||
|
"product",
|
||||||
|
"content",
|
||||||
|
]
|
||||||
|
for key in keys:
|
||||||
|
# si current_data est bien un dictionnaire et que la clé
|
||||||
|
# est bien dedans
|
||||||
|
if isinstance(current_data, dict) and key in current_data:
|
||||||
|
current_data = current_data[key]
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Clé manquante dans le JSON : {key}")
|
||||||
|
|
||||||
|
if isinstance(current_data, dict):
|
||||||
|
return cast(dict[str, object], current_data)
|
||||||
|
|
||||||
|
except (JSONDecodeError, ValueError) as e:
|
||||||
|
print(f"Erreur lors de l'extraction JSON : {e}", file=stderr)
|
||||||
return {}
|
return {}
|
||||||
@@ -2,10 +2,11 @@ from main import Scraper
|
|||||||
|
|
||||||
|
|
||||||
def test_soup():
|
def test_soup():
|
||||||
assert Scraper().getsoup().find('h1')\
|
assert Scraper().getsoup().find("h1").text[3:12] == "MILLESIMA"
|
||||||
.text[3:12] == "MILLESIMA"
|
|
||||||
|
|
||||||
|
|
||||||
def test_getProductName():
|
def test_getProductName():
|
||||||
assert Scraper("chateau-gloria-2016.html").get_json_data()['productName']\
|
assert (
|
||||||
|
Scraper("chateau-gloria-2016.html").get_json_data()["productName"]
|
||||||
== "Château Gloria 2016"
|
== "Château Gloria 2016"
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user