AntiCoco: serveur MCP HelloFresh sans noix de coco
- Auth Playwright (login local, session persistee, capture du bearer token) - Client httpx vers l'API interne (endpoints via discover_api.py) - Filtre d'exclusion insensible aux accents (coco & co) - Serveur FastMCP (streamable-http) + outils hf_* - Docker + compose pour deploiement homelab
This commit is contained in:
6
hellofresh/__init__.py
Normal file
6
hellofresh/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Accès personnel au compte HelloFresh (région France).
|
||||
|
||||
L'API interne `gw/` de hellofresh.fr n'est pas une API publique documentée et peut
|
||||
changer ; usage strictement personnel. Les endpoints réels sont découverts via
|
||||
`tools/discover_api.py` (capture du trafic d'une session connectée).
|
||||
"""
|
||||
150
hellofresh/api.py
Normal file
150
hellofresh/api.py
Normal file
@@ -0,0 +1,150 @@
|
||||
"""Client httpx vers l'API interne HelloFresh.
|
||||
|
||||
Les URLs réelles ne sont pas codées en dur : elles viennent de `config/endpoints.json`,
|
||||
généré/validé via `tools/discover_api.py`. Tant que ce fichier n'est pas rempli, les
|
||||
appels lèvent une erreur explicite.
|
||||
|
||||
Le token bearer est fourni par `auth.get_token()`. Les réponses (forme variable) sont
|
||||
mappées vers `models.Recipe` / `models.Week` de façon tolérante.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from . import auth
|
||||
from .models import Recipe, Week, _first
|
||||
|
||||
ENDPOINTS_PATH = auth.ROOT / "config" / "endpoints.json"
|
||||
|
||||
DEFAULT_HEADERS = {
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
"Origin": auth.BASE_URL,
|
||||
"Referer": auth.BASE_URL + "/",
|
||||
"User-Agent": (
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
|
||||
"(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class EndpointsNotConfigured(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def _load_endpoints() -> dict:
|
||||
if not ENDPOINTS_PATH.exists():
|
||||
raise EndpointsNotConfigured(
|
||||
f"{ENDPOINTS_PATH} absent. Lancer `python tools/discover_api.py` pour le générer."
|
||||
)
|
||||
data = json.loads(ENDPOINTS_PATH.read_text(encoding="utf-8"))
|
||||
missing = [k for k in ("weeks", "menu", "set_selection") if not data.get(k)]
|
||||
if missing:
|
||||
raise EndpointsNotConfigured(
|
||||
f"Endpoints manquants dans {ENDPOINTS_PATH}: {missing}. "
|
||||
"Compléter via tools/discover_api.py."
|
||||
)
|
||||
return data
|
||||
|
||||
|
||||
class HelloFreshClient:
|
||||
def __init__(self, token: str | None = None):
|
||||
self._endpoints = _load_endpoints()
|
||||
self._token = token or auth.get_token()
|
||||
self._client = httpx.Client(
|
||||
headers={**DEFAULT_HEADERS, "Authorization": f"Bearer {self._token}"},
|
||||
timeout=30.0,
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
def close(self) -> None:
|
||||
self._client.close()
|
||||
|
||||
def __enter__(self) -> "HelloFreshClient":
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc) -> None:
|
||||
self.close()
|
||||
|
||||
# --- requêtes bas niveau avec re-auth transparent sur 401 ---------------
|
||||
def _request(self, method: str, url: str, **kwargs) -> httpx.Response:
|
||||
resp = self._client.request(method, url, **kwargs)
|
||||
if resp.status_code == 401:
|
||||
# Token expiré → on en capture un neuf et on rejoue une fois.
|
||||
self._token = auth.get_token(force=True)
|
||||
self._client.headers["Authorization"] = f"Bearer {self._token}"
|
||||
resp = self._client.request(method, url, **kwargs)
|
||||
resp.raise_for_status()
|
||||
return resp
|
||||
|
||||
# --- API métier ---------------------------------------------------------
|
||||
def get_editable_weeks(self) -> list[Week]:
|
||||
"""Liste les semaines de l'abonnement encore modifiables."""
|
||||
resp = self._request("GET", self._endpoints["weeks"])
|
||||
data = resp.json()
|
||||
raw_weeks = _extract_list(data, "weeks", "deliveries", "items", "data")
|
||||
weeks = [Week.from_api(w) for w in raw_weeks]
|
||||
editable = [w for w in weeks if w.editable] or weeks
|
||||
return editable
|
||||
|
||||
def get_menu(self, week: str) -> list[Recipe]:
|
||||
"""Recettes proposées pour une semaine donnée."""
|
||||
url = self._endpoints["menu"].replace("{week}", str(week))
|
||||
resp = self._request("GET", url, params=None if "{week}" in self._endpoints["menu"] else {"week": week})
|
||||
data = resp.json()
|
||||
raw_recipes = _extract_recipes(data)
|
||||
return [Recipe.from_api(r) for r in raw_recipes]
|
||||
|
||||
def set_selection(self, week: str, recipe_ids: list[str]) -> dict[str, Any]:
|
||||
"""Enregistre la sélection de recettes pour une semaine (écriture).
|
||||
|
||||
N'est appelé qu'après confirmation côté serveur MCP. La forme du payload
|
||||
dépend de l'endpoint découvert ; on envoie une structure courante, à ajuster
|
||||
selon discover_api.py si nécessaire.
|
||||
"""
|
||||
url = self._endpoints["set_selection"].replace("{week}", str(week))
|
||||
method = self._endpoints.get("set_selection_method", "PUT").upper()
|
||||
payload = {
|
||||
"week": week,
|
||||
"recipes": [{"id": rid, "quantity": 1} for rid in recipe_ids],
|
||||
}
|
||||
resp = self._request(method, url, json=payload)
|
||||
try:
|
||||
return resp.json()
|
||||
except Exception:
|
||||
return {"status": resp.status_code, "ok": True}
|
||||
|
||||
|
||||
def _extract_list(data: Any, *keys: str) -> list[dict]:
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
if isinstance(data, dict):
|
||||
found = _first(data, *keys, default=None)
|
||||
if isinstance(found, list):
|
||||
return found
|
||||
# Parfois imbriqué sous "data"/"items"
|
||||
for v in data.values():
|
||||
if isinstance(v, list):
|
||||
return v
|
||||
return []
|
||||
|
||||
|
||||
def _extract_recipes(data: Any) -> list[dict]:
|
||||
"""Extrait la liste de recettes, qui peut être imbriquée dans des 'courses'."""
|
||||
if isinstance(data, dict):
|
||||
courses = _first(data, "courses", "modules", "items", default=None)
|
||||
if isinstance(courses, list):
|
||||
recipes = []
|
||||
for c in courses:
|
||||
if isinstance(c, dict) and "recipe" in c and isinstance(c["recipe"], dict):
|
||||
recipes.append(c["recipe"])
|
||||
elif isinstance(c, dict):
|
||||
recipes.append(c)
|
||||
if recipes:
|
||||
return recipes
|
||||
return _extract_list(data, "recipes", "items", "data")
|
||||
204
hellofresh/auth.py
Normal file
204
hellofresh/auth.py
Normal file
@@ -0,0 +1,204 @@
|
||||
"""Authentification HelloFresh via Playwright + capture du bearer token.
|
||||
|
||||
Stratégie (cf. plan) :
|
||||
- **Login local d'abord** : sur le Mac, fenêtre visible (`headless=False`) → l'utilisateur
|
||||
se connecte (captcha/2FA gérés à la main). La session est persistée dans `.session/profile`.
|
||||
- **Homelab** : `headless=True`, réutilise la session synchronisée. `HF_EMAIL`/`HF_PASSWORD`
|
||||
servent uniquement de fallback de re-login auto si la session a expiré.
|
||||
- **Token** : on intercepte l'en-tête `Authorization: Bearer …` envoyé aux hôtes gateway et on
|
||||
le met en cache dans `.session/token.json`. `api.py` le réutilise pour les appels httpx.
|
||||
|
||||
Pattern persistant repris d'`Automood/scraper.py` (`launch_persistent_context`).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
SESSION_DIR = ROOT / ".session"
|
||||
PROFILE_DIR = SESSION_DIR / "profile"
|
||||
TOKEN_CACHE = SESSION_DIR / "token.json"
|
||||
|
||||
BASE_URL = "https://www.hellofresh.fr"
|
||||
# Page qui déclenche des appels gateway authentifiés (menu de la semaine).
|
||||
MENU_PAGE = f"{BASE_URL}/my-menu"
|
||||
ACCOUNT_PAGE = f"{BASE_URL}/my-account"
|
||||
|
||||
ATTENTE_LOGIN_S = 180 # temps laissé pour un login manuel (captcha / 2FA)
|
||||
TOKEN_TTL_S = 30 * 60 # on rafraîchit le token au-delà de 30 min par prudence
|
||||
|
||||
|
||||
def _headless() -> bool:
|
||||
return os.environ.get("ANTICOCO_HEADLESS", "1") not in ("0", "false", "False", "")
|
||||
|
||||
|
||||
def _is_gateway_request(url: str) -> bool:
|
||||
return ("/gw/" in url or url.startswith("https://gw.")) and "hellofresh" in url
|
||||
|
||||
|
||||
def _is_logged_in(page) -> bool:
|
||||
"""Pas connecté = un champ mot de passe est visible (page de login).
|
||||
|
||||
Détection volontairement indépendante de la locale (sélecteur CSS, pas de texte).
|
||||
"""
|
||||
try:
|
||||
return page.locator('input[type="password"]').count() == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _auto_login(page) -> bool:
|
||||
"""Tente un login automatique avec HF_EMAIL/HF_PASSWORD. Best-effort.
|
||||
|
||||
Les sélecteurs exacts du formulaire HelloFresh sont à confirmer ; on cible les
|
||||
champs standards. En cas d'échec (captcha, sélecteurs changés), renvoie False et
|
||||
on retombe sur le login manuel.
|
||||
"""
|
||||
email = os.environ.get("HF_EMAIL")
|
||||
password = os.environ.get("HF_PASSWORD")
|
||||
if not email or not password:
|
||||
return False
|
||||
try:
|
||||
page.fill('input[type="email"], input[name="email"], input#email', email, timeout=8000)
|
||||
page.fill('input[type="password"], input[name="password"]', password, timeout=8000)
|
||||
page.click('button[type="submit"], button[data-test-id="login-submit"]', timeout=8000)
|
||||
page.wait_for_timeout(4000)
|
||||
return _is_logged_in(page)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _open_context(pw):
|
||||
return pw.chromium.launch_persistent_context(
|
||||
user_data_dir=str(PROFILE_DIR),
|
||||
headless=_headless(),
|
||||
locale="fr-FR",
|
||||
viewport={"width": 1280, "height": 900},
|
||||
)
|
||||
|
||||
|
||||
def ensure_logged_in() -> bool:
|
||||
"""Garantit une session connectée dans le profil persistant.
|
||||
|
||||
- Si déjà connecté : retourne True immédiatement.
|
||||
- Sinon, tente l'auto-login (env) ; à défaut attend un login manuel (fenêtre visible).
|
||||
Retourne True si la session est établie.
|
||||
"""
|
||||
SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
||||
with sync_playwright() as pw:
|
||||
ctx = _open_context(pw)
|
||||
try:
|
||||
page = ctx.pages[0] if ctx.pages else ctx.new_page()
|
||||
page.goto(ACCOUNT_PAGE, wait_until="domcontentloaded", timeout=30000)
|
||||
page.wait_for_timeout(2000)
|
||||
if _is_logged_in(page):
|
||||
return True
|
||||
|
||||
# Fallback 1 : auto-login si identifiants fournis.
|
||||
if _auto_login(page):
|
||||
page.wait_for_timeout(2000)
|
||||
return True
|
||||
|
||||
# Fallback 2 : login manuel (uniquement utile en fenêtre visible).
|
||||
if _headless():
|
||||
raise RuntimeError(
|
||||
"Session HelloFresh expirée et auto-login impossible en headless. "
|
||||
"Refaire le login en local (ANTICOCO_HEADLESS=0) puis re-sync .session/."
|
||||
)
|
||||
debut = time.time()
|
||||
while time.time() - debut < ATTENTE_LOGIN_S:
|
||||
if _is_logged_in(page):
|
||||
page.wait_for_timeout(2000)
|
||||
return True
|
||||
page.wait_for_timeout(2000)
|
||||
return False
|
||||
finally:
|
||||
ctx.close()
|
||||
|
||||
|
||||
def capture_token(force: bool = False) -> dict:
|
||||
"""Capture (ou relit depuis le cache) le bearer token et les hôtes gateway observés.
|
||||
|
||||
Retourne {"token": str, "gateways": [str], "captured_at": float}.
|
||||
"""
|
||||
cached = _read_token_cache()
|
||||
if cached and not force and (time.time() - cached.get("captured_at", 0)) < TOKEN_TTL_S:
|
||||
return cached
|
||||
|
||||
SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
||||
observed = {"token": None, "gateways": set()}
|
||||
|
||||
with sync_playwright() as pw:
|
||||
ctx = _open_context(pw)
|
||||
try:
|
||||
page = ctx.pages[0] if ctx.pages else ctx.new_page()
|
||||
|
||||
def on_request(req):
|
||||
try:
|
||||
if not _is_gateway_request(req.url):
|
||||
return
|
||||
base = req.url.split("/gw/")[0] + "/gw" if "/gw/" in req.url else req.url
|
||||
observed["gateways"].add(base)
|
||||
auth = req.headers.get("authorization") or req.headers.get("Authorization")
|
||||
if auth and auth.lower().startswith("bearer "):
|
||||
observed["token"] = auth.split(" ", 1)[1].strip()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
page.on("request", on_request)
|
||||
page.goto(MENU_PAGE, wait_until="networkidle", timeout=45000)
|
||||
page.wait_for_timeout(3000)
|
||||
|
||||
if not _is_logged_in(page):
|
||||
raise RuntimeError(
|
||||
"Non connecté lors de la capture du token. Lancer ensure_logged_in() d'abord."
|
||||
)
|
||||
if not observed["token"]:
|
||||
raise RuntimeError(
|
||||
"Aucun bearer token capturé. Vérifier MENU_PAGE / le pattern gateway, "
|
||||
"ou rejouer tools/discover_api.py."
|
||||
)
|
||||
|
||||
result = {
|
||||
"token": observed["token"],
|
||||
"gateways": sorted(observed["gateways"]),
|
||||
"captured_at": time.time(),
|
||||
}
|
||||
TOKEN_CACHE.write_text(json.dumps(result, indent=2), encoding="utf-8")
|
||||
return result
|
||||
finally:
|
||||
ctx.close()
|
||||
|
||||
|
||||
def _read_token_cache() -> dict | None:
|
||||
if TOKEN_CACHE.exists():
|
||||
try:
|
||||
return json.loads(TOKEN_CACHE.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def get_token(force: bool = False) -> str:
|
||||
"""Renvoie un bearer token valide, en s'assurant d'être connecté au préalable."""
|
||||
ensure_logged_in()
|
||||
return capture_token(force=force)["token"]
|
||||
|
||||
|
||||
def auth_status() -> dict:
|
||||
"""État de connexion sans ouvrir de fenêtre si un token en cache est encore frais."""
|
||||
cached = _read_token_cache()
|
||||
if cached and (time.time() - cached.get("captured_at", 0)) < TOKEN_TTL_S:
|
||||
age = int(time.time() - cached["captured_at"])
|
||||
return {"logged_in": True, "source": "cache", "token_age_s": age, "gateways": cached.get("gateways", [])}
|
||||
try:
|
||||
ok = ensure_logged_in()
|
||||
return {"logged_in": bool(ok), "source": "browser"}
|
||||
except Exception as e:
|
||||
return {"logged_in": False, "error": str(e)}
|
||||
111
hellofresh/filter.py
Normal file
111
hellofresh/filter.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""Filtrage des recettes : exclusion d'ingrédients (coco !) + scoring par préférences.
|
||||
|
||||
Matching insensible à la casse ET aux accents : « Noix de Coco », « noix de coco rapée »
|
||||
et « creme de coco » matchent tous l'entrée « coco ». On compare des mots normalisés sur
|
||||
le nom des ingrédients, les allergènes, le nom et le titre de la recette.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import unicodedata
|
||||
from pathlib import Path
|
||||
|
||||
from .auth import ROOT
|
||||
from .models import Recipe
|
||||
|
||||
EXCLUDES_PATH = ROOT / "config" / "excludes.json"
|
||||
PREFS_PATH = ROOT / "config" / "prefs.json"
|
||||
|
||||
|
||||
def normalize(s: str) -> str:
|
||||
"""Minuscule + suppression des accents (NFD → drop des diacritiques)."""
|
||||
s = unicodedata.normalize("NFD", s or "")
|
||||
s = "".join(c for c in s if unicodedata.category(c) != "Mn")
|
||||
return s.lower().strip()
|
||||
|
||||
|
||||
# --- gestion de la liste d'exclusion ---------------------------------------
|
||||
def load_excludes() -> list[str]:
|
||||
if not EXCLUDES_PATH.exists():
|
||||
return []
|
||||
data = json.loads(EXCLUDES_PATH.read_text(encoding="utf-8"))
|
||||
return list(data.get("exclude", []))
|
||||
|
||||
|
||||
def save_excludes(terms: list[str]) -> None:
|
||||
existing = {}
|
||||
if EXCLUDES_PATH.exists():
|
||||
existing = json.loads(EXCLUDES_PATH.read_text(encoding="utf-8"))
|
||||
existing["exclude"] = terms
|
||||
EXCLUDES_PATH.write_text(json.dumps(existing, indent=2, ensure_ascii=False), encoding="utf-8")
|
||||
|
||||
|
||||
def add_exclude(term: str) -> list[str]:
|
||||
terms = load_excludes()
|
||||
if normalize(term) not in {normalize(t) for t in terms}:
|
||||
terms.append(term)
|
||||
save_excludes(terms)
|
||||
return terms
|
||||
|
||||
|
||||
def remove_exclude(term: str) -> list[str]:
|
||||
nt = normalize(term)
|
||||
terms = [t for t in load_excludes() if normalize(t) != nt]
|
||||
save_excludes(terms)
|
||||
return terms
|
||||
|
||||
|
||||
def load_prefs() -> dict:
|
||||
if not PREFS_PATH.exists():
|
||||
return {"liked": [], "disliked": []}
|
||||
data = json.loads(PREFS_PATH.read_text(encoding="utf-8"))
|
||||
return {"liked": data.get("liked", []), "disliked": data.get("disliked", [])}
|
||||
|
||||
|
||||
# --- application aux recettes ----------------------------------------------
|
||||
def _recipe_haystack(recipe: Recipe) -> str:
|
||||
parts = [recipe.name, recipe.headline, *recipe.ingredients, *recipe.allergens, *recipe.tags]
|
||||
return normalize(" | ".join(p for p in parts if p))
|
||||
|
||||
|
||||
def mark_excluded(recipe: Recipe, excludes: list[str] | None = None) -> Recipe:
|
||||
"""Remplit `contains_excluded` et `matched_excludes` sur la recette."""
|
||||
excludes = excludes if excludes is not None else load_excludes()
|
||||
hay = _recipe_haystack(recipe)
|
||||
matched = [term for term in excludes if normalize(term) and normalize(term) in hay]
|
||||
recipe.matched_excludes = matched
|
||||
recipe.contains_excluded = bool(matched)
|
||||
return recipe
|
||||
|
||||
|
||||
def score(recipe: Recipe, prefs: dict | None = None) -> float:
|
||||
prefs = prefs or load_prefs()
|
||||
hay = _recipe_haystack(recipe)
|
||||
s = 0.0
|
||||
for kw in prefs.get("liked", []):
|
||||
if normalize(kw) and normalize(kw) in hay:
|
||||
s += 1.0
|
||||
for kw in prefs.get("disliked", []):
|
||||
if normalize(kw) and normalize(kw) in hay:
|
||||
s -= 1.0
|
||||
recipe.score = s
|
||||
return s
|
||||
|
||||
|
||||
def annotate(recipes: list[Recipe]) -> list[Recipe]:
|
||||
"""Marque exclusions + score sur une liste de recettes (in place)."""
|
||||
excludes = load_excludes()
|
||||
prefs = load_prefs()
|
||||
for r in recipes:
|
||||
mark_excluded(r, excludes)
|
||||
score(r, prefs)
|
||||
return recipes
|
||||
|
||||
|
||||
def propose(recipes: list[Recipe], count: int | None = None) -> list[Recipe]:
|
||||
"""Retire les recettes exclues (coco…) et classe le reste par score décroissant."""
|
||||
annotate(recipes)
|
||||
safe = [r for r in recipes if not r.contains_excluded]
|
||||
safe.sort(key=lambda r: r.score, reverse=True)
|
||||
return safe[:count] if count else safe
|
||||
124
hellofresh/models.py
Normal file
124
hellofresh/models.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""Modèles de données HelloFresh, indépendants de la forme exacte de l'API interne.
|
||||
|
||||
Les dataclasses sont volontairement tolérantes : `Recipe.from_api` mappe au mieux les
|
||||
champs des réponses gateway (qui varient selon les endpoints découverts) vers une forme
|
||||
stable utilisée par le filtre et le serveur MCP.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _first(d: dict, *keys, default=None):
|
||||
"""Renvoie la première clé présente et non vide parmi `keys`."""
|
||||
for k in keys:
|
||||
v = d.get(k)
|
||||
if v not in (None, "", [], {}):
|
||||
return v
|
||||
return default
|
||||
|
||||
|
||||
@dataclass
|
||||
class Recipe:
|
||||
id: str
|
||||
name: str
|
||||
headline: str = ""
|
||||
ingredients: list[str] = field(default_factory=list)
|
||||
allergens: list[str] = field(default_factory=list)
|
||||
tags: list[str] = field(default_factory=list)
|
||||
image_url: str = ""
|
||||
prep_time: str = ""
|
||||
# Champs calculés par le filtre (remplis plus tard)
|
||||
contains_excluded: bool = False
|
||||
matched_excludes: list[str] = field(default_factory=list)
|
||||
score: float = 0.0
|
||||
|
||||
@classmethod
|
||||
def from_api(cls, raw: dict[str, Any]) -> "Recipe":
|
||||
"""Construit une Recipe depuis un objet recette brut de l'API gateway.
|
||||
|
||||
Les noms de champs HelloFresh varient ; on tente plusieurs alias. À ajuster
|
||||
une fois la forme réelle confirmée via discover_api.py.
|
||||
"""
|
||||
ingredients = []
|
||||
for ing in _first(raw, "ingredients", default=[]) or []:
|
||||
if isinstance(ing, dict):
|
||||
name = _first(ing, "name", "label", "title")
|
||||
if name:
|
||||
ingredients.append(str(name))
|
||||
elif isinstance(ing, str):
|
||||
ingredients.append(ing)
|
||||
|
||||
allergens = []
|
||||
for al in _first(raw, "allergens", default=[]) or []:
|
||||
if isinstance(al, dict):
|
||||
name = _first(al, "name", "label", "title")
|
||||
if name:
|
||||
allergens.append(str(name))
|
||||
elif isinstance(al, str):
|
||||
allergens.append(al)
|
||||
|
||||
tags = []
|
||||
for tg in _first(raw, "tags", "labels", default=[]) or []:
|
||||
if isinstance(tg, dict):
|
||||
name = _first(tg, "name", "label", "text")
|
||||
if name:
|
||||
tags.append(str(name))
|
||||
elif isinstance(tg, str):
|
||||
tags.append(tg)
|
||||
|
||||
image = _first(raw, "imageLink", "image", "imageUrl", "cardLink", default="")
|
||||
if isinstance(image, dict):
|
||||
image = _first(image, "url", "link", default="")
|
||||
|
||||
return cls(
|
||||
id=str(_first(raw, "id", "uuid", "recipeId", default="")),
|
||||
name=str(_first(raw, "name", "title", default="(sans nom)")),
|
||||
headline=str(_first(raw, "headline", "subtitle", "description", default="")),
|
||||
ingredients=ingredients,
|
||||
allergens=allergens,
|
||||
tags=tags,
|
||||
image_url=str(image or ""),
|
||||
prep_time=str(_first(raw, "prepTime", "totalTime", "time", default="")),
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
def summary(self) -> dict[str, Any]:
|
||||
"""Version compacte pour les réponses MCP (moins de tokens)."""
|
||||
return {
|
||||
"id": self.id,
|
||||
"name": self.name,
|
||||
"headline": self.headline,
|
||||
"contains_excluded": self.contains_excluded,
|
||||
"matched_excludes": self.matched_excludes,
|
||||
"score": self.score,
|
||||
"tags": self.tags,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class Week:
|
||||
id: str # handle de semaine, ex. "2026-W25"
|
||||
delivery_date: str = ""
|
||||
editable: bool = False
|
||||
max_selectable: int = 0
|
||||
recipes: list[Recipe] = field(default_factory=list)
|
||||
|
||||
@classmethod
|
||||
def from_api(cls, raw: dict[str, Any], recipes: list[Recipe] | None = None) -> "Week":
|
||||
return cls(
|
||||
id=str(_first(raw, "id", "week", "handle", "yearWeek", default="")),
|
||||
delivery_date=str(_first(raw, "deliveryDate", "date", default="")),
|
||||
editable=bool(_first(raw, "editable", "isEditable", "menuEditable", default=False)),
|
||||
max_selectable=int(_first(raw, "maxSelectable", "numberOfSelections", default=0) or 0),
|
||||
recipes=recipes or [],
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
d = asdict(self)
|
||||
d["recipes"] = [r.summary() for r in self.recipes]
|
||||
return d
|
||||
Reference in New Issue
Block a user