Merge pull request #7 from guezoloic/exo7+6

Exo7 sans exo6
This commit is contained in:
Loïc GUEZO
2026-02-10 20:12:54 +01:00
committed by GitHub
3 changed files with 337 additions and 78 deletions

249
main.py
View File

@@ -1,32 +1,173 @@
from sys import stderr
from typing import cast from typing import cast
from requests import Response, Session from requests import Response, Session
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
from json import JSONDecodeError, loads from collections import OrderedDict
from json import loads
class ScraperData: class _ScraperData:
def __init__(self, data: dict[str, object]) -> None: def __init__(self, data: dict[str, object]) -> None:
if not data:
raise ValueError("Données insuffisantes pour créer un ScraperData.")
self._data: dict[str, object] = data self._data: dict[str, object] = data
def _getcontent(self) -> dict[str, object] | None:
"""_summary_
Returns:
dict[str, object]: _description_
"""
current_data: dict[str, object] = self._data
for key in ["initialReduxState", "product", "content"]:
new_data: object | None = current_data.get(key)
if new_data is None:
return None
current_data: dict[str, object] = cast(dict[str, object], new_data)
return current_data
def _getattributes(self) -> dict[str, object] | None: def _getattributes(self) -> dict[str, object] | None:
current_data: object = self._data.get("attributes") """_summary_
if isinstance(current_data, dict):
return cast(dict[str, object], current_data) Returns:
return None dict[str, object]: _description_
"""
current_data: object = self._getcontent()
if current_data is None:
return None
return cast(dict[str, object], current_data.get("attributes"))
def prix(self) -> float:
"""
Retourne le prix unitaire d'une bouteille (75cl).
Le JSON contient plusieurs formats de vente dans content["items"] :
- bouteille seule : nbunit = 1 et equivbtl = 1 -> prix direct
- caisse de plusieurs bouteilles : nbunit > 1 -> on divise le prix total
- formats spéciaux (magnum etc.) : equivbtl > 1 -> même calcul
Formule générale :
prix_unitaire = offerPrice / (nbunit * equivbtl)
"""
content = self._getcontent()
# si content n'existe pas -> erreur
if content is None:
raise ValueError("Contenu introuvable")
# On récupère la liste des formats disponibles (bouteille, carton...)
items = content.get("items")
# Vérification que items est bien une liste non vide
if not isinstance(items, list) or len(items) == 0:
raise ValueError("Aucun prix disponible (items vide)")
# --------------------------
# CAS 1 : bouteille unitaire
# --------------------------
# On cherche un format où nbunit=1 et equivbtl=1 ->bouteille standard 75cl
for item in items:
if not isinstance(item, dict):
continue
# On récupère les attributs du format
attrs = item.get("attributes", {})
# On récupère nbunit et equivbtl
nbunit = attrs.get("nbunit", {}).get("value")
equivbtl = attrs.get("equivbtl", {}).get("value")
# Si c'est une bouteille unitaire
if nbunit == "1" and equivbtl == "1":
p = item.get("offerPrice")
# Vérification que c'est bien un nombre
if isinstance(p, (int, float)):
return float(p)
# --------------------------
# CAS 2 : caisse ou autre format
# --------------------------
# On calcule le prix unitaire à partir du prix total
for item in items:
if not isinstance(item, dict):
continue
p = item.get("offerPrice")
attrs = item.get("attributes", {})
nbunit = attrs.get("nbunit", {}).get("value")
equivbtl = attrs.get("equivbtl", {}).get("value")
# Vérification que toutes les valeurs existent
if isinstance(p, (int, float)) and nbunit and equivbtl:
# Calcul du nombre total de bouteilles équivalentes
denom = float(nbunit) * float(equivbtl)
# Évite division par zéro
if denom > 0:
# Calcul du prix unitaire
prix_unitaire = float(p) / denom
# Arrondi à 2 décimales
return round(prix_unitaire, 2)
# Si aucun prix trouvé
raise ValueError("Impossible de trouver le prix unitaire.")
def appellation(self) -> str | None: def appellation(self) -> str | None:
"""_summary_
Returns:
str: _description_
"""
attrs: dict[str, object] | None = self._getattributes()
if attrs is not None:
app_dict: object | None = attrs.get("appellation")
if isinstance(app_dict, dict):
return cast(str, app_dict.get("value"))
return None
def _getcritiques(self, name: str) -> str | None:
"""_summary_
Args:
name (str): _description_
Returns:
str | None: _description_
"""
current_value: dict[str, object] | None = self._getattributes() current_value: dict[str, object] | None = self._getattributes()
if current_value is not None: if current_value is not None:
app_dict: dict[str, object] = cast( app_dict: dict[str, object] = cast(
dict[str, object], current_value.get("appellation") dict[str, object], current_value.get(name)
) )
if app_dict: if not app_dict:
return cast(str, app_dict.get("value")) return None
val = cast(str, app_dict.get("value")).rstrip("+").split("-")
if len(val) > 1:
val[0] = str((int(val[0]) + int(val[1])) / 2)
return val[0]
return None return None
def parker(self) -> str | None:
return self._getcritiques("note_rp")
def robinson(self) -> str | None:
return self._getcritiques("note_jr")
def suckling(self) -> str | None:
return self._getcritiques("note_js")
def getdata(self) -> dict[str, object]: def getdata(self) -> dict[str, object]:
return self._data return self._data
@@ -47,8 +188,10 @@ class Scraper:
# TCP et d'avoir toujours une connexion constante avec le server # TCP et d'avoir toujours une connexion constante avec le server
self._session: Session = Session() self._session: Session = Session()
# Système de cache pour éviter de solliciter le serveur inutilement # Système de cache pour éviter de solliciter le serveur inutilement
self._latest_request: tuple[(str, Response | None)] = ("", None) self._latest_request: tuple[(str, Response)] | None = None
self._latest_soup: tuple[(str, BeautifulSoup | None)] = ("", None) self._latest_soups: OrderedDict[str, BeautifulSoup] = OrderedDict[
str, BeautifulSoup
]()
def _request(self, subdir: str) -> Response: def _request(self, subdir: str) -> Response:
""" """
@@ -68,12 +211,14 @@ class Scraper:
response.raise_for_status() response.raise_for_status()
return response return response
def getresponse(self, subdir: str = "") -> Response: def getresponse(self, subdir: str = "", use_cache: bool = True) -> Response:
""" """
Récupère la réponse d'une page, en utilisant le cache si possible. Récupère la réponse d'une page, en utilisant le cache si possible.
Args: Args:
subdir (str, optional): Le chemin de la page. subdir (str, optional): Le chemin de la page.
use_cache (bool, optional): Utilise la donnée deja sauvegarder ou
écrase la donnée utilisé avec la nouvelle
Returns: Returns:
Response: L'objet réponse (cache ou nouvelle requête). Response: L'objet réponse (cache ou nouvelle requête).
@@ -81,16 +226,24 @@ class Scraper:
Raise: Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx). HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
""" """
rq_subdir, rq_response = self._latest_request
if rq_response is not None and subdir == rq_subdir: # si dans le cache, latest_request existe
return rq_response if use_cache and self._latest_request is not None:
rq_subdir, rq_response = self._latest_request
# si c'est la meme requete et que use_cache est true,
# on renvoie celle enregistrer
if subdir == rq_subdir:
return rq_response
request: Response = self._request(subdir) request: Response = self._request(subdir)
self._latest_request = (subdir, request) # on recrée la structure pour le systeme de cache si activer
if use_cache:
self._latest_request = (subdir, request)
return request return request
def getsoup(self, subdir: str = "") -> BeautifulSoup: def getsoup(self, subdir: str, use_cache: bool = True) -> BeautifulSoup:
""" """
Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup. Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup.
@@ -103,26 +256,29 @@ class Scraper:
Raise: Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx). HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
""" """
rq_subdir, rq_soup = self._latest_soup
if rq_soup is not None and subdir == rq_subdir: if use_cache and subdir in self._latest_soups:
return rq_soup return self._latest_soups[subdir]
soup: BeautifulSoup = BeautifulSoup( markup: str = self.getresponse(subdir).text
markup=self.getresponse(subdir).text, features="html.parser" soup: BeautifulSoup = BeautifulSoup(markup, features="html.parser")
)
if use_cache:
self._latest_soups[subdir] = soup
if len(self._latest_soups) > 10:
_ = self._latest_soups.popitem(last=False)
self._latest_soup = (subdir, soup)
return soup return soup
def getjsondata(self, subdir: str = "", id: str = "__NEXT_DATA__") -> ScraperData: def getjsondata(self, subdir: str, id: str = "__NEXT_DATA__") -> _ScraperData:
""" """
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site. Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site.
Beaucoup de sites modernes (Next.js) stockent leur état initial dans Beaucoup de sites modernes (Next.js) stockent leur état initial dans
une balise <script> pour l'hydratation côté client. une balise <script> pour l'hydratation côté client.
Args: Args:
subdir (str, optional): Le chemin de la page. subdir (str): Le chemin de la page.
id (str, optional): L'identifiant de la balise script (par défaut __NEXT_DATA__). id (str, optional): L'identifiant de la balise script (par défaut __NEXT_DATA__).
Raises: Raises:
@@ -138,29 +294,16 @@ class Scraper:
soup: BeautifulSoup = self.getsoup(subdir) soup: BeautifulSoup = self.getsoup(subdir)
script: Tag | None = soup.find("script", id=id) script: Tag | None = soup.find("script", id=id)
if isinstance(script, Tag) and script.string: if script is None or not script.string:
try: raise ValueError(f"le script id={id} est introuvable")
current_data: object = loads(script.string)
# tout le chemin à parcourir pour arriver au données
# (plein d'information inutile)
keys: list[str] = [
"props",
"pageProps",
"initialReduxState",
"product",
"content",
]
for key in keys:
# si current_data est bien un dictionnaire et que la clé
# est bien dedans
if isinstance(current_data, dict) and key in current_data:
current_data: object = current_data[key]
else:
raise ValueError(f"Clé manquante dans le JSON : {key}")
if isinstance(current_data, dict): current_data: object = cast(object, loads(script.string))
return ScraperData(data=cast(dict[str, object], current_data))
for key in ["props", "pageProps"]:
if isinstance(current_data, dict) and key in current_data:
current_data = cast(object, current_data[key])
continue
raise ValueError(f"Clé manquante dans le JSON : {key}")
return _ScraperData(cast(dict[str, object], current_data))
except (JSONDecodeError, ValueError) as e:
print(f"Erreur lors de l'extraction JSON : {e}", file=stderr)
return ScraperData({})

