Deux outils MCP pour qu'Hermes n'ait plus de scripts à écrire :
- hf_next_delivery() : prochaine box RÉELLEMENT sélectionnée (≈4 recettes,
pas le menu complet) + date/cutoff ; erreur stricte si introuvable
(jamais de repli propose). Saute les semaines PAUSED via next_delivery.
- hf_favorites() : recettes favorites du compte. Champ is_favorite ajouté
partout (hf_get_menu inclus).
Endpoints découverts (probe CDP) :
- sélection : GET /gw/my-deliveries/menu -> meals[].selection.quantity>0
- favoris : GET /gw/cfs/v2/favorites/recipe -> items[].object_id
(GET /gw/v1/carts/{week} renvoie 404 : pas la lecture de sélection.)
Images : URLs recettes CloudFront (502) réécrites vers
img.hellofresh.com/.../hellofresh_s3/... (hellofresh/images.py),
appliqué dans Recipe.summary() -> profite à tous les outils.
README : procédure de ré-auth CDP clarifiée (refresh tokens rotatifs,
backups inutiles, page /login, profil Chrome dédié).
Outils de re-découverte : tools/probe_selection.py, tools/probe_menu_capture.py
156 lines
7.3 KiB
Python
156 lines
7.3 KiB
Python
"""ÉTAPE 1 — Probe de découverte (jetable).

Sonde, sur le compte abonné réel, trois inconnues impossibles à lever en lecture seule :

1. SÉLECTION : quel GET renvoie les recettes RÉELLEMENT dans la box de la prochaine
   livraison (≠ menu complet), et sous quelle forme (ids vs index de course).
2. IMAGE : quelle transformation rend une URL d'image téléchargeable (les URLs brutes
   cloudfront renvoient des 502).
3. FAVORIS : quel endpoint gateway sert la page /recipes/favorites, et sa forme.

Réutilise HelloFreshClient (token + re-auth 401 déjà gérés). Ce sont des GET → lecture seule.

Usage :
    python tools/probe_selection.py
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Two directory levels above this file. The module docstring runs the script as
# `tools/probe_selection.py`, so this is presumably the repository root.
ROOT = Path(__file__).resolve().parent.parent
# Put that directory first on the import path so the `hellofresh` import that
# follows resolves when the script is run directly.
sys.path.insert(0, str(ROOT))
|
|
|
|
from hellofresh import api # noqa: E402
|
|
|
|
|
|
def _short(obj, n: int = 1400) -> str:
    """Serialize ``obj`` as indented JSON, truncated to keep the output readable.

    Args:
        obj: Any value accepted by ``json.dumps``.
        n: Maximum number of characters kept from the serialized text.

    Returns:
        The whole JSON text when it is at most ``n`` characters long; otherwise
        its first ``n`` characters followed by a marker line stating how many
        characters were dropped.

    Raises:
        TypeError: If ``obj`` is not JSON-serializable (raised by ``json.dumps``).
    """
    # ensure_ascii=False keeps accented characters readable in the dump.
    text = json.dumps(obj, ensure_ascii=False, indent=2)
    if len(text) <= n:
        return text
    dropped = len(text) - n
    return text[:n] + f"\n… (+{dropped} chars)"
|
|
|
|
|
|
def _keys(obj) -> str:
    """Describe the top-level shape of a value in one line.

    Args:
        obj: Any value; dicts and lists get a structural description.

    Returns:
        - for a dict: ``"dict keys="`` followed by at most the first 25 keys,
          comma separated;
        - for a list: its length and a description of its first element
          (recursing when that element is a dict or list, otherwise its type
          name; an empty list reports ``NoneType``);
        - for anything else: the type name.
    """
    if isinstance(obj, dict):
        # str() so that non-string keys cannot make join() raise TypeError.
        shown = [str(key) for key in list(obj)[:25]]
        return "dict keys=" + ", ".join(shown)
    if isinstance(obj, list):
        head = obj[0] if obj else None
        if isinstance(head, (dict, list)):
            head_desc = _keys(head)
        else:
            head_desc = type(head).__name__
        return f"list len={len(obj)} head={head_desc}"
    return type(obj).__name__
|
|
|
|
|
|
def _get(client: api.HelloFreshClient, url: str, params: dict) -> None:
    """Issue one GET through the client's HTTP session and print what came back.

    The request goes through ``client._client`` directly. On a 401 response the
    token is refreshed with ``auth.get_token(force=True)``, the Authorization
    header is updated, and the request is retried once.

    Printed output:
        - the URL and parameters;
        - the status code and content type;
        - for a status >= 400, the first 300 characters of the body;
        - for a body that does not decode as JSON, its first 300 characters;
        - otherwise the payload's shape and a truncated JSON dump.

    Args:
        client: Open HelloFresh client; its ``_client`` and ``_token``
            attributes are used and, on re-auth, updated.
        url: Absolute URL to request.
        params: Query parameters passed to the request.

    Returns:
        None. Any exception is caught and printed so that a failing candidate
        endpoint does not abort the rest of the probe.
    """
    print(f"\n>>> GET {url}\n params={params}")
    try:
        resp = client._client.request("GET", url, params=params)
        if resp.status_code == 401:
            # Imported lazily: only needed on the re-auth path.
            from hellofresh import auth

            client._token = auth.get_token(force=True)
            client._client.headers["Authorization"] = f"Bearer {client._token}"
            resp = client._client.request("GET", url, params=params)
        print(f" -> {resp.status_code} {resp.headers.get('content-type', '')}")
        if resp.status_code >= 400:
            print(f" body[:300]={resp.text[:300]!r}")
            return
        try:
            data = resp.json()
        except ValueError:
            # JSON decode errors are ValueError subclasses in the stdlib; it is
            # assumed the HTTP library's .json() follows suit. Anything else
            # still reaches the outer handler below.
            print(f" (non-JSON) body[:300]={resp.text[:300]!r}")
            return
        print(f" shape: {_keys(data)}")
        print(" " + _short(data).replace("\n", "\n "))
    except Exception as e:  # noqa: BLE001 - deliberate best-effort probe
        print(f" !! exception: {type(e).__name__}: {e}")
|
|
|
|
|
|
def main() -> None:
    """Run the discovery probe against the account and print every finding.

    Sections, in order:
        1. SELECTION - GET candidates for the cart of the next delivery week,
           then raw dumps of the deliveries and subscriptions payloads.
        2. IMAGE - requests the first menu image through alternative hosts,
           then through its unmodified URL, asking for a single byte each time.
        3. FAVORITES - GET on the favorites endpoint.
        4. PLAN - looks for a plan id in the subscriptions payload and GETs the
           plan when one is found.

    Every request issued directly in this function is a GET. Results are only
    printed; nothing is returned.
    """
    base = "https://www.hellofresh.fr/gw"
    with api.HelloFreshClient() as client:
        country, locale = client._country, client._locale
        sub = client._subscription_info()
        acct = client.account_info()
        nd = acct.get("next_delivery") or {}
        # Fall back to the current week when the account exposes no next delivery.
        week = nd.get("week") or api.current_week()
        deliv = next((d for d in client._deliveries() if d.week == week), None)
        cutoff = deliv.cutoff_date if deliv else (nd.get("cutoff") or "")

        print("=" * 70)
        print(f"sub_id={sub['sub_id']} customer_id={sub['customer_id']} sku={sub['sku']}")
        print(f"next_delivery week={week} date={nd.get('date')} cutoff={cutoff}")
        print(f"meals attendus = {acct.get('subscription', {}).get('meals')}")
        print("=" * 70)

        # --- 1. SELECTION ---------------------------------------------------
        full_params = {
            "customer": sub["customer_id"],
            "subscription": sub["sub_id"],
            "product-sku": sub["sku"],
            "week": week,
            "cutoff_time": cutoff,
            "country": country,
            "locale": locale,
        }
        print("\n########## 1. SÉLECTION ##########")
        # Candidate A - mirror of the PUT /gw/v1/carts/{week}
        _get(client, f"{base}/v1/carts/{week}", full_params)
        _get(client, f"{base}/v1/carts/{week}", {"country": country, "locale": locale})
        # Candidate C - /deliveries may carry the selection: raw dump of the
        # entry matching the target week.
        print("\n>>> RAW /deliveries (cherche meals/courses/selectedRecipes/cart par semaine)")
        draw = client._request("GET", client._ep["weeks"], params={
            "country": country,
            "locale": locale,
            "rangeStart": api.current_week(0),
            "rangeEnd": api.current_week(10),
        }).json()
        # The payload is either a dict wrapping "items" or a bare list.
        items = draw.get("items", []) if isinstance(draw, dict) else (draw or [])
        match = next((w for w in items if str(w.get("id") or w.get("week")) == week), None)
        print(" " + _short(match or {"_no_match_for_week": week}).replace("\n", "\n "))
        # Candidate B - the subscriber's real cart id, looked for in the raw
        # /subscriptions payload.
        print("\n>>> RAW /subscriptions (cherche un id de cart hebdo)")
        sraw = client._request("GET", client._ep["subscriptions"],
                               params={"country": country}).json()
        sitems = sraw.get("items", []) if isinstance(sraw, dict) else (sraw or [])
        print(" " + _short(sitems[0] if sitems else sraw).replace("\n", "\n "))

        # --- 2. IMAGE -------------------------------------------------------
        print("\n########## 2. IMAGE ##########")
        recipes = client.get_menu(week)
        raw_url = next((recipe.image_url for recipe in recipes if recipe.image_url), "")
        print(f"image brute = {raw_url}")
        if raw_url:
            from urllib.parse import urlsplit, urlunsplit

            parts = urlsplit(raw_url)
            # Same path and query on each alternative host first, the untouched
            # URL last, so the output order stays hosts-then-raw.
            probes = [
                (host, urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment)))
                for host in ("media.hellofresh.com", "img.hellofresh.com")
            ]
            probes.append(("(brute)", raw_url))
            for label, image_url in probes:
                try:
                    # A one-byte Range request is enough to read the status
                    # code and content type.
                    img = client._client.request("GET", image_url, headers={"Range": "bytes=0-0"})
                    print(f" {label:24} -> {img.status_code} {img.headers.get('content-type', '')}")
                except Exception as e:  # noqa: BLE001 - keep probing the other hosts
                    print(f" {label:24} -> !! {type(e).__name__}: {e}")

        # --- 3. FAVORITES (real endpoint captured via CDP) ------------------
        print("\n########## 3. FAVORIS ##########")
        _get(client, f"{base}/cfs/v2/favorites/recipe",
             {"country": country, "locale": locale, "ids": ""})

        # --- 4. PLAN (selection candidate) ----------------------------------
        print("\n########## 4. PLAN (candidat sélection) ##########")
        # planId was seen in /gw/api/plans/{id}; look for it in the raw
        # subscriptions payload (first item, then the payload itself).
        plan_id = None
        for candidate in (sitems[0] if sitems else {}, sraw if isinstance(sraw, dict) else {}):
            if isinstance(candidate, dict):
                for key in ("planId", "plan_id"):
                    # NOTE(review): no early exit, so a later match overrides an
                    # earlier one - confirm this is the intended precedence.
                    if candidate.get(key):
                        plan_id = candidate[key]
        meraw = client._request("GET", f"{base}/api/customers/me",
                                params={"country": country, "locale": locale}).json()
        print(" me keys:", _keys(meraw))
        if plan_id:
            _get(client, f"{base}/api/plans/{plan_id}", {})
        else:
            print(" (planId introuvable dans subscriptions/me — voir dump deliveries ci-dessus)")
|
|
|
|
|
|
# Run the probe only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|