21 Commits

Author SHA1 Message Date
a33b484dea ajout: test fonction 7 2026-02-13 17:52:11 +01:00
dd430b9861 ajout(main.py): ajout dans csv 2026-02-11 23:58:51 +01:00
011bb6a689 ajout(main.py): optimisation des fonctions 2026-02-11 23:46:22 +01:00
96dbaaaaf6 ajout: fonctions de recherche 2026-02-11 23:35:56 +01:00
ed86e588f7 merge exo2 et commentaire exo7 2026-02-11 23:20:20 +01:00
Chahrazad650
cd1e266f25 optimisation fonction prix() 2026-02-10 19:57:20 +01:00
Chahrazad650
2aa99453a0 modefication fonction prix() return None+tests 2026-02-10 19:39:47 +01:00
9f1ff1ef7b ajout(main.py): initialise la fonction getvin 2026-02-10 19:01:10 +01:00
Chahrazad650
bfc39db652 ajout de la fonction informations 2026-02-10 01:25:00 +01:00
Chahrazad650
717fce6ca4 Modification de la fonction prix() 2026-02-10 00:56:39 +01:00
9914e8af41 Merge branch 'optimisation' into exo7 2026-02-09 23:46:33 +01:00
Chahrazad650
5785d571b2 Merge branch 'optimisation' of https://github.com/guezoloic/millesima_projetS6 into optimisation 2026-02-09 23:14:27 +01:00
ae66e94d6c fix: correction et ajout de tests 2026-02-09 23:11:15 +01:00
2bc5d57a31 ajout(main): fonction acceder 2026-02-09 22:09:28 +01:00
8f21e48b28 ajout: changement des fonctions pour retourner un None si err 2026-02-09 21:42:03 +01:00
Chahrazad650
9e0cb9737e ajout +attributs sur json_data 2026-02-09 21:10:33 +01:00
c62a5e6a76 fix(test_main): ScraperData n'existe plus 2026-02-09 20:27:06 +01:00
9da0159869 fix(main.py): changement donnée data 2026-02-08 22:47:10 +01:00
5c22777c2d ajout(main.py): meilleur systeme de cache 2026-02-08 19:03:49 +01:00
0d78b1aec3 ajout EXO4 2026-02-07 23:11:12 +01:00
168ccf88dc ajout fonction getvin et robinson / suckling 2026-02-07 23:03:11 +01:00
4 changed files with 481 additions and 88 deletions

2
.gitignore vendored
View File

@@ -205,3 +205,5 @@ cython_debug/
marimo/_static/ marimo/_static/
marimo/_lsp/ marimo/_lsp/
__marimo__/ __marimo__/
*.csv

303
main.py
View File

