ajout: ajout factorisation vin et meilleure barre

This commit is contained in:
2026-03-02 21:42:23 +01:00
parent 3619890dc4
commit e6c649b433
3 changed files with 67 additions and 44 deletions

View File

@@ -1,7 +1,7 @@
[project] [project]
name = "projet-millesima-s6" name = "projet-millesima-s6"
version = "0.1.0" version = "0.1.0"
dependencies = ["requests==2.32.5", "beautifulsoup4==4.14.3", "pandas==2.3.3"] dependencies = ["requests==2.32.5", "beautifulsoup4==4.14.3", "pandas==2.3.3", "tqdm==4.67.3"]
[project.optional-dependencies] [project.optional-dependencies]
test = ["pytest==8.4.2", "requests-mock==1.12.1", "flake8==7.3.0"] test = ["pytest==8.4.2", "requests-mock==1.12.1", "flake8==7.3.0"]

View File

@@ -1,20 +1,23 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from io import SEEK_END, SEEK_SET, BufferedWriter, BufferedReader
from os.path import exists, normpath, realpath, dirname, join
from os import makedirs
from pickle import dump, load, UnpicklingError
from sys import argv
from typing import Any, Callable, Literal, cast
from requests import HTTPError, Response, Session
from bs4 import BeautifulSoup, Tag
from collections import OrderedDict from collections import OrderedDict
from io import SEEK_END, SEEK_SET, BufferedWriter
from json import JSONDecodeError, loads from json import JSONDecodeError, loads
from os import makedirs
from os.path import dirname, exists, join, normpath, realpath
from pickle import UnpicklingError, dump, load
from sys import argv
from tqdm.std import tqdm
from typing import Any, Callable, Literal, TypeVar, cast
from bs4 import BeautifulSoup, Tag
from requests import HTTPError, Response, Session
_dir: str = dirname(realpath(__name__)) _dir: str = dirname(realpath(__name__))
T = TypeVar("T")
def _getcache[T](mode: Literal["rb", "wb"], fn: Callable[[Any], T]) -> T | None:
def _getcache(mode: Literal["rb", "wb"], fn: Callable[[Any], T]) -> T | None:
"""_summary_ """_summary_
Returns: Returns:
@@ -39,8 +42,10 @@ def savestate(data: tuple[int, set[str]]) -> None:
_ = f.truncate() _ = f.truncate()
dump(data, f) dump(data, f)
f.flush() f.flush()
_getcache("wb", save) _getcache("wb", save)
def loadstate() -> tuple[int, set[str]] | None: def loadstate() -> tuple[int, set[str]] | None:
return _getcache("rb", lambda f: load(f)) return _getcache("rb", lambda f: load(f))
@@ -147,7 +152,6 @@ class _ScraperData:
str | None: Le nom (ex: 'Pauillac') ou None. str | None: Le nom (ex: 'Pauillac') ou None.
""" """
attrs: dict[str, object] | None = self._getattributes() attrs: dict[str, object] | None = self._getattributes()
if attrs is not None: if attrs is not None:
app_dict: object | None = attrs.get("appellation") app_dict: object | None = attrs.get("appellation")
if isinstance(app_dict, dict): if isinstance(app_dict, dict):
@@ -365,7 +369,7 @@ class Scraper:
return _ScraperData(cast(dict[str, object], current_data)) return _ScraperData(cast(dict[str, object], current_data))
def _geturlproductslist(self, subdir: str) -> list[str] | None: def _geturlproductslist(self, subdir: str) -> list[dict[str, Any]] | None:
""" """
Récupère la liste des produits d'une page de catégorie. Récupère la liste des produits d'une page de catégorie.
""" """
@@ -373,32 +377,61 @@ class Scraper:
data: dict[str, object] = self.getjsondata(subdir).getdata() data: dict[str, object] = self.getjsondata(subdir).getdata()
for element in ["initialReduxState", "categ", "content"]: for element in ["initialReduxState", "categ", "content"]:
data: dict[str, object] = cast(dict[str, object], data.get(element)) data = cast(dict[str, object], data.get(element))
if not isinstance(data, dict):
return None products: list[dict[str, Any]] = cast(
list[dict[str, Any]], data.get("products")
)
products: list[str] = cast(list[str], data.get("products"))
if isinstance(products, list): if isinstance(products, list):
return products return products
except (JSONDecodeError, HTTPError): except (JSONDecodeError, HTTPError):
return None return None
def _writevins(self, cache: set[str], product: dict[str, Any], f: Any) -> None:
    """Fetch one product's details and append them as a line to ``f``.

    The product is skipped silently when it is not a dict, when it has no
    non-empty string under ``seoKeyword``, or when that keyword is already
    present in ``cache``.

    Args:
        cache (set[str]): Keywords already written; the keyword is added
            in place once its line has been written.
        product (dict[str, Any]): One entry of the product list; only its
            ``seoKeyword`` key is read.
        f (Any): Object with a ``write`` method that receives the product
            line followed by a newline.
    """
    if not isinstance(product, dict):
        return

    link: Any | None = product.get("seoKeyword")
    # A non-string keyword may not even be hashable, in which case the
    # cache lookup itself would raise TypeError, so reject it up front.
    if not isinstance(link, str) or not link or link in cache:
        return

    try:
        infos = self.getjsondata(link).informations()
        _ = f.write(infos + "\n")
        # Only mark the product as done once its line is written.
        cache.add(link)
    except (JSONDecodeError, HTTPError) as e:
        # tqdm.write prints without overlapping a progress bar that the
        # caller may currently be displaying.
        tqdm.write(f"Erreur sur le produit {link}: {e}")
