Écriture de sélection câblée (PUT cart) + auth par cookie storage_state

Découvert via attache CDP au vrai Chrome (contourne le blocage automation) :
- set_selection = PUT /gw/v1/carts/{week}, body {meals:[{index,quantity}], extras:[]}
  sélection par index de course, params (customer/subscription/sku/cutoff) dérivés
  dynamiquement de /subscriptions + /deliveries (aucun id en dur)
- Recipe.course_index conservé depuis le menu pour le mapping id->index
- get_editable_weeks via /deliveries (modèle Delivery: cutoff, status, editable)
- Token lu depuis le cookie apiV2Auth (storage_state) -> auth sans navigateur, headless OK
- hf_confirm_selection: garde-fou coco + dry_run; tool attach_capture.py ajouté
- Dry-run validé: requête identique à l'appel réel capturé
This commit is contained in:
2026-06-15 22:57:36 +02:00
parent 30b950ec41
commit 051ecb50d8
9 changed files with 388 additions and 69 deletions

View File

@@ -9,3 +9,7 @@ ANTICOCO_PORT=9200
# headless=1 sur le homelab (pas d'écran), headless=0 en local pour le 1er login.
ANTICOCO_HEADLESS=1
# Navigateur : 'chrome' en local (le Chromium bundlé peut être bloqué/instable),
# laisser VIDE sur le homelab (l'image Docker fournit le chromium Playwright).
ANTICOCO_BROWSER_CHANNEL=

2
.gitignore vendored
View File

@@ -4,6 +4,8 @@
.session/
# config/endpoints.json EST versionné : pas de secret, fruit de la discovery.
# Brouillon de discovery (transitoire) :
config/endpoints_discovered.json
# Python
__pycache__/

View File

@@ -1,5 +1,5 @@
{
"_comment": "Endpoints HelloFresh FR confirmés via discovery + tests (2026-06). L'API interne gw/ peut changer ; rejouer tools/discover_api.py si besoin. set_selection reste à découvrir sur un compte avec abonnement actif.",
"_comment": "Endpoints HelloFresh FR confirmés via discovery + tests (2026-06). API interne gw/ non publique, peut changer. Écriture = PUT du 'cart' hebdo, sélection par index de course.",
"base": "https://www.hellofresh.fr/gw",
"country": "FR",
"locale": "fr-FR",
@@ -7,6 +7,8 @@
"menu": "https://www.hellofresh.fr/gw/menus-service/menus",
"recipe_details": "https://www.hellofresh.fr/gw/recipes/recipes",
"weeks": "https://www.hellofresh.fr/gw/api/customers/me/deliveries",
"set_selection": "",
"set_selection_method": "PUT"
"subscriptions": "https://www.hellofresh.fr/gw/api/customers/me/subscriptions",
"set_selection": "https://www.hellofresh.fr/gw/v1/carts/{week}",
"set_selection_method": "PUT",
"selection_preference": "quick"
}

View File