@@ -1,35 +1,160 @@
from sys import stderr
from typing import cast from typing import cast
from requests import Response, Session from requests import HTTPError, Response, Session
from bs4 import BeautifulSoup, Tag from bs4 import BeautifulSoup, Tag
from collections import OrderedDict
from json import JSONDecodeError, loads from json import JSONDecodeError, loads
class ScraperData: class _ScraperData:
"""_summary_
"""
def __init__(self, data: dict[str, object]) -> None: def __init__(self, data: dict[str, object]) -> None:
if not data: """_summary_
raise ValueError("Données insuffisantes pour créer un ScraperData.")
Args:
data (dict[str, object]): _description_
"""
self._data: dict[str, object] = data self._data: dict[str, object] = data
def _getcontent(self) -> dict[str, object] | None:
"""_summary_
Returns:
dict[str, object]: _description_
"""
current_data: dict[str, object] = self._data
for key in ["initialReduxState", "product", "content"]:
new_data: object | None = current_data.get(key)
if new_data is None:
return None
current_data: dict[str, object] = cast(dict[str, object], new_data)
return current_data
def _getattributes(self) -> dict[str, object] | None: def _getattributes(self) -> dict[str, object] | None:
current_data: object = self._data.get("attributes") """_summary_
if isinstance(current_data, dict):
return cast(dict[str, object], current_data) Returns:
return None dict[str, object]: _description_
"""
current_data: object = self._getcontent()
if current_data is None:
return None
return cast(dict[str, object], current_data.get("attributes"))
def prix(self) -> float | None:
"""
Retourne le prix unitaire d'une bouteille (75cl).
Si aucun prix n'est disponible, retourne None.
"""
content = self._getcontent()
if content is None:
return None
items = content.get("items")
# Vérifie que items existe et n'est pas vide
if not isinstance(items, list) or len(items) == 0:
return None
prix_calcule: float | None = None
for item in items:
if not isinstance(item, dict):
continue
p = item.get("offerPrice")
attrs = item.get("attributes", {})
nbunit = attrs.get("nbunit", {}).get("value")
equivbtl = attrs.get("equivbtl", {}).get("value")
if not isinstance(p, (int, float)) or not nbunit or not equivbtl:
continue
nb = float(nbunit)
eq = float(equivbtl)
if nb <= 0 or eq <= 0:
continue
if nb == 1 and eq == 1:
return float(p)
prix_calcule = round(float(p) / (nb * eq), 2)
return prix_calcule
def appellation(self) -> str | None: def appellation(self) -> str | None:
current_value: dict[str, object] | None = self._getattributes() """_summary_
if current_value is not None:
app_dict: dict[str, object] = cast( Returns:
dict[str, object], current_value.get("appellation") str: _description_
) """
if app_dict: attrs: dict[str, object] | None = self._getattributes()
if attrs is not None:
app_dict: object | None = attrs.get("appellation")
if isinstance(app_dict, dict):
return cast(str, app_dict.get("value")) return cast(str, app_dict.get("value"))
return None return None
def _getcritiques(self, name: str) -> str | None:
"""_summary_
Args:
name (str): _description_
Returns:
str | None: _description_
"""
current_value: dict[str, object] | None = self._getattributes()
if current_value is not None:
app_dict: dict[str, object] = cast(
dict[str, object], current_value.get(name)
)
if not app_dict:
return None
val = cast(str, app_dict.get("value")).rstrip("+").split("-")
if len(val) > 1 and val[1] != "":
val[0] = str((int(val[0]) + int(val[1])) / 2)
return val[0]
return None
def parker(self) -> str | None:
return self._getcritiques("note_rp")
def robinson(self) -> str | None:
return self._getcritiques("note_jr")
def suckling(self) -> str | None:
return self._getcritiques("note_js")
def getdata(self) -> dict[str, object]: def getdata(self) -> dict[str, object]:
return self._data return self._data
def informations(self) -> str:
"""
Retourne toutes les informations sous la forme :
"Appelation,Parker,J.Robinson,J.Suckling,Prix"
"""
appellation = self.appellation()
parker = self.parker()
robinson = self.robinson()
suckling = self.suckling()
try:
prix = self.prix()
except ValueError:
prix = None
return f"{appellation},{parker},{robinson},{suckling},{prix}"
class Scraper: class Scraper:
""" """
@@ -47,8 +172,10 @@ class Scraper:
# TCP et d'avoir toujours une connexion constante avec le server # TCP et d'avoir toujours une connexion constante avec le server
self._session: Session = Session() self._session: Session = Session()
# Système de cache pour éviter de solliciter le serveur inutilement # Système de cache pour éviter de solliciter le serveur inutilement
self._latest_request: tuple[(str, Response | None)] = ("", None) self._latest_request: tuple[(str, Response)] | None = None
self._latest_soup: tuple[(str, BeautifulSoup | None)] = ("", None) self._latest_soups: OrderedDict[str, BeautifulSoup] = OrderedDict[
str, BeautifulSoup
]()
def _request(self, subdir: str) -> Response: def _request(self, subdir: str) -> Response:
""" """
@@ -68,12 +195,14 @@ class Scraper:
response.raise_for_status() response.raise_for_status()
return response return response
def getresponse(self, subdir: str = "") -> Response: def getresponse(self, subdir: str = "", use_cache: bool = True) -> Response:
""" """
Récupère la réponse d'une page, en utilisant le cache si possible. Récupère la réponse d'une page, en utilisant le cache si possible.
Args: Args:
subdir (str, optional): Le chemin de la page. subdir (str, optional): Le chemin de la page.
use_cache (bool, optional): Utilise la donnée deja sauvegarder ou
écrase la donnée utilisé avec la nouvelle
Returns: Returns:
Response: L'objet réponse (cache ou nouvelle requête). Response: L'objet réponse (cache ou nouvelle requête).
@@ -81,16 +210,24 @@ class Scraper:
Raise: Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx). HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
""" """
rq_subdir, rq_response = self._latest_request
if rq_response is not None and subdir == rq_subdir: # si dans le cache, latest_request existe
return rq_response if use_cache and self._latest_request is not None:
rq_subdir, rq_response = self._latest_request
# si c'est la meme requete et que use_cache est true,
# on renvoie celle enregistrer
if subdir == rq_subdir:
return rq_response
request: Response = self._request(subdir) request: Response = self._request(subdir)
self._latest_request = (subdir, request) # on recrée la structure pour le systeme de cache si activer
if use_cache:
self._latest_request = (subdir, request)
return request return request
def getsoup(self, subdir: str = "") -> BeautifulSoup: def getsoup(self, subdir: str, use_cache: bool = True) -> BeautifulSoup:
""" """
Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup. Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup.
@@ -103,26 +240,29 @@ class Scraper:
Raise: Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx). HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
""" """
rq_subdir, rq_soup = self._latest_soup
if rq_soup is not None and subdir == rq_subdir: if use_cache and subdir in self._latest_soups:
return rq_soup return self._latest_soups[subdir]
soup: BeautifulSoup = BeautifulSoup( markup: str = self.getresponse(subdir).text
markup=self.getresponse(subdir).text, features="html.parser" soup: BeautifulSoup = BeautifulSoup(markup, features="html.parser")
)
if use_cache:
self._latest_soups[subdir] = soup
if len(self._latest_soups) > 10:
_ = self._latest_soups.popitem(last=False)
self._latest_soup = (subdir, soup)
return soup return soup
def getjsondata(self, subdir: str = "", id: str = "__NEXT_DATA__") -> ScraperData: def getjsondata(self, subdir: str, id: str = "__NEXT_DATA__") -> _ScraperData:
""" """
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site. Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site.
Beaucoup de sites modernes (Next.js) stockent leur état initial dans Beaucoup de sites modernes (Next.js) stockent leur état initial dans
une balise <script> pour l'hydratation côté client. une balise <script> pour l'hydratation côté client.
Args: Args:
subdir (str, optional): Le chemin de la page. subdir (str): Le chemin de la page.
id (str, optional): L'identifiant de la balise script (par défaut __NEXT_DATA__). id (str, optional): L'identifiant de la balise script (par défaut __NEXT_DATA__).
Raises: Raises:
@@ -138,29 +278,80 @@ class Scraper:
soup: BeautifulSoup = self.getsoup(subdir) soup: BeautifulSoup = self.getsoup(subdir)
script: Tag | None = soup.find("script", id=id) script: Tag | None = soup.find("script", id=id)
if isinstance(script, Tag) and script.string: if script is None or not script.string:
try: raise ValueError(f"le script id={id} est introuvable")
current_data: object = loads(script.string)
# tout le chemin à parcourir pour arriver au données
# (plein d'information inutile)
keys: list[str] = [
"props",
"pageProps",
"initialReduxState",
"product",
"content",
]
for key in keys:
# si current_data est bien un dictionnaire et que la clé
# est bien dedans
if isinstance(current_data, dict) and key in current_data:
current_data: object = current_data[key]
else:
raise ValueError(f"Clé manquante dans le JSON : {key}")
if isinstance(current_data, dict): current_data: object = cast(object, loads(script.string))
return ScraperData(data=cast(dict[str, object], current_data))
except (JSONDecodeError, ValueError) as e: for key in ["props", "pageProps"]:
print(f"Erreur lors de l'extraction JSON : {e}", file=stderr) if isinstance(current_data, dict) and key in current_data:
return ScraperData({}) current_data = cast(object, current_data[key])
continue
raise ValueError(f"Clé manquante dans le JSON : {key}")
return _ScraperData(cast(dict[str, object], current_data))
def _geturlproductslist(self, subdir: str):
"""_summary_
Args:
subdir (str): _description_
Returns:
_type_: _description_
"""
try:
data: dict[str, object] = self.getjsondata(subdir).getdata()
for element in ["initialReduxState", "categ", "content"]:
data: dict[str, object] = cast(dict[str, object], data.get(element))
if not isinstance(data, dict):
return None
products: list[str] = cast(list[str], data.get("products"))
if isinstance(products, list):
return products
except (JSONDecodeError, HTTPError):
return None
def getvins(self, subdir: str, filename: str):
"""_summary_
Args:
subdir (str): _description_
filename (str): _description_
"""
with open(filename, "a") as f:
cache: set[str] = set[str]()
page = 0
while True:
page += 1
products_list = self._geturlproductslist(f"{subdir}?page={page}")
if not products_list:
break
products_list_length = len(products_list)
for i, product in enumerate(products_list):
if not isinstance(product, dict):
continue
link = product.get("seoKeyword")
if link and link not in cache:
try:
infos = self.getjsondata(link).informations()
_ = f.write(infos + "\n")
print(
f"page: {page} | {i + 1}/{products_list_length} {link}"
)
cache.add(link)
except (JSONDecodeError, HTTPError) as e:
print(f"Erreur sur le produit {link}: {e}")
f.flush()
if __name__ == "__main__":
Scraper().getvins("bordeaux.html", "donnee.csv")

