diff --git a/main.py b/main.py index 349e81c..5de32eb 100644 --- a/main.py +++ b/main.py @@ -1,32 +1,173 @@ -from sys import stderr from typing import cast from requests import Response, Session from bs4 import BeautifulSoup, Tag -from json import JSONDecodeError, loads +from collections import OrderedDict +from json import loads -class ScraperData: +class _ScraperData: def __init__(self, data: dict[str, object]) -> None: - if not data: - raise ValueError("Données insuffisantes pour créer un ScraperData.") self._data: dict[str, object] = data + def _getcontent(self) -> dict[str, object] | None: + """_summary_ + + Returns: + dict[str, object]: _description_ + """ + current_data: dict[str, object] = self._data + for key in ["initialReduxState", "product", "content"]: + new_data: object | None = current_data.get(key) + if new_data is None: + return None + current_data: dict[str, object] = cast(dict[str, object], new_data) + + return current_data + def _getattributes(self) -> dict[str, object] | None: - current_data: object = self._data.get("attributes") - if isinstance(current_data, dict): - return cast(dict[str, object], current_data) - return None + """_summary_ + + Returns: + dict[str, object]: _description_ + """ + current_data: object = self._getcontent() + if current_data is None: + return None + return cast(dict[str, object], current_data.get("attributes")) + + def prix(self) -> float: + """ + Retourne le prix unitaire d'une bouteille (75cl). + + Le JSON contient plusieurs formats de vente dans content["items"] : + - bouteille seule : nbunit = 1 et equivbtl = 1 -> prix direct + - caisse de plusieurs bouteilles : nbunit > 1 -> on divise le prix total + - formats spéciaux (magnum etc.) : equivbtl > 1 -> même calcul + + Formule générale : + prix_unitaire = offerPrice / (nbunit * equivbtl) + + """ + + content = self._getcontent() + + # si content n'existe pas -> erreur + if content is None: + raise ValueError("Contenu introuvable") + + # On récupère la liste des formats disponibles (bouteille, carton...) + items = content.get("items") + + # Vérification que items est bien une liste non vide + if not isinstance(items, list) or len(items) == 0: + raise ValueError("Aucun prix disponible (items vide)") + + # -------------------------- + # CAS 1 : bouteille unitaire + # -------------------------- + # On cherche un format où nbunit=1 et equivbtl=1 ->bouteille standard 75cl + for item in items: + + if not isinstance(item, dict): + continue + + # On récupère les attributs du format + attrs = item.get("attributes", {}) + + # On récupère nbunit et equivbtl + nbunit = attrs.get("nbunit", {}).get("value") + equivbtl = attrs.get("equivbtl", {}).get("value") + + # Si c'est une bouteille unitaire + if nbunit == "1" and equivbtl == "1": + + p = item.get("offerPrice") + + # Vérification que c'est bien un nombre + if isinstance(p, (int, float)): + return float(p) + + # -------------------------- + # CAS 2 : caisse ou autre format + # -------------------------- + # On calcule le prix unitaire à partir du prix total + for item in items: + + if not isinstance(item, dict): + continue + + p = item.get("offerPrice") + attrs = item.get("attributes", {}) + + nbunit = attrs.get("nbunit", {}).get("value") + equivbtl = attrs.get("equivbtl", {}).get("value") + + # Vérification que toutes les valeurs existent + if isinstance(p, (int, float)) and nbunit and equivbtl: + + # Calcul du nombre total de bouteilles équivalentes + denom = float(nbunit) * float(equivbtl) + + # Évite division par zéro + if denom > 0: + + # Calcul du prix unitaire + prix_unitaire = float(p) / denom + + # Arrondi à 2 décimales + return round(prix_unitaire, 2) + + # Si aucun prix trouvé + raise ValueError("Impossible de trouver le prix unitaire.") def appellation(self) -> str | None: + """_summary_ + + Returns: + str: _description_ + """ + attrs: dict[str, object] | None = self._getattributes() + + if attrs is not None: + app_dict: object | None = attrs.get("appellation") + if isinstance(app_dict, dict): + return cast(str, app_dict.get("value")) + return None + + def _getcritiques(self, name: str) -> str | None: + """_summary_ + + Args: + name (str): _description_ + + Returns: + str | None: _description_ + """ + current_value: dict[str, object] | None = self._getattributes() if current_value is not None: app_dict: dict[str, object] = cast( - dict[str, object], current_value.get("appellation") + dict[str, object], current_value.get(name) ) - if app_dict: - return cast(str, app_dict.get("value")) + if not app_dict: + return None + + val = cast(str, app_dict.get("value")).rstrip("+").split("-") + if len(val) > 1: + val[0] = str((int(val[0]) + int(val[1])) / 2) + + return val[0] return None + def parker(self) -> str | None: + return self._getcritiques("note_rp") + + def robinson(self) -> str | None: + return self._getcritiques("note_jr") + + def suckling(self) -> str | None: + return self._getcritiques("note_js") + def getdata(self) -> dict[str, object]: return self._data @@ -47,8 +188,10 @@ class Scraper: # TCP et d'avoir toujours une connexion constante avec le server self._session: Session = Session() # Système de cache pour éviter de solliciter le serveur inutilement - self._latest_request: tuple[(str, Response | None)] = ("", None) - self._latest_soup: tuple[(str, BeautifulSoup | None)] = ("", None) + self._latest_request: tuple[(str, Response)] | None = None + self._latest_soups: OrderedDict[str, BeautifulSoup] = OrderedDict[ + str, BeautifulSoup + ]() def _request(self, subdir: str) -> Response: """ @@ -68,12 +211,14 @@ class Scraper: response.raise_for_status() return response - def getresponse(self, subdir: str = "") -> Response: + def getresponse(self, subdir: str = "", use_cache: bool = True) -> Response: """ Récupère la réponse d'une page, en utilisant le cache si possible. Args: subdir (str, optional): Le chemin de la page. + use_cache (bool, optional): Utilise la donnée deja sauvegarder ou + écrase la donnée utilisé avec la nouvelle Returns: Response: L'objet réponse (cache ou nouvelle requête). @@ -81,16 +226,24 @@ class Scraper: Raise: HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx). """ - rq_subdir, rq_response = self._latest_request - if rq_response is not None and subdir == rq_subdir: - return rq_response + # si dans le cache, latest_request existe + if use_cache and self._latest_request is not None: + rq_subdir, rq_response = self._latest_request + + # si c'est la meme requete et que use_cache est true, + # on renvoie celle enregistrer + if subdir == rq_subdir: + return rq_response request: Response = self._request(subdir) - self._latest_request = (subdir, request) + # on recrée la structure pour le systeme de cache si activer + if use_cache: + self._latest_request = (subdir, request) + return request - def getsoup(self, subdir: str = "") -> BeautifulSoup: + def getsoup(self, subdir: str, use_cache: bool = True) -> BeautifulSoup: """ Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup. @@ -103,26 +256,29 @@ class Scraper: Raise: HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx). """ - rq_subdir, rq_soup = self._latest_soup - if rq_soup is not None and subdir == rq_subdir: - return rq_soup + if use_cache and subdir in self._latest_soups: + return self._latest_soups[subdir] - soup: BeautifulSoup = BeautifulSoup( - markup=self.getresponse(subdir).text, features="html.parser" - ) + markup: str = self.getresponse(subdir).text + soup: BeautifulSoup = BeautifulSoup(markup, features="html.parser") + + if use_cache: + self._latest_soups[subdir] = soup + + if len(self._latest_soups) > 10: + _ = self._latest_soups.popitem(last=False) - self._latest_soup = (subdir, soup) return soup - def getjsondata(self, subdir: str = "", id: str = "__NEXT_DATA__") -> ScraperData: + def getjsondata(self, subdir: str, id: str = "__NEXT_DATA__") -> _ScraperData: """ Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site. Beaucoup de sites modernes (Next.js) stockent leur état initial dans une balise + + + """, + ) + + m.get( + "https://www.millesima.fr/poubelle", + text=f""" + + +