@@ -20,7 +20,7 @@ from typing import Any
import httpx
from . import auth
from .models import Recipe, Week, _first
from .models import Delivery, Recipe, _first
ENDPOINTS_PATH = auth.ROOT / "config" / "endpoints.json"
BATCH_SIZE = 80 # nb max d'ids par appel détails (le menu en compte ~85)
@@ -112,8 +112,7 @@ class HelloFreshClient:
return resp
# --- API métier ---------------------------------------------------------
def get_editable_weeks(self) -> list[Week]:
"""Semaines de livraison de l'abonnement (vide si pas d'abonnement actif)."""
def _deliveries(self) -> list[Delivery]:
params = {
"country": self._country,
"locale": self._locale,
@@ -122,31 +121,64 @@ class HelloFreshClient:
}
data = self._request("GET", self._ep["weeks"], params=params).json()
items = data.get("items", []) if isinstance(data, dict) else (data or [])
return [Week.from_api(w) for w in items]
return [Delivery.from_api(w) for w in items]
def get_menu(self, week: str | None = None) -> list[Recipe]:
"""Recettes proposées pour une semaine (défaut : semaine courante), complètes.
def get_editable_weeks(self) -> list[Delivery]:
"""Semaines de livraison encore modifiables (vide si pas d'abonnement actif)."""
return [d for d in self._deliveries() if d.editable]
Deux appels : le menu (ids) puis le batch de détails (ingrédients/allergènes).
"""
week = week or current_week()
menu_params = {
def _subscription_info(self) -> dict[str, Any]:
"""Récupère {sub_id, customer_id, sku} depuis l'abonnement (pas de valeurs en dur)."""
ep = self._ep.get("subscriptions")
if not ep:
raise EndpointsNotConfigured("Endpoint 'subscriptions' manquant.")
data = self._request("GET", ep, params={"country": self._country}).json()
if isinstance(data, dict) and "id" in data:
sub = data
else:
items = data.get("items", []) if isinstance(data, dict) else (data or [])
if not items:
raise RuntimeError("Aucun abonnement actif sur ce compte.")
sub = items[0]
cust = sub.get("customer") or {}
prod = sub.get("product") or {}
return {"sub_id": str(sub.get("id", "")),
"customer_id": str(cust.get("id", "")),
"sku": str(prod.get("sku", ""))}
def _menu_courses(self, week: str) -> list[dict]:
"""Courses bruts du menu (chacun : index + recipe). Base de l'écriture."""
params = {
"country": self._country,
"locale": self._locale,
"product": self._product,
"weeks": week,
}
menu = self._request("GET", self._ep["menu"], params=menu_params).json()
menu = self._request("GET", self._ep["menu"], params=params).json()
items = menu.get("items", []) if isinstance(menu, dict) else []
if not items:
return items[0].get("courses", []) if items else []
def get_menu(self, week: str | None = None) -> list[Recipe]:
"""Recettes proposées pour une semaine (défaut : courante), complètes.
Deux appels : menu (ids + index de course) puis batch de détails
(ingrédients/allergènes). L'`index` de course est conservé pour l'écriture.
"""
week = week or current_week()
courses = self._menu_courses(week)
if not courses:
return []
courses = items[0].get("courses", [])
id_index: dict[str, int] = {}
ids: list[str] = []
for c in courses:
rid = (c.get("recipe") or {}).get("id")
if rid and rid not in ids:
if rid and rid not in id_index:
id_index[rid] = c.get("index")
ids.append(rid)
return _dedupe_by_name(self._fetch_details(ids))
recipes = self._fetch_details(ids)
for r in recipes:
r.course_index = id_index.get(r.id)
return _dedupe_by_name(recipes)
def _fetch_details(self, ids: list[str]) -> list[Recipe]:
"""Récupère les recettes complètes par batch d'ids."""
@@ -164,22 +196,59 @@ class HelloFreshClient:
recipes.extend(Recipe.from_api(r) for r in raw)
return recipes
def set_selection(self, week: str, recipe_ids: list[str]) -> dict[str, Any]:
"""ÉCRIT la sélection de recettes pour une semaine (après confirmation).
def selection_indices(self, week: str, recipe_ids: list[str]) -> tuple[list[int], list[str]]:
"""Mappe des ids de recette vers leurs index de course pour la semaine.
Endpoint à découvrir sur un compte avec abonnement actif (cf. set_selection vide).
Renvoie (indices_trouvés, ids_inconnus).
"""
url = self._ep.get("set_selection") or ""
id_index = {(c.get("recipe") or {}).get("id"): c.get("index")
for c in self._menu_courses(week)}
indices, unknown = [], []
for rid in recipe_ids:
idx = id_index.get(rid)
if idx is None:
unknown.append(rid)
else:
indices.append(idx)
return indices, unknown
def set_selection(self, week: str, recipe_ids: list[str], dry_run: bool = False) -> dict[str, Any]:
"""ÉCRIT la sélection de recettes pour une semaine (PUT du cart hebdo).
La sélection se fait par `index` de course (mappé depuis les ids de recette).
`dry_run=True` construit la requête sans l'envoyer (pour vérification).
"""
url = (self._ep.get("set_selection") or "").replace("{week}", str(week))
if not url:
raise EndpointsNotConfigured(
"Endpoint 'set_selection' inconnu : à capturer via discover_api.py sur un "
"compte avec box active (modifier une recette pour observer l'appel PUT/POST)."
)
url = url.replace("{week}", str(week))
raise EndpointsNotConfigured("Endpoint 'set_selection' non configuré.")
indices, unknown = self.selection_indices(week, recipe_ids)
if unknown:
raise ValueError(f"Recette(s) absente(s) du menu {week}: {unknown}")
sub = self._subscription_info()
deliv = next((d for d in self._deliveries() if d.week == week), None)
if deliv and not deliv.editable:
raise ValueError(f"Semaine {week} non modifiable (statut {deliv.status}).")
params = {
"customer": sub["customer_id"],
"subscription": sub["sub_id"],
"product-sku": sub["sku"],
"week": week,
"cutoff_time": deliv.cutoff_date if deliv else "",
"update_quantity": "true",
"ignore_addons": "false",
"preference": self._ep.get("selection_preference", "quick"),
}
body = {"meals": [{"index": i, "quantity": 1} for i in indices], "extras": []}
if dry_run:
return {"dry_run": True, "url": url, "params": params, "body": body}
method = self._ep.get("set_selection_method", "PUT").upper()
payload = {"week": week, "recipes": [{"id": rid, "quantity": 1} for rid in recipe_ids]}
resp = self._request(method, url, json=payload)
resp = self._request(method, url, params=params, json=body)
try:
return resp.json()
return {"ok": True, "status": resp.status_code, "response": resp.json()}
except Exception:
return {"status": resp.status_code, "ok": True}
return {"ok": True, "status": resp.status_code}

View File

@@ -13,9 +13,11 @@ Pattern persistant repris d'`Automood/scraper.py` (`launch_persistent_context`).
from __future__ import annotations
import base64
import json
import os
import time
import urllib.parse
from pathlib import Path
from playwright.sync_api import sync_playwright
@@ -24,6 +26,7 @@ ROOT = Path(__file__).resolve().parent.parent
SESSION_DIR = ROOT / ".session"
PROFILE_DIR = SESSION_DIR / "profile"
TOKEN_CACHE = SESSION_DIR / "token.json"
STATE_PATH = SESSION_DIR / "storage_state.json"
BASE_URL = "https://www.hellofresh.fr"
# Page qui déclenche des appels gateway authentifiés (menu de la semaine).
@@ -38,6 +41,16 @@ def _headless() -> bool:
return os.environ.get("ANTICOCO_HEADLESS", "1") not in ("0", "false", "False", "")
def _channel() -> str | None:
"""Canal navigateur : 'chrome' en local (évite les blocages du Chromium bundlé),
vide/None sur le homelab headless (image Docker = chromium Playwright).
Défini par ANTICOCO_BROWSER_CHANNEL ('chrome', 'msedge', ou '' pour chromium).
"""
ch = os.environ.get("ANTICOCO_BROWSER_CHANNEL", "")
return ch or None
def _is_gateway_request(url: str) -> bool:
return ("/gw/" in url or url.startswith("https://gw.")) and "hellofresh" in url
@@ -75,12 +88,16 @@ def _auto_login(page) -> bool:
def _open_context(pw):
return pw.chromium.launch_persistent_context(
kwargs = dict(
user_data_dir=str(PROFILE_DIR),
headless=_headless(),
locale="fr-FR",
viewport={"width": 1280, "height": 900},
)
channel = _channel()
if channel:
kwargs["channel"] = channel
return pw.chromium.launch_persistent_context(**kwargs)
def ensure_logged_in() -> bool:
@@ -185,16 +202,52 @@ def _read_token_cache() -> dict | None:
return None
def get_token(force: bool = False) -> str:
"""Renvoie un bearer token valide.
def _jwt_exp(token: str) -> float:
"""Renvoie le timestamp d'expiration d'un JWT (0 si indéterminable)."""
try:
payload = token.split(".")[1]
payload += "=" * (-len(payload) % 4) # padding base64url
data = json.loads(base64.urlsafe_b64decode(payload))
return float(data.get("exp", 0))
except Exception:
return 0.0
Si un token en cache est encore frais, on l'utilise sans ouvrir de navigateur.
Sinon on s'assure d'être connecté puis on en capture un neuf.
def token_from_storage_state() -> str | None:
"""Extrait l'access_token du cookie `apiV2Auth` de .session/storage_state.json.
Permet de s'authentifier SANS navigateur (utile en headless sur le homelab),
tant que la session synchronisée n'a pas expiré. Renvoie None si absent/expiré.
"""
if not STATE_PATH.exists():
return None
try:
state = json.loads(STATE_PATH.read_text(encoding="utf-8"))
for c in state.get("cookies", []):
if c.get("name") == "apiV2Auth":
data = json.loads(urllib.parse.unquote(c["value"]))
tok = data.get("access_token")
if tok and _jwt_exp(tok) > time.time() + 60:
return tok
except Exception:
return None
return None
def get_token(force: bool = False) -> str:
"""Renvoie un bearer token valide, par ordre de préférence :
1. token en cache encore frais ;
2. cookie `apiV2Auth` de la session synchronisée (sans navigateur) ;
3. capture via navigateur (login si besoin) — impossible en headless si session morte.
"""
if not force:
cached = _read_token_cache()
if cached and (time.time() - cached.get("captured_at", 0)) < TOKEN_TTL_S:
return cached["token"]
tok = token_from_storage_state()
if tok:
return tok
ensure_logged_in()
return capture_token(force=force)["token"]
@@ -205,6 +258,8 @@ def auth_status() -> dict:
if cached and (time.time() - cached.get("captured_at", 0)) < TOKEN_TTL_S:
age = int(time.time() - cached["captured_at"])
return {"logged_in": True, "source": "cache", "token_age_s": age, "gateways": cached.get("gateways", [])}
if token_from_storage_state():
return {"logged_in": True, "source": "storage_state"}
try:
ok = ensure_logged_in()
return {"logged_in": bool(ok), "source": "browser"}

View File

@@ -30,6 +30,7 @@ class Recipe:
tags: list[str] = field(default_factory=list)
image_url: str = ""
prep_time: str = ""
course_index: int | None = None # index du course dans le menu (sert à l'écriture)
# Champs calculés par le filtre (remplis plus tard)
contains_excluded: bool = False
matched_excludes: list[str] = field(default_factory=list)
@@ -100,6 +101,31 @@ class Recipe:
}
@dataclass
class Delivery:
"""Semaine de livraison de l'abonnement (issue de /deliveries)."""
week: str
delivery_date: str = ""
cutoff_date: str = ""
status: str = ""
editable: bool = False
@classmethod
def from_api(cls, raw: dict[str, Any]) -> "Delivery":
actionable = bool(_first(raw, "actionable", "isEditable", default=False))
status = str(_first(raw, "status", default=""))
return cls(
week=str(_first(raw, "id", "week", "handle", default="")),
delivery_date=str(_first(raw, "deliveryDate", "date", default="")),
cutoff_date=str(_first(raw, "cutoffDate", "cutoff", default="")),
status=status,
editable=actionable and status.upper() not in ("SKIPPED", "DELIVERED", "CUTOFF"),
)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@dataclass
class Week:
id: str # handle de semaine, ex. "2026-W25"