View File

@@ -1,4 +1,3 @@
requests>=2.32.5 requests>=2.32.5
requests-mock>=1.12.1 requests-mock>=1.12.1
beautifulsoup4>=4.14.3 beautifulsoup4>=4.14.3

View File

@@ -1,8 +1,8 @@
from json import dumps from json import dumps
from bs4 import Tag from unittest.mock import patch, mock_open
import pytest import pytest
from requests_mock import Mocker from requests_mock import Mocker
from main import Scraper, ScraperData from main import Scraper
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -10,7 +10,47 @@ def mock_site():
with Mocker() as m: with Mocker() as m:
m.get( m.get(
"https://www.millesima.fr/", "https://www.millesima.fr/",
text="<html><body><h1>MILLESIMA</h1></body></html>", text=f"""
<html>
<body>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
"initialReduxState": {
"product": {
"content": {
"items": [],
"attributes": {}
}
}
}
}
}
})}
</script>
</body>
</html>
""",
)
m.get(
"https://www.millesima.fr/poubelle",
text=f"""
<html>
<body>
<h1>POUBELLE</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
}
}
})}
</script>
</body>
</html>
""",
) )
json_data = { json_data = {
@@ -31,17 +71,17 @@ def mock_site():
"_id": "J4131/22/C/CC/6-11652", "_id": "J4131/22/C/CC/6-11652",
"partnumber": "J4131/22/C/CC/6", "partnumber": "J4131/22/C/CC/6",
"taxRate": "H", "taxRate": "H",
"listPrice": 390, "listPrice": 842,
"offerPrice": 390, "offerPrice": 842,
"seoKeyword": "nino-negri-5-stelle-sfursat-2022-c-cc-6.html", "seoKeyword": "vin-de-charazade1867.html",
"shortdesc": "Un carton de 6 Bouteilles (75cl)", "shortdesc": "Une bouteille du meilleur vin du monde?",
"attributes": { "attributes": {
"promotion_o_n": { "promotion_o_n": {
"valueId": "0", "valueId": "0",
"name": "En promotion", "name": "En promotion",
"value": "Non", "value": "Non",
"sequence": 80, "sequence": 80,
"displayable": "false", "displayable": "False",
"type": "CHECKBOX", "type": "CHECKBOX",
"isSpirit": False, "isSpirit": False,
}, },
@@ -54,6 +94,18 @@ def mock_site():
"type": "CHECKBOX", "type": "CHECKBOX",
"isSpirit": False, "isSpirit": False,
}, },
"equivbtl": {
"valueId": "1",
"name": "equivbtl",
"value": "1",
"isSpirit": False,
},
"nbunit": {
"valueId": "1",
"name": "nbunit",
"value": "1",
"isSpirit": False,
},
}, },
"stock": 12, "stock": 12,
"availability": "2026-02-05", "availability": "2026-02-05",
@@ -68,11 +120,29 @@ def mock_site():
"appellation": { "appellation": {
"valueId": "433", "valueId": "433",
"name": "Appellation", "name": "Appellation",
"value": "Sforzato di Valtellina", "value": "Madame-Loïk",
"url": "sforzato-di-valtellina.html", "url": "Madame-loik.html",
"isSpirit": False, "isSpirit": False,
"groupIdentifier": "appellation_433", "groupIdentifier": "appellation_433",
}, },
"note_rp": {
"valueId": "91",
"name": "Peter Parker",
"value": "91",
"isSpirit": False,
},
"note_jr": {
"valueId": "17+",
"name": "J. Robinson",
"value": "17+",
"isSpirit": False,
},
"note_js": {
"valueId": "93-94",
"name": "J. cherazade",
"value": "93-94",
"isSpirit": False,
},
}, },
} }
} }
@@ -83,10 +153,12 @@ def mock_site():
html_product = f""" html_product = f"""
<html> <html>
<script id="__NEXT_DATA__" type="application/json"> <body>
{dumps(json_data)} <h1>MILLESIMA</h1>
</script> <script id="__NEXT_DATA__" type="application/json">
</body> {dumps(json_data)}
</script>
</body>
</html> </html>
""" """
m.get( m.get(
@@ -94,6 +166,79 @@ def mock_site():
text=html_product, text=html_product,
) )
html_product = f"""
<html>
<body>
<h1>MILLESIMA</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps(json_data)}
</script>
</body>
</html>
"""
list_pleine = f"""
<html>
<body>
<h1>LE WINE</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
"initialReduxState": {
"categ": {
"content": {
"products": [
{"seoKeyword": "/nino-negri-5-stelle-sfursat-2022.html",},
{"seoKeyword": "/poubelle",},
{"seoKeyword": "/",}
]
}
}
}
}
}
}
)}
</script>
</body>
</html>
"""
list_vide = f"""
<html>
<body>
<h1>LE WINE</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
"props": {
"pageProps": {
"initialReduxState": {
"categ": {
"content": {
"products": [
]
}
}
}
}
}
}
)}
</script>
</body>
</html>
"""
m.get(
"https://www.millesima.fr/wine.html",
complete_qs=False,
response_list=[
{"text": list_pleine},
{"text": list_vide},
],
)
# on return m sans fermer le server qui simule la page # on return m sans fermer le server qui simule la page
yield m yield m
@@ -104,22 +249,78 @@ def scraper() -> Scraper:
def test_soup(scraper: Scraper): def test_soup(scraper: Scraper):
h1: Tag | None = scraper.getsoup().find("h1") vide = scraper.getsoup("")
poubelle = scraper.getsoup("poubelle")
assert isinstance(h1, Tag) contenu = scraper.getsoup("nino-negri-5-stelle-sfursat-2022.html")
assert h1.text == "MILLESIMA" assert vide.find("h1") is None
assert str(poubelle.find("h1")) == "<h1>POUBELLE</h1>"
assert str(contenu.find("h1")) == "<h1>MILLESIMA</h1>"
# def test_getProductName(scraper: Scraper):
# jsondata = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
# assert jsondata["productName"] == "Nino Negri : 5 Stelle Sfursat 2022"
# assert isinstance(jsondata["items"], list)
# assert len(jsondata["items"]) > 0
# assert jsondata["items"][0]["offerPrice"] == 390
def test_appellation(scraper: Scraper): def test_appellation(scraper: Scraper):
appellation: ScraperData = scraper.getjsondata( vide = scraper.getjsondata("")
"nino-negri-5-stelle-sfursat-2022.html" poubelle = scraper.getjsondata("poubelle")
) contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert appellation.appellation() == "Sforzato di Valtellina" assert vide.appellation() is None
assert poubelle.appellation() is None
assert contenu.appellation() == "Madame-Loïk"
def test_fonctionprivee(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert vide._getattributes() is not None
assert vide._getattributes() == {}
assert vide._getcontent() is not None
assert vide._getcontent() == {"items": [], "attributes": {}}
assert poubelle._getattributes() is None
assert poubelle._getcontent() is None
assert contenu._getcontent() is not None
assert contenu._getattributes() is not None
def test_critiques(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert vide.parker() is None
assert vide.robinson() is None
assert vide.suckling() is None
assert vide._getcritiques("test_ts") is None
assert poubelle.parker() is None
assert poubelle.robinson() is None
assert poubelle.suckling() is None
assert poubelle._getcritiques("test_ts") is None
assert contenu.parker() == "91"
assert contenu.robinson() == "17"
assert contenu.suckling() == "93.5"
assert contenu._getcritiques("test_ts") is None
def test_prix(scraper: Scraper):
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert vide.prix() is None
assert poubelle.prix() is None
assert contenu.prix() == 842.0
def test_informations(scraper: Scraper):
contenu = scraper.getjsondata("nino-negri-5-stelle-sfursat-2022.html")
assert contenu.informations() == "Madame-Loïk,91,17,93.5,842.0"
vide = scraper.getjsondata("")
poubelle = scraper.getjsondata("poubelle")
assert vide.informations() == "None,None,None,None,None"
assert poubelle.informations() == "None,None,None,None,None"
def test_search(scraper: Scraper):
m = mock_open()
with patch("builtins.open", m):
scraper.getvins("wine.html", "fake_file.csv")
assert m().write.called
all_writes = "".join(call.args[0] for call in m().write.call_args_list)
assert "Madame-Loïk,91,17,93.5,842.0" in all_writes