View File

@@ -1,4 +1,3 @@
requests>=2.32.5 requests>=2.32.5
requests-mock>=1.12.1 requests-mock>=1.12.1
beautifulsoup4>=4.14.3 beautifulsoup4>=4.14.3

View File

@@ -2,7 +2,7 @@ from json import dumps
from bs4 import Tag from bs4 import Tag
import pytest import pytest
from requests_mock import Mocker from requests_mock import Mocker
from main import Scraper, ScraperData from main import Scraper
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -10,7 +10,47 @@ def mock_site():
with Mocker() as m: with Mocker() as m:
m.get( m.get(
"https://www.millesima.fr/", "https://www.millesima.fr/",
text="<html><body><h1>MILLESIMA</h1></body></html>", text=f"""
<html>
<body>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
"initialReduxState": {
"product": {
"content": {
"items": [],
"attributes": {}
}
}
}
}
}
})}
</script>
</body>
</html>
""",
)
m.get(
"https://www.millesima.fr/poubelle",
text=f"""
<html>
<body>
<h1>POUBELLE</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
}
}
})}
</script>
</body>
</html>
""",
) )
json_data = { json_data = {
@@ -41,7 +81,7 @@ def mock_site():
"name": "En promotion", "name": "En promotion",
"value": "Non", "value": "Non",
"sequence": 80, "sequence": 80,
"displayable": "false", "displayable": "False",
"type": "CHECKBOX", "type": "CHECKBOX",
"isSpirit": False, "isSpirit": False,
}, },
@@ -54,6 +94,18 @@ def mock_site():
"type": "CHECKBOX", "type": "CHECKBOX",
"isSpirit": False, "isSpirit": False,
}, },
"equivbtl": {
"valueId": "1",
"name": "equivbtl",
"value": "1",
"isSpirit": False,
},
"nbunit": {
"valueId": "6",
"name": "nbunit",
"value": "6",
"isSpirit": False,
},
}, },
"stock": 12, "stock": 12,
"availability": "2026-02-05", "availability": "2026-02-05",
@@ -73,6 +125,24 @@ def mock_site():
"isSpirit": False, "isSpirit": False,
"groupIdentifier": "appellation_433", "groupIdentifier": "appellation_433",
}, },
"note_rp": {
"valueId": "91",
"name": "Parker",
"value": "91",
"isSpirit": False,
},
"note_jr": {
"valueId": "17+",
"name": "J. Robinson",
"value": "17+",
"isSpirit": False,
},
"note_js": {
"valueId": "93-94",
"name": "J. Suckling",
"value": "93-94",
"isSpirit": False,
},
}, },
} }
} }
@@ -83,10 +153,12 @@ def mock_site():
html_product = f""" html_product = f"""
<html> <html>
<script id="__NEXT_DATA__" type="application/json"> <body>
{dumps(json_data)} <h1>MILLESIMA</h1>
</script> <script id="__NEXT_DATA__" type="application/json">
</body> {dumps(json_data)}
</script>
</body>
</html> </html>
""" """
m.get( m.get(
@@ -104,22 +176,67 @@ def scraper() -> Scraper:
def test_soup(scraper: Scraper): def test_soup(scraper: Scraper):
h1: Tag | None = scraper.getsoup().find("h1") vide = scraper.getsoup("")
poubelle = scraper.getsoup("poubelle")
assert isinstance(h1, Tag) contenu = scraper.getsoup("nino-negri-5-stelle-sfursat-2022.html")
assert h1.text == "MILLESIMA" assert vide.find("h1") is None
assert str(poubelle.find("h1")) == "<h1>POUBELLE</h1>"
assert str(contenu.find("h1")) == "<h1>MILLESIMA</h1>"
# def test_getProductName(scraper: Scraper):
# jsondata = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
# assert jsondata["productName"] == "Nino Negri : 5 Stelle Sfursat 2022"
# assert isinstance(jsondata["items"], list)
# assert len(jsondata["items"]) > 0
# assert jsondata["items"][0]["offerPrice"] == 390
def test_appellation(scraper: Scraper): def test_appellation(scraper: Scraper):
appellation: ScraperData = scraper.getjsondata( vide = scraper.getjsondata("")
"nino-negri-5-stelle-sfursat-2022.html" poubelle = scraper.getjsondata("poubelle")
) contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert appellation.appellation() == "Sforzato di Valtellina" assert vide.appellation() is None
assert poubelle.appellation() is None
assert contenu.appellation() == "Sforzato di Valtellina"
def test_fonctionprivee(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert vide._getattributes() is not None
assert vide._getattributes() == {}
assert vide._getcontent() is not None
assert vide._getcontent() == {"items": [], "attributes": {}}
assert poubelle._getattributes() is None
assert poubelle._getcontent() is None
assert contenu._getcontent() is not None
assert contenu._getattributes() is not None
def test_critiques(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert vide.parker() is None
assert vide.robinson() is None
assert vide.suckling() is None
assert vide._getcritiques("test_ts") is None
assert poubelle.parker() is None
assert poubelle.robinson() is None
assert poubelle.suckling() is None
assert poubelle._getcritiques("test_ts") is None
assert contenu.parker() == "91"
assert contenu.robinson() == "17"
assert contenu.suckling() == "93.5"
assert contenu._getcritiques("test_ts") is None
def test_prix(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
# Cas vide : items == [] -> on ne peut pas calculer -> ValueError
with pytest.raises(ValueError):
_ = vide.prix()
# Cas poubelle : JSON incomplet -> _getcontent() None -> ValueError
with pytest.raises(ValueError):
_ = poubelle.prix()
assert contenu.prix() == 65.0