3 Commits

Author SHA1 Message Date
DAHMANI chahrazad
0a3561ffaa Fix comments for clarity in main.py 2026-02-07 02:25:31 +01:00
Chahrazad650
6366fcd8dd ajout fonction prix 2026-02-07 02:21:30 +01:00
Chahrazad650
5797b72cbc juste un test 2026-02-05 18:28:07 +01:00
5 changed files with 100 additions and 641 deletions

2
.gitignore vendored
View File

@@ -205,5 +205,3 @@ cython_debug/
marimo/_static/ marimo/_static/
marimo/_lsp/ marimo/_lsp/
__marimo__/ __marimo__/
*.csv

377
main.py
View File

@@ -1,161 +1,10 @@
from typing import cast from sys import stderr
from requests import HTTPError, Response, Session from typing import cast, Any, Dict, Optional
from requests import Response, Session
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
from collections import OrderedDict
from json import JSONDecodeError, loads from json import JSONDecodeError, loads
class _ScraperData:
"""_summary_
"""
def __init__(self, data: dict[str, object]) -> None:
"""_summary_
Args:
data (dict[str, object]): _description_
"""
self._data: dict[str, object] = data
def _getcontent(self) -> dict[str, object] | None:
"""_summary_
Returns:
dict[str, object]: _description_
"""
current_data: dict[str, object] = self._data
for key in ["initialReduxState", "product", "content"]:
new_data: object | None = current_data.get(key)
if new_data is None:
return None
current_data: dict[str, object] = cast(dict[str, object], new_data)
return current_data
def _getattributes(self) -> dict[str, object] | None:
"""_summary_
Returns:
dict[str, object]: _description_
"""
current_data: object = self._getcontent()
if current_data is None:
return None
return cast(dict[str, object], current_data.get("attributes"))
def prix(self) -> float | None:
"""
Retourne le prix unitaire d'une bouteille (75cl).
Si aucun prix n'est disponible, retourne None.
"""
content = self._getcontent()
if content is None:
return None
items = content.get("items")
# Vérifie que items existe et n'est pas vide
if not isinstance(items, list) or len(items) == 0:
return None
prix_calcule: float | None = None
for item in items:
if not isinstance(item, dict):
continue
p = item.get("offerPrice")
attrs = item.get("attributes", {})
nbunit = attrs.get("nbunit", {}).get("value")
equivbtl = attrs.get("equivbtl", {}).get("value")
if not isinstance(p, (int, float)) or not nbunit or not equivbtl:
continue
nb = float(nbunit)
eq = float(equivbtl)
if nb <= 0 or eq <= 0:
continue
if nb == 1 and eq == 1:
return float(p)
prix_calcule = round(float(p) / (nb * eq), 2)
return prix_calcule
def appellation(self) -> str | None:
"""_summary_
Returns:
str: _description_
"""
attrs: dict[str, object] | None = self._getattributes()
if attrs is not None:
app_dict: object | None = attrs.get("appellation")
if isinstance(app_dict, dict):
return cast(str, app_dict.get("value"))
return None
def _getcritiques(self, name: str) -> str | None:
"""_summary_
Args:
name (str): _description_
Returns:
str | None: _description_
"""
current_value: dict[str, object] | None = self._getattributes()
if current_value is not None:
app_dict: dict[str, object] = cast(
dict[str, object], current_value.get(name)
)
if not app_dict:
return None
val = cast(str, app_dict.get("value")).rstrip("+").split("-")
if len(val) > 1 and val[1] != "":
val[0] = str((int(val[0]) + int(val[1])) / 2)
return val[0]
return None
def parker(self) -> str | None:
return self._getcritiques("note_rp")
def robinson(self) -> str | None:
return self._getcritiques("note_jr")
def suckling(self) -> str | None:
return self._getcritiques("note_js")
def getdata(self) -> dict[str, object]:
return self._data
def informations(self) -> str:
"""
Retourne toutes les informations sous la forme :
"Appelation,Parker,J.Robinson,J.Suckling,Prix"
"""
appellation = self.appellation()
parker = self.parker()
robinson = self.robinson()
suckling = self.suckling()
try:
prix = self.prix()
except ValueError:
prix = None
return f"{appellation},{parker},{robinson},{suckling},{prix}"
class Scraper: class Scraper:
""" """
Scraper est une classe qui permet de gerer Scraper est une classe qui permet de gerer
@@ -164,194 +13,98 @@ class Scraper:
""" """
def __init__(self) -> None: def __init__(self) -> None:
"""
Initialise la session de scraping.
"""
self._url: str = "https://www.millesima.fr/" self._url: str = "https://www.millesima.fr/"
# Très utile pour éviter de renvoyer toujours les mêmes handshake
# TCP et d'avoir toujours une connexion constante avec le server
self._session: Session = Session() self._session: Session = Session()
# Système de cache pour éviter de solliciter le serveur inutilement self._latest_request: tuple[(str, Response | None)] = ("", None)
self._latest_request: tuple[(str, Response)] | None = None
self._latest_soups: OrderedDict[str, BeautifulSoup] = OrderedDict[
str, BeautifulSoup
]()
def _request(self, subdir: str) -> Response: def _request(self, subdir: str) -> Response:
"""
Effectue une requête GET sur le serveur Millesima.
Args:
subdir (str): Le sous-répertoire ou chemin de l'URL (ex: "/vins").
Returns:
Response: L'objet réponse de la requête.
Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
target_url: str = self._url + subdir.lstrip("/") target_url: str = self._url + subdir.lstrip("/")
response: Response = self._session.get(url=target_url, timeout=10) response: Response = self._session.get(url=target_url, timeout=10)
response.raise_for_status() response.raise_for_status()
return response return response
def getresponse(self, subdir: str = "", use_cache: bool = True) -> Response: def getresponse(self, subdir: str = "") -> Response:
"""
Récupère la réponse d'une page, en utilisant le cache si possible.
Args:
subdir (str, optional): Le chemin de la page.
use_cache (bool, optional): Utilise la donnée deja sauvegarder ou
écrase la donnée utilisé avec la nouvelle
Returns:
Response: L'objet réponse (cache ou nouvelle requête).
Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
# si dans le cache, latest_request existe
if use_cache and self._latest_request is not None:
rq_subdir, rq_response = self._latest_request rq_subdir, rq_response = self._latest_request
if rq_response is None or subdir != rq_subdir:
# si c'est la meme requete et que use_cache est true, request: Response = self._request(subdir)
# on renvoie celle enregistrer self._latest_request = (subdir, request)
if subdir == rq_subdir: return request
return rq_response return rq_response
request: Response = self._request(subdir) def getsoup(self, subdir: str = "") -> BeautifulSoup:
# on recrée la structure pour le systeme de cache si activer
if use_cache:
self._latest_request = (subdir, request)
return request
def getsoup(self, subdir: str, use_cache: bool = True) -> BeautifulSoup:
"""
Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup.
Args:
subdir (str, optional): Le chemin de la page.
Returns:
BeautifulSoup: L'objet parsé pour extraction de données.
Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
if use_cache and subdir in self._latest_soups:
return self._latest_soups[subdir]
markup: str = self.getresponse(subdir).text markup: str = self.getresponse(subdir).text
soup: BeautifulSoup = BeautifulSoup(markup, features="html.parser") return BeautifulSoup(markup, features="html.parser")
if use_cache: def getjsondata(self, subdir: str = "", id: str = "__NEXT_DATA__") -> dict[str, object]:
self._latest_soups[subdir] = soup
if len(self._latest_soups) > 10:
_ = self._latest_soups.popitem(last=False)
return soup
def getjsondata(self, subdir: str, id: str = "__NEXT_DATA__") -> _ScraperData:
"""
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site.
Beaucoup de sites modernes (Next.js) stockent leur état initial dans
une balise <script> pour l'hydratation côté client.
Args:
subdir (str): Le chemin de la page.
id (str, optional): L'identifiant de la balise script (par défaut __NEXT_DATA__).
Raises:
HTTPError: Soulevée par `getresponse` si le serveur renvoie un code d'erreur (4xx, 5xx).
JSONDecodeError: Soulevée par `loads` si le contenu de la balise n'est pas un JSON valide.
ValueError: Soulevée manuellement si l'une des clés attendues (props, pageProps, etc.)
est absente de la structure JSON.
Returns:
dict[str, object]: Un dictionnaire contenant les données utiles
ou un dictionnaire vide en cas d'erreur.
"""
soup: BeautifulSoup = self.getsoup(subdir) soup: BeautifulSoup = self.getsoup(subdir)
script: Tag | None = soup.find("script", id=id) script: Tag | None = soup.find("script", id=id)
if script is None or not script.string: if isinstance(script, Tag) and script.string:
raise ValueError(f"le script id={id} est introuvable") try:
current_data: object = loads(script.string)
current_data: object = cast(object, loads(script.string)) keys: list[str] = ["props", "pageProps", "initialReduxState", "product", "content"]
for key in keys:
for key in ["props", "pageProps"]:
if isinstance(current_data, dict) and key in current_data: if isinstance(current_data, dict) and key in current_data:
current_data = cast(object, current_data[key]) current_data = current_data[key]
continue else:
raise ValueError(f"Clé manquante dans le JSON : {key}") raise ValueError(f"Clé manquante dans le JSON : {key}")
return _ScraperData(cast(dict[str, object], current_data)) if isinstance(current_data, dict):
return cast(dict[str, object], current_data)
def _geturlproductslist(self, subdir: str): except (JSONDecodeError, ValueError) as e:
"""_summary_ print(f"Erreur lors de l'extraction JSON : {e}", file=stderr)
return {}
Args: def prix(self, subdir: str) -> float:
subdir (str): _description_
Returns:
_type_: _description_
""" """
try: Retourne le prix d'une bouteille (75cl).
data: dict[str, object] = self.getjsondata(subdir).getdata() Les données récupérées depuis le site contiennent plusieurs formats
de vente dans la liste "items" :
- bouteille seule si nbunit=1 et equivbtl=1
-> prix direct (format vendu à l'unité).
- caisse de plusieurs bouteilles si nbunit=1
-> prix direct (format vendu à l'unité).
- formats spéciaux (magnum, impériale, etc.)sinon
-> calcul du prix unitaire : offerPrice / (nbunit * equivbtl)
for element in ["initialReduxState", "categ", "content"]: Chaque item possède notamment :
data: dict[str, object] = cast(dict[str, object], data.get(element)) - offerPrice : prix total du format proposé
if not isinstance(data, dict): - nbunit : nombre d'unités dans le format
return None - equivbtl : équivalent en nombre de bouteilles standard (75cl)
products: list[str] = cast(list[str], data.get("products"))
if isinstance(products, list):
return products
except (JSONDecodeError, HTTPError):
return None
def getvins(self, subdir: str, filename: str):
"""_summary_
Args:
subdir (str): _description_
filename (str): _description_
""" """
with open(filename, "a") as f: data = self.getjsondata(subdir)
cache: set[str] = set[str]()
page = 0
while True: items = data.get("items")
page += 1 if not isinstance(items, list) or len(items) == 0:
products_list = self._geturlproductslist(f"{subdir}?page={page}") raise ValueError("Aucun prix disponible (items vide).")
if not products_list: # 1) bouteille 75cl (nbunit=1 et equivbtl=1)
break for item in items:
if not isinstance(item, dict):
products_list_length = len(products_list)
for i, product in enumerate(products_list):
if not isinstance(product, dict):
continue continue
attrs = item.get("attributes", {})
nbunit = attrs.get("nbunit", {}).get("value")
equivbtl = attrs.get("equivbtl", {}).get("value")
link = product.get("seoKeyword") if nbunit == "1" and equivbtl == "1":
p = item.get("offerPrice")
if isinstance(p, (int, float)):
return float(p)
if link and link not in cache: # 2) calcul depuis caisse
try: for item in items:
infos = self.getjsondata(link).informations() if not isinstance(item, dict):
_ = f.write(infos + "\n") continue
print( p = item.get("offerPrice")
f"page: {page} | {i + 1}/{products_list_length} {link}" attrs = item.get("attributes", {})
) nbunit = attrs.get("nbunit", {}).get("value")
cache.add(link) equivbtl = attrs.get("equivbtl", {}).get("value")
except (JSONDecodeError, HTTPError) as e:
print(f"Erreur sur le produit {link}: {e}")
f.flush()
if isinstance(p, (int, float)) and nbunit and equivbtl:
denom = float(nbunit) * float(equivbtl)
if denom > 0:
return round(float(p) / denom, 2)
if __name__ == "__main__": raise ValueError("Impossible de trouver le prix unitaire.")
Scraper().getvins("bordeaux.html", "donnee.csv")

BIN
projet.pdf Normal file

Binary file not shown.

View File

@@ -1,3 +1,2 @@
requests>=2.32.5 requests>=2.32.5
requests-mock>=1.12.1
beautifulsoup4>=4.14.3 beautifulsoup4>=4.14.3

View File

@@ -1,326 +1,35 @@
from json import dumps import json
from unittest.mock import patch, mock_open
import pytest
from requests_mock import Mocker
from main import Scraper from main import Scraper
@pytest.fixture(autouse=True) def test_json():
def mock_site(): scraper = Scraper()
with Mocker() as m:
m.get(
"https://www.millesima.fr/",
text=f"""
<html>
<body>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
"initialReduxState": {
"product": {
"content": {
"items": [],
"attributes": {}
}
}
}
}
}
})}
</script>
</body>
</html>
""",
)
m.get( data = scraper.getjsondata("/chateau-gloria-2016.html")
"https://www.millesima.fr/poubelle",
text=f"""
<html>
<body>
<h1>POUBELLE</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
}
}
})}
</script>
</body>
</html>
""",
)
json_data = { print("JSON récupéré :")
"props": { print(json.dumps(data, indent=4, ensure_ascii=False))
"pageProps": {
"initialReduxState": {
"product": {
"content": {
"_id": "J4131/22-11652",
"partnumber": "J4131/22",
"productName": "Nino Negri : 5 Stelle Sfursat 2022",
"productNameForSearch": "Nino Negri : 5 Stelle Sfursat 2022",
"storeId": "11652",
"seoKeyword": "nino-negri-5-stelle-sfursat-2022.html",
"title": "Nino Negri : 5 Stelle Sfursat 2022",
"items": [
{
"_id": "J4131/22/C/CC/6-11652",
"partnumber": "J4131/22/C/CC/6",
"taxRate": "H",
"listPrice": 842,
"offerPrice": 842,
"seoKeyword": "vin-de-charazade1867.html",
"shortdesc": "Une bouteille du meilleur vin du monde?",
"attributes": {
"promotion_o_n": {
"valueId": "0",
"name": "En promotion",
"value": "Non",
"sequence": 80,
"displayable": "False",
"type": "CHECKBOX",
"isSpirit": False,
},
"in_stock": {
"valueId": "L",
"name": "En stock",
"value": "Livrable",
"sequence": 65,
"displayable": "true",
"type": "CHECKBOX",
"isSpirit": False,
},
"equivbtl": {
"valueId": "1",
"name": "equivbtl",
"value": "1",
"isSpirit": False,
},
"nbunit": {
"valueId": "1",
"name": "nbunit",
"value": "1",
"isSpirit": False,
},
},
"stock": 12,
"availability": "2026-02-05",
"isCustomizable": False,
"gtin_cond": "",
"gtin_unit": "",
"stockOrigin": "EUR",
"isPrevSale": False,
}
],
"attributes": {
"appellation": {
"valueId": "433",
"name": "Appellation",
"value": "Madame-Loïk",
"url": "Madame-loik.html",
"isSpirit": False,
"groupIdentifier": "appellation_433",
},
"note_rp": {
"valueId": "91",
"name": "Peter Parker",
"value": "91",
"isSpirit": False,
},
"note_jr": {
"valueId": "17+",
"name": "J. Robinson",
"value": "17+",
"isSpirit": False,
},
"note_js": {
"valueId": "93-94",
"name": "J. cherazade",
"value": "93-94",
"isSpirit": False,
},
},
}
}
}
}
}
}
html_product = f""" assert isinstance(data, dict)
<html> assert "items" in data
<body>
<h1>MILLESIMA</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps(json_data)}
</script>
</body>
</html>
"""
m.get(
"https://www.millesima.fr/nino-negri-5-stelle-sfursat-2022.html",
text=html_product,
)
html_product = f"""
<html>
<body>
<h1>MILLESIMA</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps(json_data)}
</script>
</body>
</html>
"""
list_pleine = f"""
<html>
<body>
<h1>LE WINE</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
"initialReduxState": {
"categ": {
"content": {
"products": [
{"seoKeyword": "/nino-negri-5-stelle-sfursat-2022.html",},
{"seoKeyword": "/poubelle",},
{"seoKeyword": "/",}
]
}
}
}
}
}
}
)}
</script>
</body>
</html>
"""
list_vide = f"""
<html>
<body>
<h1>LE WINE</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
"initialReduxState": {
"categ": {
"content": {
"products": [
]
}
}
}
}
}
}
)}
</script>
</body>
</html>
"""
m.get(
"https://www.millesima.fr/wine.html",
complete_qs=False,
response_list=[
{"text": list_pleine},
{"text": list_vide},
],
)
# on return m sans fermer le server qui simule la page
yield m
@pytest.fixture def test_prix():
def scraper() -> Scraper: scraper = Scraper()
return Scraper()
try:
p = scraper.prix("/chateau-saint-pierre-2011.html")
print("Prix unitaire =", p)
assert isinstance(p, float)
assert p > 0
except ValueError:
# le vin n'est pas disponible à la vente
print("OK : aucun prix (vin indisponible, items vide)")
def test_soup(scraper: Scraper): if __name__ == "__main__":
vide = scraper.getsoup("") test_json()
poubelle = scraper.getsoup("poubelle") test_prix()
contenu = scraper.getsoup("nino-negri-5-stelle-sfursat-2022.html") print("\nTous les tests terminés")
assert vide.find("h1") is None
assert str(poubelle.find("h1")) == "<h1>POUBELLE</h1>"
assert str(contenu.find("h1")) == "<h1>MILLESIMA</h1>"
def test_appellation(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert vide.appellation() is None
assert poubelle.appellation() is None
assert contenu.appellation() == "Madame-Loïk"
def test_fonctionprivee(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert vide._getattributes() is not None
assert vide._getattributes() == {}
assert vide._getcontent() is not None
assert vide._getcontent() == {"items": [], "attributes": {}}
assert poubelle._getattributes() is None
assert poubelle._getcontent() is None
assert contenu._getcontent() is not None
assert contenu._getattributes() is not None
def test_critiques(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert vide.parker() is None
assert vide.robinson() is None
assert vide.suckling() is None
assert vide._getcritiques("test_ts") is None
assert poubelle.parker() is None
assert poubelle.robinson() is None
assert poubelle.suckling() is None
assert poubelle._getcritiques("test_ts") is None
assert contenu.parker() == "91"
assert contenu.robinson() == "17"
assert contenu.suckling() == "93.5"
assert contenu._getcritiques("test_ts") is None
def test_prix(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert vide.prix() is None
assert poubelle.prix() is None
assert contenu.prix() == 842.0
def test_informations(scraper: Scraper):
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert contenu.informations() == "Madame-Loïk,91,17,93.5,842.0"
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
assert vide.informations() == "None,None,None,None,None"
assert poubelle.informations() == "None,None,None,None,None"
def test_search(scraper: Scraper):
m = mock_open()
with patch("builtins.open", m):
scraper.getvins("wine.html", "fake_file.csv")
assert m().write.called
all_writes = "".join(call.args[0] for call in m().write.call_args_list)
assert "Madame-Loïk,91,17,93.5,842.0" in all_writes