15 Commits

16 changed files with 752 additions and 699 deletions

View File

@@ -5,35 +5,34 @@ name: Python application
on:
push:
branches: [ "main" ]
branches: ["main"]
pull_request:
branches: [ "main" ]
branches: ["main"]
permissions:
contents: read
contents: write
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pytest
- uses: actions/checkout@v4
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: install dependencies
run: |
python -m pip install --upgrade pip
pip install ".[test,doc]"
- name: Lint with flake8
run: |
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: pytest

View File

@@ -1,106 +0,0 @@
#!/usr/bin/env python3
from pandas import DataFrame, to_numeric
import pandas as pd
SCORE_COLS = ["Robert", "Robinson", "Suckling"]
def display_info(df: DataFrame, name: str = "DataFrame") -> None:
"""
Affiche un résumé du DataFrame
-la taille
-types des colonnes
-valeurs manquantes
-statistiques numériques
"""
print(f"\n===== {name} =====")
print(f"Shape : {df.shape[0]} lignes × {df.shape[1]} colonnes")
print("\nTypes des colonnes :")
print(df.dtypes)
print("\nValeurs manquantes :")
print(df.isna().sum())
print("\nStatistiques numériques :")
print(df.describe().round(2))
def drop_empty_appellation(df: DataFrame) -> DataFrame:
return df.dropna(subset=["Appellation"])
def mean_score(df: DataFrame, col: str) -> DataFrame:
"""
Calcule la moyenne d'une colonne de score par appellation.
- Convertit les valeurs en numériques, en remplaçant les non-convertibles par NaN
- Calcule la moyenne par appellation
- Remplace les NaN résultants par 0
"""
tmp = df[["Appellation", col]].copy()
tmp[col] = to_numeric(tmp[col], errors="coerce")
# moyenne par appellation
means = tmp.groupby("Appellation", as_index=False)[col].mean()
means[col] = means[col].fillna(0)
means = means.rename(columns={col: f"mean_{col}"})
return means
def mean_robert(df: DataFrame) -> DataFrame:
return mean_score(df, "Robert")
def mean_robinson(df: DataFrame) -> DataFrame:
return mean_score(df, "Robinson")
def mean_suckling(df: DataFrame) -> DataFrame:
return mean_score(df, "Suckling")
def fill_missing_scores(df: DataFrame) -> DataFrame:
"""
Remplacer les notes manquantes par la moyenne
des vins de la même appellation.
"""
df_copy = df.copy()
df_copy["Appellation"] = df_copy["Appellation"].astype(str).str.strip()
for score in SCORE_COLS:
df_copy[score] = to_numeric(df_copy[score], errors="coerce")
temp_cols: list[str] = []
for score in SCORE_COLS:
mean_df = mean_score(df_copy, score)
mean_name = f"mean_{score}"
temp_cols.append(mean_name)
df_copy = df_copy.merge(mean_df, on="Appellation", how="left")
df_copy[score] = df_copy[score].fillna(df_copy[mean_name])
df_copy = df_copy.drop(columns=temp_cols)
return df_copy
def encode_appellation(df: DataFrame, column: str = "Appellation") -> DataFrame:
"""
Remplace la colonne 'Appellation' par des colonnes indicatrices
"""
df_copy = df.copy()
appellations = df_copy[column].astype(str).str.strip()
appellation_dummies = pd.get_dummies(appellations)
df_copy = df_copy.drop(columns=[column])
return df_copy.join(appellation_dummies)

1
docs/index.md Normal file
View File

@@ -0,0 +1 @@
# Millesima

3
docs/scraper.md Normal file
View File

@@ -0,0 +1,3 @@
# Scraper
::: scraper.Scraper

4
docs/scraperdata.md Normal file
View File

@@ -0,0 +1,4 @@
# _ScraperData
::: scraper._ScraperData

64
main.py
View File

@@ -1,64 +0,0 @@
#!/usr/bin/env python3
from os import getcwd
from os.path import normpath, join
from sys import argv
from pandas import read_csv, DataFrame
from cleaning import (display_info,
drop_empty_appellation,
mean_robert,
mean_robinson,
mean_suckling,
fill_missing_scores,
encode_appellation)
def load_csv(filename: str) -> DataFrame:
path: str = normpath(join(getcwd(), filename))
return read_csv(path)
def save_csv(df: DataFrame, out_filename: str) -> None:
df.to_csv(out_filename, index=False)
def main() -> None:
if len(argv) != 2:
raise ValueError(f"Usage: {argv[0]} <filename.csv>")
df = load_csv(argv[1])
display_info(df, "Avant le nettoyage")
df = drop_empty_appellation(df)
save_csv(df, "donnee_clean.csv")
display_info(df, "Après nettoyage d'appellations manquantes")
#la moyenne des notes des vins pour chaque appellation
robert_means = mean_robert(df)
save_csv(robert_means, "mean_robert_by_appellation.csv")
display_info(robert_means, "Moyennes Robert par appellation")
robinson_means = mean_robinson(df)
save_csv(robinson_means, "mean_robinson_by_appellation.csv")
display_info(robinson_means, "Moyennes Robinson par appellation")
suckling_means = mean_suckling(df)
save_csv(suckling_means, "mean_suckling_by_appellation.csv")
display_info(suckling_means, "Moyennes Suckling par appellation")
df_missing_scores = fill_missing_scores(df)
save_csv(df_missing_scores, "donnee_filled.csv")
display_info(df_missing_scores, "Après remplissage des notes manquantes par la moyenne de l'appellation")
df_ready = encode_appellation(df_missing_scores)
save_csv(df_ready, "donnee_ready.csv")
display_info(df_ready, "Après remplacer la colonne 'Appellation' par des colonnes indicatrices")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"ERREUR: {e}")