POUBELLE

+ + + + """, ) json_data = { @@ -41,7 +81,7 @@ def mock_site(): "name": "En promotion", "value": "Non", "sequence": 80, - "displayable": "false", + "displayable": "False", "type": "CHECKBOX", "isSpirit": False, }, @@ -54,6 +94,18 @@ def mock_site(): "type": "CHECKBOX", "isSpirit": False, }, + "equivbtl": { + "valueId": "1", + "name": "equivbtl", + "value": "1", + "isSpirit": False, + }, + "nbunit": { + "valueId": "6", + "name": "nbunit", + "value": "6", + "isSpirit": False, + }, }, "stock": 12, "availability": "2026-02-05", @@ -73,6 +125,24 @@ def mock_site(): "isSpirit": False, "groupIdentifier": "appellation_433", }, + "note_rp": { + "valueId": "91", + "name": "Parker", + "value": "91", + "isSpirit": False, + }, + "note_jr": { + "valueId": "17+", + "name": "J. Robinson", + "value": "17+", + "isSpirit": False, + }, + "note_js": { + "valueId": "93-94", + "name": "J. Suckling", + "value": "93-94", + "isSpirit": False, + }, }, } } @@ -83,10 +153,12 @@ def mock_site(): html_product = f""" - - + +

MILLESIMA

