27 Commits

Author SHA1 Message Date
888defb6b6 retravail: changement documentation et suppression version anglaise 2026-03-09 19:10:06 +01:00
734e3898e9 ajout: commencement écriture README.md 2026-03-09 14:35:57 +01:00
4bb3112dd0 ajout: creation de fichier out.csv main et ajout README 2026-03-09 14:16:05 +01:00
54e4b7860b ajout: 3e jalon pdf 2026-03-06 22:54:10 +01:00
b865a59aba ajout: cleaning.md et modification scraper.md 2026-03-06 21:56:51 +01:00
Loïc GUEZO
fde1f36148 Enhance workflow with Python setup and docs build
Added Python setup and documentation build steps to workflow.
2026-03-06 21:41:34 +01:00
6fbb36ea37 Merge branch 'jalon2-loic' of https://github.com/guezoloic/millesima_projetS6 2026-03-06 21:36:48 +01:00
4b3c3c26e8 ajout: ajout prefixe get_dummies 2026-03-06 21:34:34 +01:00
Loïc GUEZO
bcacd7a915 Merge pull request #11 from guezoloic/jalon2-loic
Jalon2 loic
2026-03-06 21:09:52 +01:00
de1d325fb7 fix: enlever la generation de page 2026-03-06 21:08:31 +01:00
f4ded6d8b5 ajout: correction d'erreur, changement de main dans cleaning 2026-03-06 21:02:52 +01:00
acf4ddd881 ajout: restructuration de la cleaning 2026-03-06 17:56:07 +01:00
69b8b4ce1f ajout: restructuration du code 2026-03-05 22:06:00 +01:00
8047b06253 fix: sauvegarde toute les 5 pages 2026-03-05 20:00:23 +01:00
5303d36988 Merge branch 'jalon2_Chahrazad' of https://github.com/guezoloic/millesima_projetS6 into jalon2-loic 2026-03-05 18:59:38 +01:00
Loïc GUEZO
d182e08f9b Merge pull request #10 from guezoloic/jalon2-loic
Jalon2 loic
2026-03-04 12:52:47 +01:00
0d96ff7714 ajout: commentaire getvins 2026-03-04 12:51:10 +01:00
ebd9d15f77 fix: enlever custom name save 2026-03-04 12:48:54 +01:00
8a888f583c fix: modification exception 2026-03-04 12:43:57 +01:00
Chahrazad650
cefdb94dd5 ajout : aout des tests test_cleaning.py 2026-03-03 04:18:30 +01:00
Chahrazad650
06097c257e ajout : remplacer appellation par les colonnes indicatrices 2026-03-03 03:26:58 +01:00
Chahrazad650
b0eb5df07e ajout : remplac les notes manquantes par la moyenne de l'appellation 2026-03-03 03:18:35 +01:00
e6c649b433 ajout: ajout factorisation vin et meilleure barre 2026-03-02 21:42:23 +01:00
3619890dc4 ajout: systeme de cache pour eviter recommencer 2026-03-02 18:30:26 +01:00
123c43aa05 ajout: documentation 2026-03-01 22:35:30 +01:00
a163e7687f ajout: modification dependances et ajout documentation 2026-03-01 20:37:16 +01:00
0f6eb856c6 ajout: restructuration des fichiers et modifications scraper 2026-03-01 19:39:57 +01:00
18 changed files with 913 additions and 574 deletions

View File

@@ -10,30 +10,29 @@ on:
branches: ["main"]
permissions:
contents: read
contents: write
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.10
uses: actions/setup-python@v3
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install dependencies
- name: install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
pip install ".[test,doc]"
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pytest
run: pytest

58
.github/workflows/static.yml vendored Normal file
View File

@@ -0,0 +1,58 @@
# Simple workflow for deploying static content to GitHub Pages
name: Deploy static content to Pages
on:
# Runs on pushes targeting the default branch
push:
branches: ["main"]
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
# Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages
permissions:
contents: read
pages: write
id-token: write
# Allow only one concurrent deployment, skipping runs queued between the run in-progress and latest queued.
# However, do NOT cancel in-progress runs as we want to allow these production deployments to complete.
concurrency:
group: "pages"
cancel-in-progress: false
jobs:
# Single deploy job since we're just deploying
deploy:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python 3.10
uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
# Installe le projet en mode éditable avec les extras de doc
pip install -e ".[doc]"
- name: Setup Pages
uses: actions/configure-pages@v5
- name: Build Documentation
run: mkdocs build
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
# Upload entire repository
path: './site'
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View File