14
mkdocs.yml Normal file
View File

@@ -0,0 +1,14 @@
site_name: "Projet Millesima S6"
theme:
name: "material"
plugins:
- search
- mkdocstrings
markdown_extensions:
- admonition
- pymdownx.details
- pymdownx.superfences
- pymdownx.tabbed

17
pyproject.toml Normal file
View File

@@ -0,0 +1,17 @@
[project]
name = "projet-millesima-s6"
version = "0.1.0"
dependencies = [
"requests==2.32.5",
"beautifulsoup4==4.14.3",
"pandas==2.3.3",
"tqdm==4.67.3",
]
[project.optional-dependencies]
test = ["pytest==8.4.2", "requests-mock==1.12.1", "flake8==7.3.0"]
doc = ["mkdocs<2.0.0", "mkdocs-material==9.6.23", "mkdocstrings[python]"]
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

View File

@@ -1,6 +0,0 @@
requests==2.32.5
requests-mock==1.12.1
beautifulsoup4==4.14.3
pytest==8.4.2
requests-mock==1.12.1
pandas==2.3.3

View File

@@ -1,431 +0,0 @@
#!/usr/bin/env python3
from sys import argv
from typing import cast
from requests import HTTPError, Response, Session
from requests.exceptions import Timeout, ConnectionError
import time
from bs4 import BeautifulSoup, Tag
from collections import OrderedDict
from json import JSONDecodeError, loads
from pathlib import Path
class _ScraperData:
"""_summary_"""
def __init__(self, data: dict[str, object]) -> None:
"""_summary_
Args:
data (dict[str, object]): _description_
"""
self._data: dict[str, object] = data
def _getcontent(self) -> dict[str, object] | None:
"""_summary_
Returns:
dict[str, object]: _description_
"""
current_data: dict[str, object] = self._data
for key in ["initialReduxState", "product", "content"]:
new_data: object | None = current_data.get(key)
if new_data is None:
return None
current_data: dict[str, object] = cast(dict[str, object], new_data)
return current_data
def _getattributes(self) -> dict[str, object] | None:
"""_summary_
Returns:
dict[str, object]: _description_
"""
current_data: object = self._getcontent()
if current_data is None:
return None
return cast(dict[str, object], current_data.get("attributes"))
def prix(self) -> float | None:
"""
Retourne le prix unitaire d'une bouteille (75cl).
Si aucun prix n'est disponible, retourne None.
"""
content = self._getcontent()
if content is None:
return None
items = content.get("items")
# Vérifie que items existe et n'est pas vide
if not isinstance(items, list) or len(items) == 0:
return None
prix_calcule: float | None = None
for item in items:
if not isinstance(item, dict):
continue
p = item.get("offerPrice")
attrs = item.get("attributes", {})
nbunit = attrs.get("nbunit", {}).get("value")
equivbtl = attrs.get("equivbtl", {}).get("value")
if not isinstance(p, (int, float)) or not nbunit or not equivbtl:
continue
nb = float(nbunit)
eq = float(equivbtl)
if nb <= 0 or eq <= 0:
continue
if nb == 1 and eq == 1:
return float(p)
prix_calcule = round(float(p) / (nb * eq), 2)
return prix_calcule
def appellation(self) -> str | None:
"""_summary_
Returns:
str: _description_
"""
attrs: dict[str, object] | None = self._getattributes()
if attrs is not None:
app_dict: object | None = attrs.get("appellation")
if isinstance(app_dict, dict):
return cast(str, app_dict.get("value"))
return None
def _getcritiques(self, name: str) -> str | None:
"""_summary_
Args:
name (str): _description_
Returns:
str | None: _description_
"""
current_value: dict[str, object] | None = self._getattributes()
if current_value is not None:
app_dict: dict[str, object] = cast(
dict[str, object], current_value.get(name)
)
if not app_dict:
return None
val = cast(str, app_dict.get("value")).rstrip("+").split("-")
if len(val) > 1 and val[1] != "":
val[0] = str(round((float(val[0]) + float(val[1])) / 2, 1))
return val[0]
return None
def parker(self) -> str | None:
return self._getcritiques("note_rp")
def robinson(self) -> str | None:
return self._getcritiques("note_jr")
def suckling(self) -> str | None:
return self._getcritiques("note_js")
def getdata(self) -> dict[str, object]:
return self._data
def informations(self) -> str:
"""
Retourne toutes les informations sous la forme :
"Appelation,Parker,J.Robinson,J.Suckling,Prix"
"""
appellation = self.appellation()
parker = self.parker()
robinson = self.robinson()
suckling = self.suckling()
prix = self.prix()
return f"{appellation},{parker},{robinson},{suckling},{prix}"
class Scraper:
"""
Scraper est une classe qui permet de gerer
de façon dynamique des requetes uniquement
sur le serveur https de Millesima
"""
def __init__(self) -> None:
"""
Initialise la session de scraping.
"""
self._url: str = "https://www.millesima.fr/"
# Très utile pour éviter de renvoyer toujours les mêmes handshake
# TCP et d'avoir toujours une connexion constante avec le server
self._session: Session = Session()
self._session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/122.0.0.0 Safari/537.36",
"Accept-Language": "fr-FR,fr;q=0.9,en;q=0.8",
})
# Système de cache pour éviter de solliciter le serveur inutilement
self._latest_request: tuple[(str, Response)] | None = None
self._latest_soups: OrderedDict[str, BeautifulSoup] = OrderedDict[
str, BeautifulSoup
]()
def _request(self, subdir: str) -> Response:
"""
Effectue une requête GET sur le serveur Millesima.
Args:
subdir (str): Le sous-répertoire ou chemin de l'URL (ex: "/vins").
Returns:
Response: L'objet réponse de la requête.
Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
target_url: str = self._url + subdir.lstrip("/")
last_exc: Exception | None = None
for attempt in range(1, 4):
try:
response: Response = self._session.get(url=target_url, timeout=30)
response.raise_for_status()
return response
except (Timeout, ConnectionError) as e:
last_exc = e
print(f"Timeout/ConnectionError ({attempt}/3) sur {target_url}: {e}")
time.sleep(2 * attempt) # 2s, 4s, 6s
# après 3 essais, on abandonne
raise last_exc if last_exc else RuntimeError("Request failed")
def getresponse(self, subdir: str = "", use_cache: bool = True) -> Response:
"""
Récupère la réponse d'une page, en utilisant le cache si possible.
Args:
subdir (str, optional): Le chemin de la page.
use_cache (bool, optional): Utilise la donnée deja sauvegarder ou
écrase la donnée utilisé avec la nouvelle
Returns:
Response: L'objet réponse (cache ou nouvelle requête).
Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
# si dans le cache, latest_request existe
if use_cache and self._latest_request is not None:
rq_subdir, rq_response = self._latest_request
# si c'est la meme requete et que use_cache est true,
# on renvoie celle enregistrer
if subdir == rq_subdir:
return rq_response
request: Response = self._request(subdir)
# on recrée la structure pour le systeme de cache si activer
if use_cache:
self._latest_request = (subdir, request)
return request
def getsoup(self, subdir: str, use_cache: bool = True) -> BeautifulSoup:
"""
Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup.
Args:
subdir (str, optional): Le chemin de la page.
Returns:
BeautifulSoup: L'objet parsé pour extraction de données.
Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
if use_cache and subdir in self._latest_soups:
return self._latest_soups[subdir]
markup: str = self.getresponse(subdir).text
soup: BeautifulSoup = BeautifulSoup(markup, features="html.parser")
if use_cache:
self._latest_soups[subdir] = soup
if len(self._latest_soups) > 10:
_ = self._latest_soups.popitem(last=False)
return soup
def getjsondata(self, subdir: str, id: str = "__NEXT_DATA__") -> _ScraperData:
"""
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site.
Beaucoup de sites modernes (Next.js) stockent leur état initial dans
une balise <script> pour l'hydratation côté client.
Args:
subdir (str): Le chemin de la page.
id (str, optional): L'identifiant de la balise script (par défaut __NEXT_DATA__).
Raises:
HTTPError: Soulevée par `getresponse` si le serveur renvoie un code d'erreur (4xx, 5xx).
JSONDecodeError: Soulevée par `loads` si le contenu de la balise n'est pas un JSON valide.
ValueError: Soulevée manuellement si l'une des clés attendues (props, pageProps, etc.)
est absente de la structure JSON.
Returns:
dict[str, object]: Un dictionnaire contenant les données utiles
ou un dictionnaire vide en cas d'erreur.
"""
soup: BeautifulSoup = self.getsoup(subdir)
script: Tag | None = soup.find("script", id=id)
if script is None or not script.string:
raise ValueError(f"le script id={id} est introuvable")
current_data: object = cast(object, loads(script.string))
for key in ["props", "pageProps"]:
if isinstance(current_data, dict) and key in current_data:
current_data = cast(object, current_data[key])
continue
raise ValueError(f"Clé manquante dans le JSON : {key}")
return _ScraperData(cast(dict[str, object], current_data))
def _geturlproductslist(self, subdir: str):
"""_summary_
Args:
subdir (str): _description_
Returns:
_type_: _description_
"""
try:
data: dict[str, object] = self.getjsondata(subdir).getdata()
for element in ["initialReduxState", "categ", "content"]:
data: dict[str, object] = cast(dict[str, object], data.get(element))
if not isinstance(data, dict):
return None
products: list[str] = cast(list[str], data.get("products"))
if isinstance(products, list):
return products
except (JSONDecodeError, HTTPError):
return None
def _save_progress(self, page: int, i: int, last_link: str) -> None:
Path("progress.txt").write_text(f"{page},{i},{last_link}", encoding="utf-8")
def _load_progress(self) -> tuple[int, int, str | None]:
p = Path("progress.txt")
if not p.exists():
return (1, 0, None)
try:
parts = p.read_text(encoding="utf-8").strip().split(",", 2)
page = int(parts[0])
i = int(parts[1])
last_link = parts[2] if len(parts) == 3 and parts[2] != "" else None
return (page, i, last_link)
except Exception:
return (1, 0, None)
def getvins(self, subdir: str, filename: str):
"""_summary_
Args:
subdir (str): _description_
filename (str): _description_
"""
start_page, start_i, last_link = self._load_progress()
print(f"__INFO__ Reprise à page={start_page}, index={start_i}, last_link={last_link}")
with open(filename, "a", encoding="utf-8") as f:
cache: set[str] = set[str]()
if f.tell() == 0:
_ = f.write("Appellation,Robert,Robinson,Suckling,Prix\n")
page = start_page - 1
while True:
page += 1
products_list = self._geturlproductslist(f"{subdir}?page={page}")
if not products_list:
break
products_list_length = len(products_list)
start_at = start_i if page == start_page else 0
for i in range(start_at, products_list_length):
product = products_list[i]
if not isinstance(product, dict):
continue
link = product.get("seoKeyword")
if not link:
continue
# pour eviter les doublons :
if (page == start_page) and (last_link is not None) and (link == last_link):
self._save_progress(page, + 1, link)
continue
self._save_progress(page, i + 1, link)
if link in cache:
continue
try:
infos = self.getjsondata(link).informations()
_ = f.write(infos + "\n")
print(f"page: {page} | {i + 1}/{products_list_length} {link}")
cache.add(link)
except (JSONDecodeError, HTTPError) as e:
print(f"Erreur sur le produit {link}: {e}")
f.flush()
Path("progress.txt").unlink(missing_ok=True)
def main() -> None:
if len(argv) != 2:
raise ValueError(f"{argv[0]} <sous-url>")
scraper: Scraper = Scraper()
scraper.getvins(argv[1], "donnee.csv")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"ERREUR: {e}")

109
src/cleaning.py Executable file
View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
from os import getcwd
from os.path import normpath, join
from typing import cast
from pandas import DataFrame, read_csv, to_numeric, get_dummies
from sys import argv
def path_filename(filename: str) -> str:
return normpath(join(getcwd(), filename))
class Cleaning:
def __init__(self, filename) -> None:
self._vins: DataFrame = read_csv(filename)
# créer la liste de tout les scores
self.SCORE_COLS: list[str] = [
c for c in self._vins.columns if c not in ["Appellation", "Prix"]
]
# transforme tout les colonnes score en numérique
for col in self.SCORE_COLS:
self._vins[col] = to_numeric(self._vins[col], errors="coerce")
def getVins(self) -> DataFrame:
return self._vins.copy(deep=True)
def __str__(self) -> str:
"""
Affiche un résumé du DataFrame
- la taille
- types des colonnes
- valeurs manquantes
- statistiques numériques
"""
return (
f"Shape : {self._vins.shape[0]} lignes x {self._vins.shape[1]} colonnes\n\n"
f"Types des colonnes :\n{self._vins.dtypes}\n\n"
f"Valeurs manquantes :\n{self._vins.isna().sum()}\n\n"
f"Statistiques numériques :\n{self._vins.describe().round(2)}\n\n"
)
def drop_empty_appellation(self) -> "Cleaning":
self._vins = self._vins.dropna(subset=["Appellation"])
return self
def _mean_score(self, col: str) -> DataFrame:
"""
Calcule la moyenne d'une colonne de score par appellation.
- Convertit les valeurs en numériques, en remplaçant les non-convertibles par NaN
- Calcule la moyenne par appellation
- Remplace les NaN résultants par 0
"""
means = self._vins.groupby("Appellation", as_index=False)[col].mean()
means = means.rename(
columns={col: f"mean_{col}"}
) # pyright: ignore[reportCallIssue]
return cast(DataFrame, means.fillna(0))
def _mean_robert(self) -> DataFrame:
return self._mean_score("Robert")
def _mean_robinson(self) -> DataFrame:
return self._mean_score("Robinson")
def _mean_suckling(self) -> DataFrame:
return self._mean_score("Suckling")
def fill_missing_scores(self) -> "Cleaning":
"""
Remplacer les notes manquantes par la moyenne
des vins de la même appellation.
"""
for element in self.SCORE_COLS:
means = self._mean_score(element)
self._vins = self._vins.merge(means, on="Appellation", how="left")
mean_col = f"mean_{element}"
self._vins[element] = self._vins[element].fillna(self._vins[mean_col])
self._vins = self._vins.drop(columns=["mean_" + element])
return self
def encode_appellation(self, column: str = "Appellation") -> "Cleaning":
"""
Remplace la colonne 'Appellation' par des colonnes indicatrices
"""
appellations = self._vins[column].astype(str).str.strip()
appellation_dummies = get_dummies(appellations, prefix="App")
self._vins = self._vins.drop(columns=[column])
self._vins = self._vins.join(appellation_dummies)
return self
def main() -> None:
if len(argv) != 2:
raise ValueError(f"Usage: {argv[0]} <filename.csv>")
filename = argv[1]
cleaning: Cleaning = Cleaning(filename)
_ = cleaning.drop_empty_appellation().fill_missing_scores().encode_appellation()
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"ERREUR: {e}")

510
src/scraper.py Executable file
View File

@@ -0,0 +1,510 @@
#!/usr/bin/env python3
from collections import OrderedDict
from io import SEEK_END, SEEK_SET, BufferedWriter, TextIOWrapper
from json import JSONDecodeError, loads
from os import makedirs
from os.path import dirname, exists, join, normpath, realpath
from pickle import UnpicklingError, dump, load
from sys import argv
from tqdm.std import tqdm
from typing import Any, Callable, Literal, TypeVar, cast
from bs4 import BeautifulSoup, Tag
from requests import HTTPError, Response, Session
_dir: str = dirname(realpath(__name__))
T = TypeVar("T")
def _getcache(mode: Literal["rb", "wb"], fn: Callable[[Any], T]) -> T | None:
"""_summary_
Returns:
_type_: _description_
"""
cache_dirname = normpath(join(_dir, ".cache"))
save_path = normpath(join(cache_dirname, "save"))
if not exists(cache_dirname):
makedirs(cache_dirname)
try:
with open(save_path, mode) as f:
return fn(f)
except (FileNotFoundError, EOFError, UnpicklingError):
return None
def savestate(data: tuple[int, set[str]]) -> None:
def save(f: BufferedWriter) -> None:
_ = f.seek(0)
_ = f.truncate()
dump(data, f)
f.flush()
_getcache("wb", save)
def loadstate() -> tuple[int, set[str]] | None:
return _getcache("rb", lambda f: load(f))
class _ScraperData:
"""
Conteneur de données spécialisé pour extraire les informations des dictionnaires JSON.
Cette classe agit comme une interface simplifiée au-dessus du dictionnaire brut
renvoyé par la balise __NEXT_DATA__ du site Millesima.
"""
def __init__(self, data: dict[str, object]) -> None:
"""
Initialise le conteneur avec un dictionnaire de données.
Args:
data (dict[str, object]): Le dictionnaire JSON brut extrait de la page.
"""
self._data: dict[str, object] = data
def _getcontent(self) -> dict[str, object] | None:
"""
Navigue dans l'arborescence Redux pour atteindre le contenu du produit.
Returns:
dict[str, object] | None: Le dictionnaire du produit ou None si la structure diffère.
"""
current_data: dict[str, object] = self._data
for key in ["initialReduxState", "product", "content"]:
new_data: object | None = current_data.get(key)
if new_data is None:
return None
current_data: dict[str, object] = cast(dict[str, object], new_data)
return current_data
def _getattributes(self) -> dict[str, object] | None:
"""
Extrait les attributs techniques (notes, appellations, etc.) du produit.
Returns:
dict[str, object] | None: Les attributs du vin ou None.
"""
current_data: object = self._getcontent()
if current_data is None:
return None
return cast(dict[str, object], current_data.get("attributes"))
def prix(self) -> float | None:
"""
Calcule le prix unitaire d'une bouteille (standardisée à 75cl).
Le site vend souvent par caisses (6, 12 bouteilles) ou formats (Magnum).
Cette méthode normalise le prix pour obtenir celui d'une seule unité.
Returns:
float | None: Le prix calculé arrondi à 2 décimales, ou None.
"""
content = self._getcontent()
if content is None:
return None
items = content.get("items")
# Vérifie que items existe et n'est pas vide
if not isinstance(items, list) or len(items) == 0:
return None
prix_calcule: float | None = None
for item in items:
if not isinstance(item, dict):
continue
p = item.get("offerPrice")
attrs = item.get("attributes", {})
nbunit = attrs.get("nbunit", {}).get("value")
equivbtl = attrs.get("equivbtl", {}).get("value")
if not isinstance(p, (int, float)) or not nbunit or not equivbtl:
continue
nb = float(nbunit)
eq = float(equivbtl)
if nb <= 0 or eq <= 0:
continue
if nb == 1 and eq == 1:
return float(p)
prix_calcule = round(float(p) / (nb * eq), 2)
return prix_calcule
def appellation(self) -> str | None:
"""
Extrait le nom de l'appellation du vin.
Returns:
str | None: Le nom (ex: 'Pauillac') ou None.
"""
attrs: dict[str, object] | None = self._getattributes()
if attrs is not None:
app_dict: object | None = attrs.get("appellation")
if isinstance(app_dict, dict):
return cast(str, app_dict.get("value"))
return None
def _getcritiques(self, name: str) -> str | None:
"""
Méthode générique pour parser les notes des critiques (Parker, Suckling, etc.).
Gère les notes simples ("95") et les plages de notes ("95-97") en faisant la moyenne.
Args:
name (str): La clé de l'attribut dans le JSON (ex: 'note_rp').
Returns:
str | None: La note formatée en chaîne de caractères ou None.
"""
current_value: dict[str, object] | None = self._getattributes()
if current_value is not None:
app_dict: dict[str, object] = cast(
dict[str, object], current_value.get(name)
)
if not app_dict:
return None
val = cast(str, app_dict.get("value")).rstrip("+").split("-")
if len(val) > 1 and val[1] != "":
val[0] = str(round((float(val[0]) + float(val[1])) / 2, 1))
return val[0]
return None
def parker(self) -> str | None:
"""Note Robert Parker."""
return self._getcritiques("note_rp")
def robinson(self) -> str | None:
"""Note Jancis Robinson."""
return self._getcritiques("note_jr")
def suckling(self) -> str | None:
"""Note James Suckling."""
return self._getcritiques("note_js")
def getdata(self) -> dict[str, object]:
"""Retourne le dictionnaire de données complet."""
return self._data
def informations(self) -> str:
"""
Agrège les données clés pour l'export CSV.
Returns:
str: Ligne formatée : "Appellation,Parker,Robinson,Suckling,Prix".
"""
appellation = self.appellation()
parker = self.parker()
robinson = self.robinson()
suckling = self.suckling()
prix = self.prix()
prix = self.prix()
return f"{appellation},{parker},{robinson},{suckling},{prix}"
class Scraper:
"""
Client HTTP optimisé pour le scraping de millesima.fr.
Gère la session persistante, les headers de navigation et un cache double
pour optimiser les performances et la discrétion.
"""
def __init__(self) -> None:
"""
Initialise l'infrastructure de navigation:
- créer une session pour éviter de faire un handshake pour chaque requête
- ajout d'un header pour éviter le blocage de l'accès au site
- ajout d'un système de cache
"""
self._url: str = "https://www.millesima.fr/"
# Très utile pour éviter de renvoyer toujours les mêmes handshake
# TCP et d'avoir toujours une connexion constante avec le server
self._session: Session = Session()
# Crée une "fausse carte d'identité" pour éviter que le site nous
# bloque car on serait des robots
self._session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) \
Chrome/122.0.0.0 Safari/537.36",
"Accept-Language": "fr-FR,fr;q=0.9,en;q=0.8",
}
)
# Système de cache pour éviter de solliciter le serveur inutilement
# utilise pour _request
self._latest_request: tuple[(str, Response)] | None = None
# utilise pour getsoup
self._latest_soups: OrderedDict[str, BeautifulSoup] = OrderedDict[
str, BeautifulSoup
]()
def _request(self, subdir: str) -> Response:
"""
Effectue une requête GET sur le serveur Millesima.
Args:
subdir (str): Le sous-répertoire ou chemin de l'URL (ex: "/vins").
Returns:
Response: L'objet réponse de la requête.
Raises:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
target_url: str = self._url + subdir.lstrip("/")
# envoyer une requête GET sur la page si erreur, renvoie un raise
response: Response = self._session.get(url=target_url, timeout=30)
response.raise_for_status()
return response
def getresponse(self, subdir: str = "", use_cache: bool = True) -> Response:
"""
Récupère la réponse d'une page, en utilisant le cache si possible.
Args:
subdir (str, optional): Le chemin de la page.
use_cache (bool, optional): Utilise la donnée deja sauvegarder ou
écrase la donnée utilisé avec la nouvelle
Returns:
Response: L'objet réponse (cache ou nouvelle requête).
Raises:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
# si dans le cache, latest_request existe
if use_cache and self._latest_request is not None:
rq_subdir, rq_response = self._latest_request
# si c'est la meme requete et que use_cache est true,
# on renvoie celle enregistrer
if subdir == rq_subdir:
return rq_response
request: Response = self._request(subdir)
# on recrée la structure pour le systeme de cache si activer
if use_cache:
self._latest_request = (subdir, request)
return request
def getsoup(self, subdir: str, use_cache: bool = True) -> BeautifulSoup:
"""
Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup.
Args:
subdir (str, optional): Le chemin de la page.
Returns:
BeautifulSoup: L'objet parsé pour extraction de données.
Raises:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
if use_cache and subdir in self._latest_soups:
return self._latest_soups[subdir]
markup: str = self.getresponse(subdir).text
soup: BeautifulSoup = BeautifulSoup(markup, features="html.parser")
if use_cache:
self._latest_soups[subdir] = soup
if len(self._latest_soups) > 10:
_ = self._latest_soups.popitem(last=False)
return soup
def getjsondata(self, subdir: str, id: str = "__NEXT_DATA__") -> _ScraperData:
"""
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site.
Args:
subdir (str): Le chemin de la page.
id (str, optional): L'identifiant de la balise script.
Raises:
HTTPError: Erreur renvoyée par le serveur (4xx, 5xx).
JSONDecodeError: Si le contenu de la balise n'est pas un JSON valide.
ValueError: Si les clés 'props' ou 'pageProps' sont absentes.
Returns:
_ScraperData: Instance contenant les données extraites.
"""
soup: BeautifulSoup = self.getsoup(subdir)
script: Tag | None = soup.find("script", id=id)
if script is None or not script.string:
raise ValueError(f"le script id={id} est introuvable")
current_data: object = cast(object, loads(script.string))
for key in ["props", "pageProps"]:
if isinstance(current_data, dict) and key in current_data:
current_data = cast(object, current_data[key])
continue
raise ValueError(f"Clé manquante dans le JSON : {key}")
return _ScraperData(cast(dict[str, object], current_data))
def _geturlproductslist(self, subdir: str) -> list[dict[str, Any]] | None:
"""
Récupère la liste des produits d'une page de catégorie.
"""
try:
data: dict[str, object] = self.getjsondata(subdir).getdata()
for element in ["initialReduxState", "categ", "content"]:
data = cast(dict[str, object], data.get(element))
products: list[dict[str, Any]] = cast(
list[dict[str, Any]], data.get("products")
)
return products
except (JSONDecodeError, HTTPError):
return None
def _writevins(self, cache: set[str], product: dict[str, Any], f: Any) -> None:
"""_summary_
Args:
cache (set[str]): _description_
product (dict): _description_
f (Any): _description_
"""
if isinstance(product, dict):
link: Any | None = product.get("seoKeyword")
if link and link not in cache:
try:
infos = self.getjsondata(link).informations()
_ = f.write(infos + "\n")
cache.add(link)
except (JSONDecodeError, HTTPError) as e:
print(f"Erreur sur le produit {link}: {e}")
def _initstate(self, reset: bool) -> tuple[int, set[str]]:
"""
appelle la fonction pour load le cache, si il existe
pas, il utilise les variables de base sinon il override
toute les variables pour continuer et pas recommencer le
processus en entier.
Args:
reset (bool): pouvoir le reset ou pas
Returns:
tuple[int, set[str]]: le contenu de la page et du cache
"""
if not reset:
#
serializable: tuple[int, set[str]] | None = loadstate()
if isinstance(serializable, tuple):
return serializable
return 1, set()
def _ensuretitle(self, f: TextIOWrapper, title: str) -> None:
"""
check si le titre est bien présent au début du buffer
sinon il l'ecrit, petit bug potentiel, a+ ecrit tout le
temps a la fin du buffer, si on a ecrit des choses avant
le titre sera apres ces données mais on part du principe
que personne va toucher le fichier.
Args:
f (TextIOWrapper): buffer stream fichier
title (str): titre du csv
"""
_ = f.seek(0, SEEK_SET)
if not (f.read(len(title)) == title):
_ = f.write(title)
else:
_ = f.seek(0, SEEK_END)
def getvins(self, subdir: str, filename: str, reset: bool = False) -> None:
"""
Scrape toutes les pages d'une catégorie et sauvegarde en CSV.
Args:
subdir (str): La catégorie (ex: '/vins-rouges').
filename (str): Nom du fichier de sortie (ex: 'vins.csv').
reset (bool): (Optionnel) pour réinitialiser le processus.
"""
# mode d'écriture fichier
mode: Literal["w", "a+"] = "w" if reset else "a+"
# titre
title: str = "Appellation,Robert,Robinson,Suckling,Prix\n"
# page: page où commence le scraper
# cache: tout les pages déjà parcourir
page, cache = self._initstate(reset)
try:
with open(filename, mode) as f:
self._ensuretitle(f, title)
while True:
products_list: list[dict[str, Any]] | None = (
self._geturlproductslist(f"{subdir}?page={page}")
)
if not products_list:
break
pbar: tqdm[dict[str, Any]] = tqdm(
products_list, bar_format="{l_bar} {bar:20} {r_bar}"
)
for product in pbar:
keyword: str = cast(
str, product.get("seoKeyword", "Inconnu")[:40]
)
pbar.set_description(
f"Page: {page:<3} | Product: {keyword:<40}"
)
self._writevins(cache, product, f)
page += 1
# va créer un fichier au début et l'override
# tout les 5 pages au cas où SIGHUP ou autre
if page % 5 == 0 and not reset:
savestate((page, cache))
except (Exception, HTTPError, KeyboardInterrupt, JSONDecodeError):
if not reset:
savestate((page, cache))
def main() -> None:
if len(argv) != 3:
raise ValueError(f"{argv[0]} <filename> <sous-url>")
filename = argv[1]
suburl = argv[2]
scraper: Scraper = Scraper()
scraper.getvins(suburl, filename)
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"ERREUR: {e}")