+ + """ m.get( @@ -104,22 +176,67 @@ def scraper() -> Scraper: def test_soup(scraper: Scraper): - h1: Tag | None = scraper.getsoup().find("h1") - - assert isinstance(h1, Tag) - assert h1.text == "MILLESIMA" - - -# def test_getProductName(scraper: Scraper): -# jsondata = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html") -# assert jsondata["productName"] == "Nino Negri : 5 Stelle Sfursat 2022" -# assert isinstance(jsondata["items"], list) -# assert len(jsondata["items"]) > 0 -# assert jsondata["items"][0]["offerPrice"] == 390 + vide = scraper.getsoup("") + poubelle = scraper.getsoup("poubelle") + contenu = scraper.getsoup("nino-negri-5-stelle-sfursat-2022.html") + assert vide.find("h1") is None + assert str(poubelle.find("h1")) == "

POUBELLE

" + assert str(contenu.find("h1")) == "

MILLESIMA

" def test_appellation(scraper: Scraper): - appellation: ScraperData = scraper.getjsondata( - "nino-negri-5-stelle-sfursat-2022.html" - ) - assert appellation.appellation() == "Sforzato di Valtellina" + vide = scraper.getjsondata("") + poubelle = scraper.getjsondata("poubelle") + contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html") + assert vide.appellation() is None + assert poubelle.appellation() is None + assert contenu.appellation() == "Sforzato di Valtellina" + + +def test_fonctionprivee(scraper: Scraper): + vide = scraper.getjsondata("") + poubelle = scraper.getjsondata("poubelle") + contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html") + assert vide._getattributes() is not None + assert vide._getattributes() == {} + assert vide._getcontent() is not None + assert vide._getcontent() == {"items": [], "attributes": {}} + assert poubelle._getattributes() is None + assert poubelle._getcontent() is None + assert contenu._getcontent() is not None + assert contenu._getattributes() is not None + + + +def test_critiques(scraper: Scraper): + vide = scraper.getjsondata("") + poubelle = scraper.getjsondata("poubelle") + contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html") + assert vide.parker() is None + assert vide.robinson() is None + assert vide.suckling() is None + assert vide._getcritiques("test_ts") is None + assert poubelle.parker() is None + assert poubelle.robinson() is None + assert poubelle.suckling() is None + assert poubelle._getcritiques("test_ts") is None + assert contenu.parker() == "91" + assert contenu.robinson() == "17" + assert contenu.suckling() == "93.5" + assert contenu._getcritiques("test_ts") is None + +def test_prix(scraper: Scraper): + vide = scraper.getjsondata("") + poubelle = scraper.getjsondata("poubelle") + contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html") + + # Cas vide : items == [] -> on ne peut pas calculer -> ValueError + with pytest.raises(ValueError): + _ = vide.prix() + + # Cas poubelle : JSON incomplet -> _getcontent() None -> ValueError + with pytest.raises(ValueError): + _ = poubelle.prix() + + assert contenu.prix() == 65.0 +