@@ -1 +1,37 @@
# millesima_projetS6
# Millesima AI Engine 🍷
> A **University of Paris-Est Créteil (UPEC)** Semester 6 project.
## Documentation
- 🇫🇷 [Version Française](https://guezoloic.github.io/millesima-ai-engine)
> note: only french version enabled for now.
---
## Installation
> Make sure you have **Python 3.10+** installed.
1. **Clone the repository:**
```bash
git clone https://github.com/votre-pseudo/millesima-ai-engine.git
cd millesima-ai-engine
```
2. **Set up a virtual environment:**
```bash
python3 -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
```
3. **Install dependencies:**
```bash
pip install -e .
```
## Usage
### 1. Data Extraction (Scraping)
To fetch the latest wine data from Millesima:
```bash
python3 src/scraper.py
```
> Note: that will take some time to fetch all data depending on the catalog size.

View File

@@ -1,48 +0,0 @@
#!/usr/bin/env python3
from pandas import DataFrame, to_numeric
def display_info(df: DataFrame) -> None:
print(df.all())
print(df.info())
print("\nNombre de valeurs manquantes par colonne :")
print(df.isna().sum())
def drop_empty_appellation(df: DataFrame) -> DataFrame:
return df.dropna(subset=["Appellation"])
def mean_score(df: DataFrame, col: str) -> DataFrame:
"""
Calcule la moyenne d'une colonne de score par appellation.
- Convertit les valeurs en numériques, en remplaçant les non-convertibles par NaN
- Calcule la moyenne par appellation
- Remplace les NaN résultants par 0
"""
tmp = df[["Appellation", col]].copy()
tmp[col] = to_numeric(tmp[col], errors="coerce")
# moyenne par appellation
means = tmp.groupby("Appellation", as_index=False)[col].mean()
means[col] = means[col].fillna(0)
means = means.rename(columns={col: f"mean_{col}"})
return means
def mean_robert(df: DataFrame) -> DataFrame:
return mean_score(df, "Robert")
def mean_robinson(df: DataFrame) -> DataFrame:
return mean_score(df, "Robinson")
def mean_suckling(df: DataFrame) -> DataFrame:
return mean_score(df, "Suckling")

17
docs/cleaning.md Normal file
View File

@@ -0,0 +1,17 @@
# Cleaning
## Sommaire
[TOC]
---
## Classe `Cleaning`
::: src.cleaning.Cleaning
options:
heading_level: 3
members:
- __init__
- getVins
- drop_empty_appellation
- fill_missing_scores
- encode_appellation

16
docs/index.md Normal file
View File

@@ -0,0 +1,16 @@
# Millesima
Lobjectif de ce projet est détudier, en utilisant des méthodes dapprentissage automatique, limpact de différents critères (notes des critiques, appelation) sur le prix dun vin. Pour ce faire, on sappuiera sur le site Millesima (https://www.millesima.fr/), qui a lavantage de ne pas posséder de protection contre les bots. Par respect pour lhébergeur du site, on veillera à limiter au maximum le nombre de requêtes. En particulier, on sassurera davoir un code fonctionnel avant de scraper lintégralité du site, pour éviter les répétitions.
## projet
<div style="text-align: center;">
<object
data="/millesima-ai-engine/projet.pdf"
type="application/pdf"
width="100%"
height="1000px"
>
<p>Votre navigateur ne peut pas afficher ce PDF.
<a href="/millesima-ai-engine/projet.pdf">Cliquez ici pour le télécharger.</a></p>
</object>
</div>

Binary file not shown.

31
docs/scraper.md Normal file
View File

@@ -0,0 +1,31 @@
# Scraper
## Sommaire
[TOC]
---
## Classe `Scraper`
::: scraper.Scraper
options:
members:
- __init__
- getvins
- getjsondata
- getresponse
- getsoup
heading_level: 4
## Classe `_ScraperData`
::: scraper._ScraperData
options:
members:
- __init__
- getdata
- appellation
- parker
- robinson
- suckling
- prix
- informations
heading_level: 4

60
main.py
View File

@@ -1,60 +0,0 @@
#!/usr/bin/env python3
from os import getcwd
from os.path import normpath, join
from sys import argv
from pandas import read_csv, DataFrame
from cleaning import (display_info,
drop_empty_appellation,
mean_robert,
mean_robinson,
mean_suckling)
def load_csv(filename: str) -> DataFrame:
path: str = normpath(join(getcwd(), filename))
return read_csv(path)
def save_csv(df: DataFrame, out_filename: str) -> None:
df.to_csv(out_filename, index=False)
def main() -> None:
if len(argv) != 2:
raise ValueError(f"Usage: {argv[0]} <filename.csv>")
df = load_csv(argv[1])
print("=== Avant nettoyage ===")
display_info(df)
df = drop_empty_appellation(df)
save_csv(df, "donnee_clean.csv")
print("\n=== Après nettoyage d'appellations manquantes ===")
display_info(df)
#la moyenne des notes des vins pour chaque appellation
robert_means = mean_robert(df)
save_csv(robert_means, "mean_robert_by_appellation.csv")
print("\n=== moyenne Robert par appellation ===")
print(robert_means.head(10))
robinson_means = mean_robinson(df)
save_csv(robinson_means, "mean_robinson_by_appellation.csv")
print("\n===: moyennes Robinson par appellation ===")
print(robinson_means.head(10))
suckling_means = mean_suckling(df)
save_csv(suckling_means, "mean_suckling_by_appellation.csv")
print("\n===: moyennes Suckling par appellation ===")
print(suckling_means.head(10))
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"ERREUR: {e}")

20
mkdocs.yml Normal file
View File

@@ -0,0 +1,20 @@
site_name: "Projet Millesima S6"
site_url: "https://github.guezoloic.com/millesima-ai-engine/"
theme:
name: "material"
plugins:
- search
- mkdocstrings
extra:
generator: false
copyright: "Loïc GUEZO & Chahrazad DAHMANI UPEC S6 2026"
markdown_extensions:
- admonition
- pymdownx.details
- pymdownx.superfences
- pymdownx.tabbed

17
pyproject.toml Normal file
View File

@@ -0,0 +1,17 @@
[project]
name = "projet-millesima-s6"
version = "0.1.0"
dependencies = [
"requests==2.32.5",
"beautifulsoup4==4.14.3",
"pandas==2.3.3",
"tqdm==4.67.3",
]
[project.optional-dependencies]
test = ["pytest==8.4.2", "requests-mock==1.12.1", "flake8==7.3.0"]
doc = ["mkdocs<2.0.0", "mkdocs-material==9.6.23", "mkdocstrings[python]"]
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

View File

@@ -1,6 +0,0 @@
requests==2.32.5
requests-mock==1.12.1
beautifulsoup4==4.14.3
pytest==8.4.2
requests-mock==1.12.1
pandas==2.3.3

View File

@@ -1,431 +0,0 @@
#!/usr/bin/env python3
from sys import argv
from typing import cast
from requests import HTTPError, Response, Session
from requests.exceptions import Timeout, ConnectionError
import time
from bs4 import BeautifulSoup, Tag
from collections import OrderedDict
from json import JSONDecodeError, loads
from pathlib import Path
class _ScraperData:
"""_summary_"""
def __init__(self, data: dict[str, object]) -> None:
"""_summary_
Args:
data (dict[str, object]): _description_
"""
self._data: dict[str, object] = data
def _getcontent(self) -> dict[str, object] | None:
"""_summary_
Returns:
dict[str, object]: _description_
"""
current_data: dict[str, object] = self._data
for key in ["initialReduxState", "product", "content"]:
new_data: object | None = current_data.get(key)
if new_data is None:
return None
current_data: dict[str, object] = cast(dict[str, object], new_data)
return current_data
def _getattributes(self) -> dict[str, object] | None:
"""_summary_
Returns:
dict[str, object]: _description_
"""
current_data: object = self._getcontent()
if current_data is None:
return None
return cast(dict[str, object], current_data.get("attributes"))
def prix(self) -> float | None:
"""
Retourne le prix unitaire d'une bouteille (75cl).
Si aucun prix n'est disponible, retourne None.
"""
content = self._getcontent()
if content is None:
return None
items = content.get("items")
# Vérifie que items existe et n'est pas vide
if not isinstance(items, list) or len(items) == 0:
return None
prix_calcule: float | None = None
for item in items:
if not isinstance(item, dict):
continue
p = item.get("offerPrice")
attrs = item.get("attributes", {})
nbunit = attrs.get("nbunit", {}).get("value")
equivbtl = attrs.get("equivbtl", {}).get("value")
if not isinstance(p, (int, float)) or not nbunit or not equivbtl:
continue
nb = float(nbunit)
eq = float(equivbtl)
if nb <= 0 or eq <= 0:
continue
if nb == 1 and eq == 1:
return float(p)
prix_calcule = round(float(p) / (nb * eq), 2)
return prix_calcule
def appellation(self) -> str | None:
"""_summary_
Returns:
str: _description_
"""
attrs: dict[str, object] | None = self._getattributes()
if attrs is not None:
app_dict: object | None = attrs.get("appellation")
if isinstance(app_dict, dict):
return cast(str, app_dict.get("value"))
return None
def _getcritiques(self, name: str) -> str | None:
"""_summary_
Args:
name (str): _description_
Returns:
str | None: _description_
"""
current_value: dict[str, object] | None = self._getattributes()
if current_value is not None:
app_dict: dict[str, object] = cast(
dict[str, object], current_value.get(name)
)
if not app_dict:
return None
val = cast(str, app_dict.get("value")).rstrip("+").split("-")
if len(val) > 1 and val[1] != "":
val[0] = str(round((float(val[0]) + float(val[1])) / 2, 1))
return val[0]
return None
def parker(self) -> str | None:
return self._getcritiques("note_rp")
def robinson(self) -> str | None:
return self._getcritiques("note_jr")
def suckling(self) -> str | None:
return self._getcritiques("note_js")
def getdata(self) -> dict[str, object]:
return self._data
def informations(self) -> str:
"""
Retourne toutes les informations sous la forme :
"Appelation,Parker,J.Robinson,J.Suckling,Prix"
"""
appellation = self.appellation()
parker = self.parker()
robinson = self.robinson()
suckling = self.suckling()
prix = self.prix()
return f"{appellation},{parker},{robinson},{suckling},{prix}"
class Scraper:
"""
Scraper est une classe qui permet de gerer
de façon dynamique des requetes uniquement
sur le serveur https de Millesima
"""
def __init__(self) -> None:
"""
Initialise la session de scraping.
"""
self._url: str = "https://www.millesima.fr/"
# Très utile pour éviter de renvoyer toujours les mêmes handshake
# TCP et d'avoir toujours une connexion constante avec le server
self._session: Session = Session()
self._session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/122.0.0.0 Safari/537.36",
"Accept-Language": "fr-FR,fr;q=0.9,en;q=0.8",
})
# Système de cache pour éviter de solliciter le serveur inutilement
self._latest_request: tuple[(str, Response)] | None = None
self._latest_soups: OrderedDict[str, BeautifulSoup] = OrderedDict[
str, BeautifulSoup
]()
def _request(self, subdir: str) -> Response:
"""
Effectue une requête GET sur le serveur Millesima.
Args:
subdir (str): Le sous-répertoire ou chemin de l'URL (ex: "/vins").
Returns:
Response: L'objet réponse de la requête.
Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
target_url: str = self._url + subdir.lstrip("/")
last_exc: Exception | None = None
for attempt in range(1, 4):
try:
response: Response = self._session.get(url=target_url, timeout=30)
response.raise_for_status()
return response
except (Timeout, ConnectionError) as e:
last_exc = e
print(f"Timeout/ConnectionError ({attempt}/3) sur {target_url}: {e}")
time.sleep(2 * attempt) # 2s, 4s, 6s
# après 3 essais, on abandonne
raise last_exc if last_exc else RuntimeError("Request failed")
def getresponse(self, subdir: str = "", use_cache: bool = True) -> Response:
"""
Récupère la réponse d'une page, en utilisant le cache si possible.
Args:
subdir (str, optional): Le chemin de la page.
use_cache (bool, optional): Utilise la donnée deja sauvegarder ou
écrase la donnée utilisé avec la nouvelle
Returns:
Response: L'objet réponse (cache ou nouvelle requête).
Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
# si dans le cache, latest_request existe
if use_cache and self._latest_request is not None:
rq_subdir, rq_response = self._latest_request
# si c'est la meme requete et que use_cache est true,
# on renvoie celle enregistrer
if subdir == rq_subdir:
return rq_response
request: Response = self._request(subdir)
# on recrée la structure pour le systeme de cache si activer
if use_cache:
self._latest_request = (subdir, request)
return request
def getsoup(self, subdir: str, use_cache: bool = True) -> BeautifulSoup:
"""
Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup.
Args:
subdir (str, optional): Le chemin de la page.
Returns:
BeautifulSoup: L'objet parsé pour extraction de données.
Raise:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
if use_cache and subdir in self._latest_soups:
return self._latest_soups[subdir]
markup: str = self.getresponse(subdir).text
soup: BeautifulSoup = BeautifulSoup(markup, features="html.parser")
if use_cache:
self._latest_soups[subdir] = soup
if len(self._latest_soups) > 10:
_ = self._latest_soups.popitem(last=False)
return soup
def getjsondata(self, subdir: str, id: str = "__NEXT_DATA__") -> _ScraperData:
"""
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site.
Beaucoup de sites modernes (Next.js) stockent leur état initial dans
une balise <script> pour l'hydratation côté client.
Args:
subdir (str): Le chemin de la page.
id (str, optional): L'identifiant de la balise script (par défaut __NEXT_DATA__).
Raises:
HTTPError: Soulevée par `getresponse` si le serveur renvoie un code d'erreur (4xx, 5xx).
JSONDecodeError: Soulevée par `loads` si le contenu de la balise n'est pas un JSON valide.
ValueError: Soulevée manuellement si l'une des clés attendues (props, pageProps, etc.)
est absente de la structure JSON.
Returns:
dict[str, object]: Un dictionnaire contenant les données utiles
ou un dictionnaire vide en cas d'erreur.
"""
soup: BeautifulSoup = self.getsoup(subdir)
script: Tag | None = soup.find("script", id=id)
if script is None or not script.string:
raise ValueError(f"le script id={id} est introuvable")
current_data: object = cast(object, loads(script.string))
for key in ["props", "pageProps"]:
if isinstance(current_data, dict) and key in current_data:
current_data = cast(object, current_data[key])
continue
raise ValueError(f"Clé manquante dans le JSON : {key}")
return _ScraperData(cast(dict[str, object], current_data))
def _geturlproductslist(self, subdir: str):
"""_summary_
Args:
subdir (str): _description_
Returns:
_type_: _description_
"""
try:
data: dict[str, object] = self.getjsondata(subdir).getdata()
for element in ["initialReduxState", "categ", "content"]:
data: dict[str, object] = cast(dict[str, object], data.get(element))
if not isinstance(data, dict):
return None
products: list[str] = cast(list[str], data.get("products"))
if isinstance(products, list):
return products
except (JSONDecodeError, HTTPError):
return None
def _save_progress(self, page: int, i: int, last_link: str) -> None:
Path("progress.txt").write_text(f"{page},{i},{last_link}", encoding="utf-8")
def _load_progress(self) -> tuple[int, int, str | None]:
p = Path("progress.txt")
if not p.exists():
return (1, 0, None)
try:
parts = p.read_text(encoding="utf-8").strip().split(",", 2)
page = int(parts[0])
i = int(parts[1])
last_link = parts[2] if len(parts) == 3 and parts[2] != "" else None
return (page, i, last_link)
except Exception:
return (1, 0, None)
def getvins(self, subdir: str, filename: str):
"""_summary_
Args:
subdir (str): _description_
filename (str): _description_
"""
start_page, start_i, last_link = self._load_progress()
print(f"__INFO__ Reprise à page={start_page}, index={start_i}, last_link={last_link}")
with open(filename, "a", encoding="utf-8") as f:
cache: set[str] = set[str]()
if f.tell() == 0:
_ = f.write("Appellation,Robert,Robinson,Suckling,Prix\n")
page = start_page - 1
while True:
page += 1
products_list = self._geturlproductslist(f"{subdir}?page={page}")
if not products_list:
break
products_list_length = len(products_list)
start_at = start_i if page == start_page else 0
for i in range(start_at, products_list_length):
product = products_list[i]
if not isinstance(product, dict):
continue
link = product.get("seoKeyword")
if not link:
continue
# pour eviter les doublons :
if (page == start_page) and (last_link is not None) and (link == last_link):
self._save_progress(page, + 1, link)
continue
self._save_progress(page, i + 1, link)
if link in cache:
continue
try:
infos = self.getjsondata(link).informations()
_ = f.write(infos + "\n")
print(f"page: {page} | {i + 1}/{products_list_length} {link}")
cache.add(link)
except (JSONDecodeError, HTTPError) as e:
print(f"Erreur sur le produit {link}: {e}")
f.flush()
Path("progress.txt").unlink(missing_ok=True)
def main() -> None:
if len(argv) != 2:
raise ValueError(f"{argv[0]} <sous-url>")
scraper: Scraper = Scraper()
scraper.getvins(argv[1], "donnee.csv")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"ERREUR: {e}")

113
src/cleaning.py Executable file
View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
from os import getcwd
from os.path import normpath, join
from typing import cast
from pandas import DataFrame, read_csv, to_numeric, get_dummies
from sys import argv
def path_filename(filename: str) -> str:
return normpath(join(getcwd(), filename))
class Cleaning:
def __init__(self, filename) -> None:
self._vins: DataFrame = read_csv(filename)
# créer la liste de tout les scores
self.SCORE_COLS: list[str] = [
c for c in self._vins.columns if c not in ["Appellation", "Prix"]
]
# transforme tout les colonnes score en numérique
for col in self.SCORE_COLS:
self._vins[col] = to_numeric(self._vins[col], errors="coerce")
def getVins(self) -> DataFrame:
return self._vins.copy(deep=True)
def __str__(self) -> str:
"""
Affiche un résumé du DataFrame
- la taille
- types des colonnes
- valeurs manquantes
- statistiques numériques
"""
return (
f"Shape : {self._vins.shape[0]} lignes x {self._vins.shape[1]} colonnes\n\n"
f"Types des colonnes :\n{self._vins.dtypes}\n\n"
f"Valeurs manquantes :\n{self._vins.isna().sum()}\n\n"
f"Statistiques numériques :\n{self._vins.describe().round(2)}\n\n"
)
def drop_empty_appellation(self) -> "Cleaning":
self._vins = self._vins.dropna(subset=["Appellation"])
return self
def _mean_score(self, col: str) -> DataFrame:
"""
Calcule la moyenne d'une colonne de score par appellation.
- Convertit les valeurs en numériques, en remplaçant les non-convertibles par NaN
- Calcule la moyenne par appellation
- Remplace les NaN résultants par 0
"""
means = self._vins.groupby("Appellation", as_index=False)[col].mean()
means = means.rename(
columns={col: f"mean_{col}"}
) # pyright: ignore[reportCallIssue]
return cast(DataFrame, means.fillna(0))
def _mean_robert(self) -> DataFrame:
return self._mean_score("Robert")
def _mean_robinson(self) -> DataFrame:
return self._mean_score("Robinson")
def _mean_suckling(self) -> DataFrame:
return self._mean_score("Suckling")
def fill_missing_scores(self) -> "Cleaning":
"""
Remplacer les notes manquantes par la moyenne
des vins de la même appellation.
"""
for element in self.SCORE_COLS:
means = self._mean_score(element)
self._vins = self._vins.merge(means, on="Appellation", how="left")
mean_col = f"mean_{element}"
self._vins[element] = self._vins[element].fillna(self._vins[mean_col])
self._vins = self._vins.drop(columns=["mean_" + element])
return self
def encode_appellation(self, column: str = "Appellation") -> "Cleaning":
"""
Remplace la colonne 'Appellation' par des colonnes indicatrices
"""
appellations = self._vins[column].astype(str).str.strip()
appellation_dummies = get_dummies(appellations, prefix="App")
self._vins = self._vins.drop(columns=[column])
self._vins = self._vins.join(appellation_dummies)
return self
def main() -> None:
if len(argv) != 2:
raise ValueError(f"Usage: {argv[0]} <filename.csv>")
filename = argv[1]
cleaning: Cleaning = Cleaning(filename)
cleaning.drop_empty_appellation() \
.fill_missing_scores() \
.encode_appellation() \
.getVins() \
.to_csv("clean.csv", index=False)
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"ERREUR: {e}")

510
src/scraper.py Executable file
View File

@@ -0,0 +1,510 @@
#!/usr/bin/env python3
from collections import OrderedDict
from io import SEEK_END, SEEK_SET, BufferedWriter, TextIOWrapper
from json import JSONDecodeError, loads
from os import makedirs
from os.path import dirname, exists, join, normpath, realpath
from pickle import UnpicklingError, dump, load
from sys import argv
from tqdm.std import tqdm
from typing import Any, Callable, Literal, TypeVar, cast
from bs4 import BeautifulSoup, Tag
from requests import HTTPError, Response, Session
_dir: str = dirname(realpath(__name__))
T = TypeVar("T")
def _getcache(mode: Literal["rb", "wb"], fn: Callable[[Any], T]) -> T | None:
"""_summary_
Returns:
_type_: _description_
"""
cache_dirname = normpath(join(_dir, ".cache"))
save_path = normpath(join(cache_dirname, "save"))
if not exists(cache_dirname):
makedirs(cache_dirname)
try:
with open(save_path, mode) as f:
return fn(f)
except (FileNotFoundError, EOFError, UnpicklingError):
return None
def savestate(data: tuple[int, set[str]]) -> None:
def save(f: BufferedWriter) -> None:
_ = f.seek(0)
_ = f.truncate()
dump(data, f)
f.flush()
_getcache("wb", save)
def loadstate() -> tuple[int, set[str]] | None:
return _getcache("rb", lambda f: load(f))
class _ScraperData:
"""
Conteneur de données spécialisé pour extraire les informations des dictionnaires JSON.
Cette classe agit comme une interface simplifiée au-dessus du dictionnaire brut
renvoyé par la balise __NEXT_DATA__ du site Millesima.
"""
def __init__(self, data: dict[str, object]) -> None:
"""
Initialise le conteneur avec un dictionnaire de données.
Args:
data (dict[str, object]): Le dictionnaire JSON brut extrait de la page.
"""
self._data: dict[str, object] = data
def _getcontent(self) -> dict[str, object] | None:
"""
Navigue dans l'arborescence Redux pour atteindre le contenu du produit.
Returns:
dict[str, object] | None: Le dictionnaire du produit ou None si la structure diffère.
"""
current_data: dict[str, object] = self._data
for key in ["initialReduxState", "product", "content"]:
new_data: object | None = current_data.get(key)
if new_data is None:
return None
current_data: dict[str, object] = cast(dict[str, object], new_data)
return current_data
def _getattributes(self) -> dict[str, object] | None:
"""
Extrait les attributs techniques (notes, appellations, etc.) du produit.
Returns:
dict[str, object] | None: Les attributs du vin ou None.
"""
current_data: object = self._getcontent()
if current_data is None:
return None
return cast(dict[str, object], current_data.get("attributes"))
def prix(self) -> float | None:
"""
Calcule le prix unitaire d'une bouteille (standardisée à 75cl).
Le site vend souvent par caisses (6, 12 bouteilles) ou formats (Magnum).
Cette méthode normalise le prix pour obtenir celui d'une seule unité.
Returns:
float | None: Le prix calculé arrondi à 2 décimales, ou None.
"""
content = self._getcontent()
if content is None:
return None
items = content.get("items")
# Vérifie que items existe et n'est pas vide
if not isinstance(items, list) or len(items) == 0:
return None
prix_calcule: float | None = None
for item in items:
if not isinstance(item, dict):
continue
p = item.get("offerPrice")
attrs = item.get("attributes", {})
nbunit = attrs.get("nbunit", {}).get("value")
equivbtl = attrs.get("equivbtl", {}).get("value")
if not isinstance(p, (int, float)) or not nbunit or not equivbtl:
continue
nb = float(nbunit)
eq = float(equivbtl)
if nb <= 0 or eq <= 0:
continue
if nb == 1 and eq == 1:
return float(p)
prix_calcule = round(float(p) / (nb * eq), 2)
return prix_calcule
def appellation(self) -> str | None:
"""
Extrait le nom de l'appellation du vin.
Returns:
str | None: Le nom (ex: 'Pauillac') ou None.
"""
attrs: dict[str, object] | None = self._getattributes()
if attrs is not None:
app_dict: object | None = attrs.get("appellation")
if isinstance(app_dict, dict):
return cast(str, app_dict.get("value"))
return None
def _getcritiques(self, name: str) -> str | None:
"""
Méthode générique pour parser les notes des critiques (Parker, Suckling, etc.).
Gère les notes simples ("95") et les plages de notes ("95-97") en faisant la moyenne.
Args:
name (str): La clé de l'attribut dans le JSON (ex: 'note_rp').
Returns:
str | None: La note formatée en chaîne de caractères ou None.
"""
current_value: dict[str, object] | None = self._getattributes()
if current_value is not None:
app_dict: dict[str, object] = cast(
dict[str, object], current_value.get(name)
)
if not app_dict:
return None
val = cast(str, app_dict.get("value")).rstrip("+").split("-")
if len(val) > 1 and val[1] != "":
val[0] = str(round((float(val[0]) + float(val[1])) / 2, 1))
return val[0]
return None
def parker(self) -> str | None:
"""Note Robert Parker."""
return self._getcritiques("note_rp")
def robinson(self) -> str | None:
"""Note Jancis Robinson."""
return self._getcritiques("note_jr")
def suckling(self) -> str | None:
"""Note James Suckling."""
return self._getcritiques("note_js")
def getdata(self) -> dict[str, object]:
"""Retourne le dictionnaire de données complet."""
return self._data
def informations(self) -> str:
"""
Agrège les données clés pour l'export CSV.
Returns:
str: Ligne formatée : "Appellation,Parker,Robinson,Suckling,Prix".
"""
appellation = self.appellation()
parker = self.parker()
robinson = self.robinson()
suckling = self.suckling()
prix = self.prix()
prix = self.prix()
return f"{appellation},{parker},{robinson},{suckling},{prix}"
class Scraper:
"""
Client HTTP optimisé pour le scraping de millesima.fr.
Gère la session persistante, les headers de navigation et un cache double
pour optimiser les performances et la discrétion.
"""
def __init__(self) -> None:
"""
Initialise l'infrastructure de navigation:
- créer une session pour éviter de faire un handshake pour chaque requête
- ajout d'un header pour éviter le blocage de l'accès au site
- ajout d'un système de cache
"""
self._url: str = "https://www.millesima.fr/"
# Très utile pour éviter de renvoyer toujours les mêmes handshake
# TCP et d'avoir toujours une connexion constante avec le server
self._session: Session = Session()
# Crée une "fausse carte d'identité" pour éviter que le site nous
# bloque car on serait des robots
self._session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) \
Chrome/122.0.0.0 Safari/537.36",
"Accept-Language": "fr-FR,fr;q=0.9,en;q=0.8",
}
)
# Système de cache pour éviter de solliciter le serveur inutilement
# utilise pour _request
self._latest_request: tuple[(str, Response)] | None = None
# utilise pour getsoup
self._latest_soups: OrderedDict[str, BeautifulSoup] = OrderedDict[
str, BeautifulSoup
]()
def _request(self, subdir: str) -> Response:
"""
Effectue une requête GET sur le serveur Millesima.
Args:
subdir (str): Le sous-répertoire ou chemin de l'URL (ex: "/vins").
Returns:
Response: L'objet réponse de la requête.
Raises:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
target_url: str = self._url + subdir.lstrip("/")
# envoyer une requête GET sur la page si erreur, renvoie un raise
response: Response = self._session.get(url=target_url, timeout=30)
response.raise_for_status()
return response
def getresponse(self, subdir: str = "", use_cache: bool = True) -> Response:
"""
Récupère la réponse d'une page, en utilisant le cache si possible.
Args:
subdir (str, optional): Le chemin de la page.
use_cache (bool, optional): Utilise la donnée deja sauvegarder ou
écrase la donnée utilisé avec la nouvelle
Returns:
Response: L'objet réponse (cache ou nouvelle requête).
Raises:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
# si dans le cache, latest_request existe
if use_cache and self._latest_request is not None:
rq_subdir, rq_response = self._latest_request
# si c'est la meme requete et que use_cache est true,
# on renvoie celle enregistrer
if subdir == rq_subdir:
return rq_response
request: Response = self._request(subdir)
# on recrée la structure pour le systeme de cache si activer
if use_cache:
self._latest_request = (subdir, request)
return request
def getsoup(self, subdir: str, use_cache: bool = True) -> BeautifulSoup:
"""
Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup.
Args:
subdir (str, optional): Le chemin de la page.
Returns:
BeautifulSoup: L'objet parsé pour extraction de données.
Raises:
HTTPError: Si le serveur renvoie un code d'erreur (4xx, 5xx).
"""
if use_cache and subdir in self._latest_soups:
return self._latest_soups[subdir]
markup: str = self.getresponse(subdir).text
soup: BeautifulSoup = BeautifulSoup(markup, features="html.parser")
if use_cache:
self._latest_soups[subdir] = soup
if len(self._latest_soups) > 10:
_ = self._latest_soups.popitem(last=False)
return soup
def getjsondata(self, subdir: str, id: str = "__NEXT_DATA__") -> _ScraperData:
"""
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site.
Args:
subdir (str): Le chemin de la page.
id (str, optional): L'identifiant de la balise script.
Raises:
HTTPError: Erreur renvoyée par le serveur (4xx, 5xx).
JSONDecodeError: Si le contenu de la balise n'est pas un JSON valide.
ValueError: Si les clés 'props' ou 'pageProps' sont absentes.
Returns:
_ScraperData: Instance contenant les données extraites.
"""
soup: BeautifulSoup = self.getsoup(subdir)
script: Tag | None = soup.find("script", id=id)
if script is None or not script.string:
raise ValueError(f"le script id={id} est introuvable")
current_data: object = cast(object, loads(script.string))
for key in ["props", "pageProps"]:
if isinstance(current_data, dict) and key in current_data:
current_data = cast(object, current_data[key])
continue
raise ValueError(f"Clé manquante dans le JSON : {key}")
return _ScraperData(cast(dict[str, object], current_data))
def _geturlproductslist(self, subdir: str) -> list[dict[str, Any]] | None:
"""
Récupère la liste des produits d'une page de catégorie.
"""
try:
data: dict[str, object] = self.getjsondata(subdir).getdata()
for element in ["initialReduxState", "categ", "content"]:
data = cast(dict[str, object], data.get(element))
products: list[dict[str, Any]] = cast(
list[dict[str, Any]], data.get("products")
)
return products
except (JSONDecodeError, HTTPError):
return None
def _writevins(self, cache: set[str], product: dict[str, Any], f: Any) -> None:
"""_summary_
Args:
cache (set[str]): _description_
product (dict): _description_
f (Any): _description_
"""
if isinstance(product, dict):
link: Any | None = product.get("seoKeyword")
if link and link not in cache:
try:
infos = self.getjsondata(link).informations()
_ = f.write(infos + "\n")
cache.add(link)
except (JSONDecodeError, HTTPError) as e:
print(f"Erreur sur le produit {link}: {e}")
def _initstate(self, reset: bool) -> tuple[int, set[str]]:
"""
appelle la fonction pour load le cache, si il existe
pas, il utilise les variables de base sinon il override
toute les variables pour continuer et pas recommencer le
processus en entier.
Args:
reset (bool): pouvoir le reset ou pas
Returns:
tuple[int, set[str]]: le contenu de la page et du cache
"""
if not reset:
#
serializable: tuple[int, set[str]] | None = loadstate()
if isinstance(serializable, tuple):
return serializable
return 1, set()
def _ensuretitle(self, f: TextIOWrapper, title: str) -> None:
"""
check si le titre est bien présent au début du buffer
sinon il l'ecrit, petit bug potentiel, a+ ecrit tout le
temps a la fin du buffer, si on a ecrit des choses avant
le titre sera apres ces données mais on part du principe
que personne va toucher le fichier.
Args:
f (TextIOWrapper): buffer stream fichier
title (str): titre du csv
"""
_ = f.seek(0, SEEK_SET)
if not (f.read(len(title)) == title):
_ = f.write(title)
else:
_ = f.seek(0, SEEK_END)
def getvins(self, subdir: str, filename: str, reset: bool = False) -> None:
"""
Scrape toutes les pages d'une catégorie et sauvegarde en CSV.
Args:
subdir (str): La catégorie (ex: '/vins-rouges').
filename (str): Nom du fichier de sortie (ex: 'vins.csv').
reset (bool): (Optionnel) pour réinitialiser le processus.
"""
# mode d'écriture fichier
mode: Literal["w", "a+"] = "w" if reset else "a+"
# titre
title: str = "Appellation,Robert,Robinson,Suckling,Prix\n"
# page: page où commence le scraper
# cache: tout les pages déjà parcourir
page, cache = self._initstate(reset)
try:
with open(filename, mode) as f:
self._ensuretitle(f, title)
while True:
products_list: list[dict[str, Any]] | None = (
self._geturlproductslist(f"{subdir}?page={page}")
)
if not products_list:
break
pbar: tqdm[dict[str, Any]] = tqdm(
products_list, bar_format="{l_bar} {bar:20} {r_bar}"
)
for product in pbar:
keyword: str = cast(
str, product.get("seoKeyword", "Inconnu")[:40]
)
pbar.set_description(
f"Page: {page:<3} | Product: {keyword:<40}"
)
self._writevins(cache, product, f)
page += 1
# va créer un fichier au début et l'override
# tout les 5 pages au cas où SIGHUP ou autre
if page % 5 == 0 and not reset:
savestate((page, cache))
except (Exception, HTTPError, KeyboardInterrupt, JSONDecodeError):
if not reset:
savestate((page, cache))
def main() -> None:
if len(argv) != 3:
raise ValueError(f"{argv[0]} <filename> <sous-url>")
filename = argv[1]
suburl = argv[2]
scraper: Scraper = Scraper()
scraper.getvins(suburl, filename)
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"ERREUR: {e}")

View File

67
tests/test_cleaning.py Executable file
View File

@@ -0,0 +1,67 @@
import pytest
from unittest.mock import patch, mock_open
from cleaning import Cleaning
@pytest.fixture
def cleaning_raw() -> Cleaning:
"""
"Appellation": ["Pauillac", "Pauillac ", "Margaux", None , "Pomerol", "Pomerol"],
"Robert": ["95" , None , "bad" , 90 , None , None ],
"Robinson": [None , "93" , 18 , None , None , None ],
"Suckling": [96 , None , None , None , 91 , None ],
"Prix": ["10.0" , "11.0" , "20.0" , "30.0", "40.0" , "50.0" ],
"""
csv_content = """Appellation,Robert,Robinson,Suckling,Prix
Pauillac,95,,96,10.0
Pauillac ,,93,,11.0
Margaux,bad,18,,20.0
,90,,,30.0
Pomerol,,,91,40.0
Pomerol,,,,50.0
"""
m = mock_open(read_data=csv_content)
with patch("builtins.open", m):
return Cleaning("donnee.csv")
def test_drop_empty_appellation(cleaning_raw: Cleaning) -> None:
out = cleaning_raw.drop_empty_appellation().getVins()
assert out["Appellation"].isna().sum() == 0
assert len(out) == 5
def test_mean_score_zero_when_no_scores(cleaning_raw: Cleaning) -> None:
out = cleaning_raw.drop_empty_appellation()
m = out._mean_score("Robert")
assert list(m.columns) == ["Appellation", "mean_Robert"]
pomerol_mean = m.loc[m["Appellation"].str.strip() == "Pomerol", "mean_Robert"].iloc[
0
]
assert pomerol_mean == 0
def test_fill_missing_scores(cleaning_raw: Cleaning):
cleaning_raw._vins["Appellation"] = cleaning_raw._vins["Appellation"].str.strip()
cleaning_raw.drop_empty_appellation()
filled = cleaning_raw.fill_missing_scores().getVins()
for col in cleaning_raw.SCORE_COLS:
assert filled[col].isna().sum() == 0
pauillac_robert = filled[filled["Appellation"] == "Pauillac"]["Robert"]
assert (pauillac_robert == 95.0).all()
def test_encode_appellation(cleaning_raw: Cleaning):
cleaning_raw._vins["Appellation"] = cleaning_raw._vins["Appellation"].str.strip()
out = (
cleaning_raw.drop_empty_appellation()
.fill_missing_scores()
.encode_appellation()
.getVins()
)
assert "App_Appellation" not in out.columns
assert "App_Pauillac" in out.columns
assert int(out.loc[0, "App_Pauillac"]) == 1

2
test_scraper.py → tests/test_scraper.py Normal file → Executable file
View File

@@ -319,7 +319,7 @@ def test_informations(scraper: Scraper):
def test_search(scraper: Scraper):
m = mock_open()
with patch("builtins.open", m):
scraper.getvins("wine.html", "fake_file.csv")
scraper.getvins("wine.html", "fake_file.csv", True)
assert m().write.called
all_writes = "".join(call.args[0] for call in m().write.call_args_list)