View File

@@ -1,64 +0,0 @@
import pandas as pd
import pytest
from pandas import DataFrame
from cleaning import (
SCORE_COLS,
drop_empty_appellation,
mean_score,
fill_missing_scores,
encode_appellation,
)
@pytest.fixture
def df_raw() -> DataFrame:
return pd.DataFrame({
"Appellation": ["Pauillac", "Pauillac ", "Margaux", None, "Pomerol", "Pomerol"],
"Robert": ["95", None, "bad", 90, None, None],
"Robinson": [None, "93", 18, None, None, None],
"Suckling": [96, None, None, None, 91, None],
"Prix": ["10.0", "11.0", "20.0", "30.0", "40.0", "50.0"],
})
def test_drop_empty_appellation(df_raw: DataFrame):
out = drop_empty_appellation(df_raw)
assert out["Appellation"].isna().sum() == 0
assert len(out) == 5
def test_mean_score_zero_when_no_scores(df_raw: DataFrame):
out = drop_empty_appellation(df_raw)
m = mean_score(out, "Robert")
assert list(m.columns) == ["Appellation", "mean_Robert"]
# Pomerol n'a aucune note Robert => moyenne doit être 0
pomerol_mean = m.loc[m["Appellation"].str.strip() == "Pomerol", "mean_Robert"].iloc[0]
assert pomerol_mean == 0
def test_fill_missing_scores(df_raw: DataFrame):
out = drop_empty_appellation(df_raw)
filled = fill_missing_scores(out)
# plus de NaN dans les colonnes de scores
for col in SCORE_COLS:
assert filled[col].isna().sum() == 0
assert filled.loc[1, "Robert"] == 95.0
# pas de colonnes temporaires mean_*
for col in SCORE_COLS:
assert f"mean_{col}" not in filled.columns
def test_encode_appellation(df_raw: DataFrame):
out = drop_empty_appellation(df_raw)
filled = fill_missing_scores(out)
encoded = encode_appellation(filled)
# la colonne texte disparaît
assert "Appellation" not in encoded.columns
assert "Pauillac" in encoded.columns
assert encoded.loc[0, "Pauillac"] == 1