def getvins(self, subdir: str, filename: str, reset: bool = False) -> None: def getvins(self, subdir: str, filename: str, reset: bool = False) -> None:
""" """
Scrape récursivement toutes les pages d'une catégorie et sauvegarde en CSV. Scrape toutes les pages d'une catégorie et sauvegarde en CSV.
Args: Args:
subdir (str): La catégorie (ex: '/vins-rouges'). subdir (str): La catégorie (ex: '/vins-rouges').
filename (str): Nom du fichier de sortie (ex: 'vins.csv'). filename (str): Nom du fichier de sortie (ex: 'vins.csv').
reset (bool): (Optionnel) pour réinitialiser le processus. reset (bool): (Optionnel) pour réinitialiser le processus.
""" """
# mode d'écriture fichier
mode: Literal["w", "a+"] = "w" if reset else "a+" mode: Literal["w", "a+"] = "w" if reset else "a+"
# titre
title: str = "Appellation,Robert,Robinson,Suckling,Prix\n" title: str = "Appellation,Robert,Robinson,Suckling,Prix\n"
# page du début
page: int = 1 page: int = 1
# le set qui sert de cache
cache: set[str] = set[str]() cache: set[str] = set[str]()
custom_format = "{l_bar} {bar:20} {r_bar}"
if not reset: if not reset:
# appelle la fonction pour load le cache, si il existe
# pas, il utilise les variables de base sinon il override
# toute les variables pour continuer et pas recommencer le
# processus en entier.
serializable: tuple[int, set[str]] | None = loadstate() serializable: tuple[int, set[str]] | None = loadstate()
if isinstance(serializable, tuple): if isinstance(serializable, tuple):
page, cache = serializable page, cache = serializable
@@ -416,33 +449,23 @@ class Scraper:
_ = f.seek(0, SEEK_END) _ = f.seek(0, SEEK_END)
while True: while True:
products_list: list[str] | None = self._geturlproductslist( products_list: list[dict[str, Any]] | None = (
f"{subdir}?page={page}" self._geturlproductslist(f"{subdir}?page={page}")
) )
if not products_list: if not products_list:
break break
products_list_length = len(products_list) pbar: tqdm[dict[str, Any]] = tqdm(
for i, product in enumerate(products_list): products_list, bar_format=custom_format
if not isinstance(product, dict): )
continue for product in pbar:
keyword = product.get("seoKeyword", "Inconnu")[:40]
link = product.get("seoKeyword") pbar.set_description(
f"Page: {page:<3} | Product: {keyword:<40}"
if link and link not in cache: )
try: self._writevins(cache, product, f)
infos = self.getjsondata(link).informations()
_ = f.write(infos + "\n")
print(
f"page: {page} | {i + 1}/{products_list_length} {link}"
)
cache.add(link)
except (JSONDecodeError, HTTPError) as e:
print(f"Erreur sur le produit {link}: {e}")
f.flush()
page += 1 page += 1
except: except Exception:
if not reset: if not reset:
savestate((page, cache)) savestate((page, cache))

View File

@@ -153,7 +153,7 @@ def mock_site():
html_product = f""" html_product = f"""
<html> <html>
<body> <body>
<h1>MILLESIMA</h1> <h1>MILLESIMA</h1>
<script id="__NEXT_DATA__" type="application/json"> <script id="__NEXT_DATA__" type="application/json">
{dumps(json_data)} {dumps(json_data)}
@@ -168,7 +168,7 @@ def mock_site():
html_product = f""" html_product = f"""
<html> <html>
<body> <body>
<h1>MILLESIMA</h1> <h1>MILLESIMA</h1>
<script id="__NEXT_DATA__" type="application/json"> <script id="__NEXT_DATA__" type="application/json">
{dumps(json_data)} {dumps(json_data)}
@@ -179,7 +179,7 @@ def mock_site():
list_pleine = f""" list_pleine = f"""
<html> <html>
<body> <body>
<h1>LE WINE</h1> <h1>LE WINE</h1>
<script id="__NEXT_DATA__" type="application/json"> <script id="__NEXT_DATA__" type="application/json">
{dumps({ {dumps({
@@ -207,7 +207,7 @@ def mock_site():
list_vide = f""" list_vide = f"""
<html> <html>
<body> <body>
<h1>LE WINE</h1> <h1>LE WINE</h1>
<script id="__NEXT_DATA__" type="application/json"> <script id="__NEXT_DATA__" type="application/json">
{dumps({ {dumps({
@@ -319,7 +319,7 @@ def test_informations(scraper: Scraper):
def test_search(scraper: Scraper): def test_search(scraper: Scraper):
m = mock_open() m = mock_open()
with patch("builtins.open", m): with patch("builtins.open", m):
scraper.getvins("wine.html", "fake_file.csv", False) scraper.getvins("wine.html", "fake_file.csv", True)
assert m().write.called assert m().write.called
all_writes = "".join(call.args[0] for call in m().write.call_args_list) all_writes = "".join(call.args[0] for call in m().write.call_args_list)