View File

@@ -44,11 +44,11 @@ def hf_login() -> dict:
@mcp.tool()
def hf_list_weeks() -> list[dict]:
"""Liste les semaines de l'abonnement encore modifiables (handle + date livraison)."""
"""Liste les semaines de l'abonnement encore modifiables (handle + dates)."""
with api.HelloFreshClient() as client:
weeks = client.get_editable_weeks()
return [{"week": w.id, "delivery_date": w.delivery_date, "editable": w.editable,
"max_selectable": w.max_selectable} for w in weeks]
return [{"week": d.week, "delivery_date": d.delivery_date, "cutoff_date": d.cutoff_date,
"status": d.status, "editable": d.editable} for d in weeks]
@mcp.tool()
@@ -90,10 +90,11 @@ def hf_propose(week: str = "", count: int = 0) -> dict:
@mcp.tool()
def hf_confirm_selection(week: str, recipe_ids: list[str]) -> dict:
def hf_confirm_selection(week: str, recipe_ids: list[str], dry_run: bool = False) -> dict:
"""ÉCRIT la sélection de recettes dans la box de la semaine (après confirmation).
Garde-fou : refuse une recette contenant un ingrédient exclu.
Garde-fou : refuse toute recette contenant un ingrédient exclu (coco !).
`dry_run=True` : construit et renvoie la requête sans l'envoyer (vérification).
"""
with api.HelloFreshClient() as client:
menu = client.get_menu(week)
@@ -110,8 +111,9 @@ def hf_confirm_selection(week: str, recipe_ids: list[str]) -> dict:
if unknown:
return {"ok": False, "error": "Recette(s) inconnue(s) pour cette semaine.",
"unknown_ids": unknown}
result = client.set_selection(week, recipe_ids)
return {"ok": True, "week": week, "selected": recipe_ids, "api_response": result}
result = client.set_selection(week, recipe_ids, dry_run=dry_run)
return {"ok": True, "week": week, "selected": recipe_ids, "dry_run": dry_run,
"api_response": result}
@mcp.tool()