View File

67
tests/test_cleaning.py Executable file
View File

@@ -0,0 +1,67 @@
import pytest
from unittest.mock import patch, mock_open
from cleaning import Cleaning
@pytest.fixture
def cleaning_raw() -> Cleaning:
"""
"Appellation": ["Pauillac", "Pauillac ", "Margaux", None , "Pomerol", "Pomerol"],
"Robert": ["95" , None , "bad" , 90 , None , None ],
"Robinson": [None , "93" , 18 , None , None , None ],
"Suckling": [96 , None , None , None , 91 , None ],
"Prix": ["10.0" , "11.0" , "20.0" , "30.0", "40.0" , "50.0" ],
"""
csv_content = """Appellation,Robert,Robinson,Suckling,Prix
Pauillac,95,,96,10.0
Pauillac ,,93,,11.0
Margaux,bad,18,,20.0
,90,,,30.0
Pomerol,,,91,40.0
Pomerol,,,,50.0
"""
m = mock_open(read_data=csv_content)
with patch("builtins.open", m):
return Cleaning("donnee.csv")
def test_drop_empty_appellation(cleaning_raw: Cleaning) -> None:
out = cleaning_raw.drop_empty_appellation().getVins()
assert out["Appellation"].isna().sum() == 0
assert len(out) == 5
def test_mean_score_zero_when_no_scores(cleaning_raw: Cleaning) -> None:
out = cleaning_raw.drop_empty_appellation()
m = out._mean_score("Robert")
assert list(m.columns) == ["Appellation", "mean_Robert"]
pomerol_mean = m.loc[m["Appellation"].str.strip() == "Pomerol", "mean_Robert"].iloc[
0
]
assert pomerol_mean == 0
def test_fill_missing_scores(cleaning_raw: Cleaning):
cleaning_raw._vins["Appellation"] = cleaning_raw._vins["Appellation"].str.strip()
cleaning_raw.drop_empty_appellation()
filled = cleaning_raw.fill_missing_scores().getVins()
for col in cleaning_raw.SCORE_COLS:
assert filled[col].isna().sum() == 0
pauillac_robert = filled[filled["Appellation"] == "Pauillac"]["Robert"]
assert (pauillac_robert == 95.0).all()
def test_encode_appellation(cleaning_raw: Cleaning):
cleaning_raw._vins["Appellation"] = cleaning_raw._vins["Appellation"].str.strip()
out = (
cleaning_raw.drop_empty_appellation()
.fill_missing_scores()
.encode_appellation()
.getVins()
)
assert "App_Appellation" not in out.columns
assert "App_Pauillac" in out.columns
assert int(out.loc[0, "App_Pauillac"]) == 1

10
test_scraper.py → tests/test_scraper.py Normal file → Executable file
View File

@@ -153,7 +153,7 @@ def mock_site():
html_product = f"""
<html>
<body>
<body>
<h1>MILLESIMA</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps(json_data)}
@@ -168,7 +168,7 @@ def mock_site():
html_product = f"""
<html>
<body>
<body>
<h1>MILLESIMA</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps(json_data)}
@@ -179,7 +179,7 @@ def mock_site():
list_pleine = f"""
<html>
<body>
<body>
<h1>LE WINE</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
@@ -207,7 +207,7 @@ def mock_site():
list_vide = f"""
<html>
<body>
<body>
<h1>LE WINE</h1>
<script id="__NEXT_DATA__" type="application/json">
{dumps({
@@ -319,7 +319,7 @@ def test_informations(scraper: Scraper):
def test_search(scraper: Scraper):
m = mock_open()
with patch("builtins.open", m):
scraper.getvins("wine.html", "fake_file.csv")
scraper.getvins("wine.html", "fake_file.csv", True)
assert m().write.called
all_writes = "".join(call.args[0] for call in m().write.call_args_list)