mirror of
https://github.com/guezoloic/millesima-ai-engine.git
synced 2026-03-31 03:01:36 +00:00
fix(main.py): changement donnée data
This commit is contained in:
112
main.py
112
main.py
@@ -1,47 +1,68 @@
|
|||||||
from sys import stderr
|
|
||||||
from typing import cast
|
from typing import cast
|
||||||
from requests import Response, Session
|
from requests import Response, Session
|
||||||
from bs4 import BeautifulSoup, Tag
|
from bs4 import BeautifulSoup, Tag
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from json import JSONDecodeError, loads
|
from json import loads
|
||||||
|
|
||||||
|
|
||||||
class ScraperData:
|
class _ScraperData:
|
||||||
def __init__(self, data: dict[str, object]) -> None:
|
def __init__(self, data: dict[str, object]) -> None:
|
||||||
if not data:
|
|
||||||
raise ValueError("Données insuffisantes pour créer un ScraperData.")
|
|
||||||
self._data: dict[str, object] = data
|
self._data: dict[str, object] = data
|
||||||
|
|
||||||
def _getattributes(self) -> dict[str, object] | None:
|
def _getcontent(self) -> dict[str, object]:
|
||||||
current_data: object = self._data.get("attributes")
|
"""_summary_
|
||||||
if isinstance(current_data, dict):
|
|
||||||
return cast(dict[str, object], current_data)
|
|
||||||
return None
|
|
||||||
|
|
||||||
def appellation(self) -> str | None:
|
Returns:
|
||||||
current_value: dict[str, object] | None = self._getattributes()
|
dict[str, object]: _description_
|
||||||
if current_value is not None:
|
"""
|
||||||
|
current_data: dict[str, object] = self._data
|
||||||
|
for key in ["initialReduxState", "product", "content"]:
|
||||||
|
current_data = cast(dict[str, object], current_data[key])
|
||||||
|
return current_data
|
||||||
|
|
||||||
|
def _getattributes(self) -> dict[str, object]:
|
||||||
|
"""_summary_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict[str, object]: _description_
|
||||||
|
"""
|
||||||
|
current_data: object = self._getcontent()["attributes"]
|
||||||
|
return cast(dict[str, object], current_data)
|
||||||
|
|
||||||
|
def appellation(self) -> str:
|
||||||
|
"""_summary_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: _description_
|
||||||
|
"""
|
||||||
|
current_value: dict[str, object] = self._getattributes()
|
||||||
app_dict: dict[str, object] = cast(
|
app_dict: dict[str, object] = cast(
|
||||||
dict[str, object], current_value.get("appellation")
|
dict[str, object], current_value["appellation"]
|
||||||
)
|
)
|
||||||
if app_dict:
|
return cast(str, app_dict["value"])
|
||||||
return cast(str, app_dict.get("value"))
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _getvin(self, name: str) -> str | None:
|
def _getvin(self, name: str) -> str | None:
|
||||||
current_value: dict[str, object] | None = self._getattributes()
|
"""_summary_
|
||||||
if current_value is not None:
|
|
||||||
app_dict: dict[str, object] = cast(
|
Args:
|
||||||
dict[str, object], current_value.get(name)
|
name (str): _description_
|
||||||
)
|
|
||||||
if app_dict:
|
Returns:
|
||||||
val: list[str] = (
|
str | None: _description_
|
||||||
cast(str, app_dict.get("valueId")).rstrip("+").split("-")
|
"""
|
||||||
|
current_value: dict[str, object] = self._getattributes()
|
||||||
|
app_dict: dict[str, object] | None = cast(
|
||||||
|
dict[str, object] | None, current_value.get(name)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if app_dict is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
val: list[str] = cast(str, app_dict.get("attributes")).rstrip("+").split("-")
|
||||||
|
# dans le cas où 93-94 -> [93, 94] -> 93.5
|
||||||
if len(val) > 1:
|
if len(val) > 1:
|
||||||
val[0] = str((int(val[0]) + int(val[1])) / 2)
|
val[0] = str((int(val[0]) + int(val[1])) / 2)
|
||||||
return val[0]
|
return val[0]
|
||||||
return None
|
|
||||||
|
|
||||||
def parker(self) -> str | None:
|
def parker(self) -> str | None:
|
||||||
return self._getvin("note_rp")
|
return self._getvin("note_rp")
|
||||||
@@ -127,7 +148,7 @@ class Scraper:
|
|||||||
|
|
||||||
return request
|
return request
|
||||||
|
|
||||||
def getsoup(self, subdir: str = "", use_cache: bool = True) -> BeautifulSoup:
|
def getsoup(self, subdir: str, use_cache: bool = True) -> BeautifulSoup:
|
||||||
"""
|
"""
|
||||||
Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup.
|
Récupère le contenu HTML d'une page et le transforme en objet BeautifulSoup.
|
||||||
|
|
||||||
@@ -155,14 +176,14 @@ class Scraper:
|
|||||||
|
|
||||||
return soup
|
return soup
|
||||||
|
|
||||||
def getjsondata(self, subdir: str = "", id: str = "__NEXT_DATA__") -> ScraperData:
|
def getjsondata(self, subdir: str, id: str = "__NEXT_DATA__") -> _ScraperData:
|
||||||
"""
|
"""
|
||||||
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site.
|
Extrait les données JSON contenues dans la balise __NEXT_DATA__ du site.
|
||||||
Beaucoup de sites modernes (Next.js) stockent leur état initial dans
|
Beaucoup de sites modernes (Next.js) stockent leur état initial dans
|
||||||
une balise <script> pour l'hydratation côté client.
|
une balise <script> pour l'hydratation côté client.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
subdir (str, optional): Le chemin de la page.
|
subdir (str): Le chemin de la page.
|
||||||
id (str, optional): L'identifiant de la balise script (par défaut __NEXT_DATA__).
|
id (str, optional): L'identifiant de la balise script (par défaut __NEXT_DATA__).
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
@@ -178,32 +199,17 @@ class Scraper:
|
|||||||
soup: BeautifulSoup = self.getsoup(subdir)
|
soup: BeautifulSoup = self.getsoup(subdir)
|
||||||
script: Tag | None = soup.find("script", id=id)
|
script: Tag | None = soup.find("script", id=id)
|
||||||
|
|
||||||
if isinstance(script, Tag) and script.string:
|
if script is None or not script.string:
|
||||||
try:
|
raise ValueError(f"le script id={id} est introuvable")
|
||||||
current_data: object = loads(script.string)
|
|
||||||
# tout le chemin à parcourir pour arriver au données
|
current_data: object = cast(object, loads(script.string))
|
||||||
# (plein d'information inutile)
|
|
||||||
keys: list[str] = [
|
for key in ["props", "pageProps"]:
|
||||||
"props",
|
|
||||||
"pageProps",
|
|
||||||
"initialReduxState",
|
|
||||||
"product",
|
|
||||||
"content",
|
|
||||||
]
|
|
||||||
for key in keys:
|
|
||||||
# si current_data est bien un dictionnaire et que la clé
|
|
||||||
# est bien dedans
|
|
||||||
if isinstance(current_data, dict) and key in current_data:
|
if isinstance(current_data, dict) and key in current_data:
|
||||||
current_data: object = current_data[key]
|
current_data = cast(object, current_data[key])
|
||||||
else:
|
continue
|
||||||
raise ValueError(f"Clé manquante dans le JSON : {key}")
|
raise ValueError(f"Clé manquante dans le JSON : {key}")
|
||||||
|
|
||||||
if isinstance(current_data, dict):
|
return _ScraperData(cast(dict[str, object], current_data))
|
||||||
return ScraperData(data=cast(dict[str, object], current_data))
|
|
||||||
|
|
||||||
except (JSONDecodeError, ValueError) as e:
|
|
||||||
print(f"Erreur lors de l'extraction JSON : {e}", file=stderr)
|
|
||||||
return ScraperData({})
|
|
||||||
|
|
||||||
|
|
||||||
# print(Scraper().getjsondata("bordeaux.html?page=1").getdata())
|
# print(Scraper().getjsondata("bordeaux.html?page=1").getdata())
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ def scraper() -> Scraper:
|
|||||||
|
|
||||||
|
|
||||||
def test_soup(scraper: Scraper):
|
def test_soup(scraper: Scraper):
|
||||||
h1: Tag | None = scraper.getsoup().find("h1")
|
h1: Tag | None = scraper.getsoup("").find("h1")
|
||||||
|
|
||||||
assert isinstance(h1, Tag)
|
assert isinstance(h1, Tag)
|
||||||
assert h1.text == "MILLESIMA"
|
assert h1.text == "MILLESIMA"
|
||||||
|
|||||||
Reference in New Issue
Block a user