132
tools/attach_capture.py Normal file
View File

@@ -0,0 +1,132 @@
"""Attache Playwright à TON Chrome (via CDP) au lieu de lancer un navigateur piloté.
Pourquoi : un navigateur lancé par Playwright porte des drapeaux d'automatisation que
HelloFresh peut rejeter au login. Ici, c'est TOI qui lances Chrome (session normale, le
login marche), et on s'y attache en lecture pour :
- capturer le trafic gateway (dont l'appel d'écriture de sélection de recettes) ;
- exporter ta session (cookies) -> .session/storage_state.json, réutilisable en headless.
Prérequis — lance Chrome avec le port de debug et un PROFIL DÉDIÉ (Chrome 149 interdit le
debug sur le profil par défaut). Ta fenêtre Chrome habituelle peut rester ouverte :
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" \\
--remote-debugging-port=9222 --user-data-dir="$HOME/.hf-chrome-debug" \\
https://www.hellofresh.fr/my-account/deliveries/menu
Puis, dans cette fenêtre : connecte-toi (email + mot de passe) et change une recette.
Usage :
python tools/attach_capture.py # attend ~5 min puis sauvegarde
ANTICOCO_DISCOVER_WAIT=600 python tools/attach_capture.py
"""
from __future__ import annotations
import json
import os
import sys
import time
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
from playwright.sync_api import sync_playwright # noqa: E402
from hellofresh import auth # noqa: E402
CDP_URL = os.environ.get("ANTICOCO_CDP_URL", "http://localhost:9222")
LOG_PATH = auth.SESSION_DIR / "discovery_log_cdp.json"
STATE_PATH = auth.SESSION_DIR / "storage_state.json"
DISCOVERED = ROOT / "config" / "endpoints_discovered.json"
def _connect(pw, timeout_s: int = 60):
"""Connexion CDP avec attente que Chrome (port debug) soit disponible."""
deadline = time.time() + timeout_s
last = None
while time.time() < deadline:
try:
return pw.chromium.connect_over_cdp(CDP_URL)
except Exception as e: # Chrome pas encore prêt
last = e
print(f" ...en attente de Chrome sur {CDP_URL}")
time.sleep(3)
raise RuntimeError(f"Impossible de se connecter à {CDP_URL} : {last}")
def main() -> None:
auth.SESSION_DIR.mkdir(parents=True, exist_ok=True)
requests_seen: list[dict] = []
def on_request(req):
if not auth._is_gateway_request(req.url):
return
entry = {"method": req.method, "url": req.url,
"has_auth": bool(req.headers.get("authorization"))}
if req.method in ("POST", "PUT", "PATCH"):
try:
entry["post_data"] = req.post_data
except Exception:
entry["post_data"] = None
requests_seen.append(entry)
print(f" [{req.method}] {req.url}")
with sync_playwright() as pw:
print(f"Connexion à Chrome via {CDP_URL} ...")
browser = _connect(pw)
contexts = browser.contexts
print(f"Connecté. {len(contexts)} contexte(s).")
def wire_page(page):
try:
page.on("request", on_request)
except Exception:
pass
for ctx in contexts:
for p in ctx.pages:
wire_page(p)
ctx.on("page", wire_page)
wait_s = int(os.environ.get("ANTICOCO_DISCOVER_WAIT", "300"))
print(f">>> Connecte-toi et change une recette. Capture {wait_s}s max "
"(Ctrl-C pour finir plus tôt).")
try:
waited = 0
while waited < wait_s and browser.is_connected():
time.sleep(2)
waited += 2
except KeyboardInterrupt:
pass
# Export de la session (cookies) tant que la connexion tient.
try:
if browser.contexts:
browser.contexts[0].storage_state(path=str(STATE_PATH))
print(f"Session exportée -> {STATE_PATH}")
except Exception as e:
print("Export storage_state impossible:", e)
_save_report(requests_seen)
def _save_report(requests_seen: list[dict]) -> None:
LOG_PATH.write_text(json.dumps(requests_seen, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"\n{len(requests_seen)} requêtes gateway -> {LOG_PATH}")
noise = ("otlp/traces", "/login", "auth/email", "translations", "/bot/")
writes = [r for r in requests_seen
if r["method"] in ("POST", "PUT", "PATCH")
and not any(n in r["url"] for n in noise)]
skeleton = {
"_comment": "Brouillon issu de attach_capture (ne remplace pas config/endpoints.json).",
"set_selection_candidates": [f"[{r['method']}] {r['url']}" for r in writes],
}
DISCOVERED.write_text(json.dumps(skeleton, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"Candidats d'écriture ({len(writes)}) -> {DISCOVERED}")
for r in writes:
print(f" [{r['method']}] {r['url']}")
if __name__ == "__main__":
main()

View File

@@ -43,12 +43,17 @@ def main() -> None:
requests_seen: list[dict] = []
with sync_playwright() as pw:
ctx = pw.chromium.launch_persistent_context(
kwargs = dict(
user_data_dir=str(auth.PROFILE_DIR),
headless=False, # toujours visible : c'est une étape interactive
locale="fr-FR",
viewport={"width": 1280, "height": 900},
)
channel = auth._channel()
if channel:
kwargs["channel"] = channel
print(f"Navigateur : {channel}")
ctx = pw.chromium.launch_persistent_context(**kwargs)
page = ctx.pages[0] if ctx.pages else ctx.new_page()
def on_request(req):
@@ -67,49 +72,71 @@ def main() -> None:
requests_seen.append(entry)
print(f" [{req.method}] {req.url}")
closed = {"v": False}
ctx.on("close", lambda *_: closed.update(v=True))
page.on("close", lambda *_: closed.update(v=True))
page.on("request", on_request)
print("Ouvre le menu de la semaine, change une recette si tu veux capturer l'écriture.")
page.goto(auth.BASE_URL + "/my-account", wait_until="domcontentloaded", timeout=30000)
try:
page.goto(auth.BASE_URL + "/my-account", wait_until="domcontentloaded", timeout=30000)
except Exception as e:
print("goto initial échoué (on continue quand même):", e)
# Si on a un vrai terminal : on attend Entrée. Sinon (lancé en arrière-plan,
# sans TTY) : on attend une durée fixe pour laisser le temps de se connecter
# et de naviguer, tout en continuant à logger les requêtes.
if sys.stdin and sys.stdin.isatty():
# On capture jusqu'à : Entrée (si TTY), fermeture de la fenêtre, ou fin du délai.
# Le rapport est TOUJOURS écrit (finally) même si la fenêtre est fermée en cours.
try:
if sys.stdin and sys.stdin.isatty():
try:
input("\n>>> Quand tu as fini, appuie sur Entrée (ou ferme la fenêtre)...\n")
except (EOFError, KeyboardInterrupt):
pass
else:
wait_s = int(os.environ.get("ANTICOCO_DISCOVER_WAIT", "240"))
print(f"\n>>> Capture pendant {wait_s}s max. Connecte-toi (email + mot de passe, "
"PAS Google), ouvre le menu, change une recette. Ferme la fenêtre quand fini.")
waited = 0
while waited < wait_s and not closed["v"]:
try:
page.wait_for_timeout(2000)
except Exception:
break # page/contexte fermé
waited += 2
finally:
_save_report(requests_seen)
try:
input("\n>>> Quand tu as fini de naviguer, appuie sur Entrée pour générer le rapport...\n")
except (EOFError, KeyboardInterrupt):
if not closed["v"]:
ctx.close()
except Exception:
pass
else:
wait_s = int(os.environ.get("ANTICOCO_DISCOVER_WAIT", "240"))
print(f"\n>>> Pas de terminal interactif : capture pendant {wait_s}s. "
"Connecte-toi et navigue dans la fenêtre ouverte...")
page.wait_for_timeout(wait_s * 1000)
ctx.close()
def _save_report(requests_seen: list[dict]) -> None:
"""Écrit le log complet + un squelette d'endpoints découverts (sans écraser le vrai)."""
LOG_PATH.write_text(json.dumps(requests_seen, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"\n{len(requests_seen)} requêtes gateway enregistrées dans {LOG_PATH}")
# Heuristiques pour pré-remplir le squelette d'endpoints.
def find(method: str, *needles: str) -> str:
for r in requests_seen:
if r["method"] != method:
continue
if all(n in r["url"].lower() for n in needles):
if r["method"] == method and all(n in r["url"].lower() for n in needles):
return r["url"]
return ""
noise = ("otlp/traces", "/login", "auth/email", "translations")
writes = [r for r in requests_seen
if r["method"] in ("POST", "PUT", "PATCH")
and not any(n in r["url"] for n in noise)]
discovered = ENDPOINTS_PATH.parent / "endpoints_discovered.json"
skeleton = {
"_comment": "Endpoints HelloFresh confirmés via discovery. Compléter/valider à la main. Utiliser {week} comme placeholder pour le handle de semaine.",
"base": "",
"weeks": find("GET", "subscription") or find("GET", "deliveries") or "",
"menu": find("GET", "menu") or find("GET", "courses") or "",
"set_selection": find("PUT", "menu") or find("POST", "menu") or "",
"_comment": "Brouillon issu de la dernière discovery (ne remplace pas config/endpoints.json).",
"menu": find("GET", "menus-service") or find("GET", "menu"),
"weeks": find("GET", "deliveries") or find("GET", "subscriptions"),
"set_selection_candidates": [f"[{r['method']}] {r['url']}" for r in writes],
}
ENDPOINTS_PATH.parent.mkdir(parents=True, exist_ok=True)
ENDPOINTS_PATH.write_text(json.dumps(skeleton, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"Squelette d'endpoints écrit dans {ENDPOINTS_PATH} — à vérifier avant usage.")
discovered.parent.mkdir(parents=True, exist_ok=True)
discovered.write_text(json.dumps(skeleton, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"Candidats d'écriture ({len(writes)}) -> {discovered}")
if __name__ == "__main__":