Le refresh du token passe désormais par POST /gw/refresh (l'endpoint que la SPA appelle) au lieu d'un navigateur headless : pur httpx, refresh_token rotaté persisté dans token.json, fenêtre 60j remise à zéro à chaque refresh. Lock single-flight pour la rotation. get_token()/auth_status() tentent /gw/refresh avant le filet Playwright. Homelab allumé = authentifié indéfiniment, sans re-sync.
445 lines
17 KiB
Python
445 lines
17 KiB
Python
"""Authentification HelloFresh via Playwright + capture du bearer token.
|
|
|
|
Stratégie (cf. plan) :
|
|
- **Login local d'abord** : sur le Mac, fenêtre visible (`headless=False`) → l'utilisateur
|
|
se connecte (captcha/2FA gérés à la main). La session est persistée dans `.session/profile`.
|
|
- **Homelab** : `headless=True`, réutilise la session synchronisée. `HF_EMAIL`/`HF_PASSWORD`
|
|
servent uniquement de fallback de re-login auto si la session a expiré.
|
|
- **Token** : on intercepte l'en-tête `Authorization: Bearer …` envoyé aux hôtes gateway et on
|
|
le met en cache dans `.session/token.json`. `api.py` le réutilise pour les appels httpx.
|
|
|
|
Pattern persistant repris d'`Automood/scraper.py` (`launch_persistent_context`).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
SESSION_DIR = ROOT / ".session"
|
|
PROFILE_DIR = SESSION_DIR / "profile"
|
|
TOKEN_CACHE = SESSION_DIR / "token.json"
|
|
STATE_PATH = SESSION_DIR / "storage_state.json"
|
|
|
|
BASE_URL = "https://www.hellofresh.fr"
|
|
# Passerelle de renouvellement de HelloFresh : `POST {BASE_URL}/gw/refresh` avec
|
|
# `{"refresh_token": ...}` renvoie un bundle frais (access_token JWT + nouveau
|
|
# refresh_token rotaté + refresh_expires_in ~60 j). C'est ce que la SPA appelle ;
|
|
# côté serveur HelloFresh c'est wrappé sur Auth0 (le client_secret reste chez eux,
|
|
# d'où l'impossibilité d'appeler Auth0 en direct). Pur HTTP, pas de navigateur,
|
|
# hors du parcours web protégé par l'anti-bot. Chaque refresh remet la fenêtre à
|
|
# 60 j : un homelab allumé reste authentifié indéfiniment.
|
|
REFRESH_URL = f"{BASE_URL}/gw/refresh"
|
|
|
|
# Sérialise les renouvellements : les outils MCP tournent dans un worker thread
|
|
# (anyio.to_thread) → appels concurrents possibles. La rotation du refresh_token
|
|
# rendrait fragile un double appel ; le lock sérialise (avec re-test du cache frais
|
|
# après acquisition).
|
|
_refresh_lock = threading.Lock()
|
|
# Page qui déclenche des appels gateway authentifiés (menu de la semaine).
|
|
MENU_PAGE = f"{BASE_URL}/my-account/deliveries/menu"
|
|
ACCOUNT_PAGE = f"{BASE_URL}/my-account"
|
|
# Endpoint gateway authentifié, léger, utilisé pour VÉRIFIER qu'un token est réellement
|
|
# accepté (200) ou non (401). Vérité de terrain, contre les faux positifs de détection UI.
|
|
VALIDATE_URL = f"{BASE_URL}/gw/api/customers/me/subscriptions"
|
|
|
|
ATTENTE_LOGIN_S = 180 # temps laissé pour un login manuel (captcha / 2FA)
|
|
TOKEN_TTL_S = 30 * 60 # on rafraîchit le token au-delà de 30 min par prudence
|
|
|
|
|
|
def _headless() -> bool:
|
|
return os.environ.get("ANTICOCO_HEADLESS", "1") not in ("0", "false", "False", "")
|
|
|
|
|
|
def _channel() -> str | None:
|
|
"""Canal navigateur : 'chrome' en local (évite les blocages du Chromium bundlé),
|
|
vide/None sur le homelab headless (image Docker = chromium Playwright).
|
|
|
|
Défini par ANTICOCO_BROWSER_CHANNEL ('chrome', 'msedge', ou '' pour chromium).
|
|
"""
|
|
ch = os.environ.get("ANTICOCO_BROWSER_CHANNEL", "")
|
|
return ch or None
|
|
|
|
|
|
def _is_gateway_request(url: str) -> bool:
|
|
return ("/gw/" in url or url.startswith("https://gw.")) and "hellofresh" in url
|
|
|
|
|
|
def _is_logged_in(page) -> bool:
|
|
"""Connecté = cookie d'auth `apiV2Auth` présent avec un access_token non expiré.
|
|
|
|
Signal POSITIF (cookie réel) plutôt que l'absence d'un champ mot de passe : cette
|
|
dernière produisait des faux positifs en headless (page de login non rendue/redirigée,
|
|
bannière cookies) → la session paraissait valide alors qu'elle ne l'était pas.
|
|
"""
|
|
try:
|
|
for c in page.context.cookies():
|
|
if c.get("name") == "apiV2Auth":
|
|
data = json.loads(urllib.parse.unquote(c.get("value", "")))
|
|
tok = data.get("access_token")
|
|
if tok and _jwt_exp(tok) > time.time() + 60:
|
|
return True
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _auto_login(page) -> bool:
|
|
"""Tente un login automatique avec HF_EMAIL/HF_PASSWORD. Best-effort.
|
|
|
|
Les sélecteurs exacts du formulaire HelloFresh sont à confirmer ; on cible les
|
|
champs standards. En cas d'échec (captcha, sélecteurs changés), renvoie False et
|
|
on retombe sur le login manuel.
|
|
"""
|
|
email = os.environ.get("HF_EMAIL")
|
|
password = os.environ.get("HF_PASSWORD")
|
|
if not email or not password:
|
|
return False
|
|
try:
|
|
page.fill('input[type="email"], input[name="email"], input#email', email, timeout=8000)
|
|
page.fill('input[type="password"], input[name="password"]', password, timeout=8000)
|
|
page.click('button[type="submit"], button[data-test-id="login-submit"]', timeout=8000)
|
|
page.wait_for_timeout(4000)
|
|
return _is_logged_in(page)
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _open_context(pw):
|
|
kwargs = dict(
|
|
user_data_dir=str(PROFILE_DIR),
|
|
headless=_headless(),
|
|
locale="fr-FR",
|
|
viewport={"width": 1280, "height": 900},
|
|
)
|
|
channel = _channel()
|
|
if channel:
|
|
kwargs["channel"] = channel
|
|
return pw.chromium.launch_persistent_context(**kwargs)
|
|
|
|
|
|
def ensure_logged_in() -> bool:
|
|
"""Garantit une session connectée dans le profil persistant.
|
|
|
|
- Si déjà connecté : retourne True immédiatement.
|
|
- Sinon, tente l'auto-login (env) ; à défaut attend un login manuel (fenêtre visible).
|
|
Retourne True si la session est établie.
|
|
"""
|
|
SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
|
with sync_playwright() as pw:
|
|
ctx = _open_context(pw)
|
|
try:
|
|
page = ctx.pages[0] if ctx.pages else ctx.new_page()
|
|
page.goto(ACCOUNT_PAGE, wait_until="domcontentloaded", timeout=30000)
|
|
page.wait_for_timeout(2000)
|
|
if _is_logged_in(page):
|
|
return True
|
|
|
|
# Fallback 1 : auto-login si identifiants fournis.
|
|
if _auto_login(page):
|
|
page.wait_for_timeout(2000)
|
|
return True
|
|
|
|
# Fallback 2 : login manuel (uniquement utile en fenêtre visible).
|
|
if _headless():
|
|
raise RuntimeError(
|
|
"Session HelloFresh expirée et auto-login impossible en headless. "
|
|
"Refaire le login en local (ANTICOCO_HEADLESS=0) puis re-sync .session/."
|
|
)
|
|
debut = time.time()
|
|
while time.time() - debut < ATTENTE_LOGIN_S:
|
|
if _is_logged_in(page):
|
|
page.wait_for_timeout(2000)
|
|
return True
|
|
page.wait_for_timeout(2000)
|
|
return False
|
|
finally:
|
|
ctx.close()
|
|
|
|
|
|
def _capture_from_context(ctx, roll_state: bool) -> dict:
|
|
"""Charge le menu dans `ctx` et capture le bearer token (refraîchi par la SPA).
|
|
|
|
`roll_state=True` : ré-exporte storage_state.json après coup, pour faire « rouler »
|
|
la session (le refresh_token tourne ~60 j) sans intervention manuelle.
|
|
"""
|
|
observed = {"token": None, "gateways": set()}
|
|
page = ctx.pages[0] if ctx.pages else ctx.new_page()
|
|
|
|
def on_request(req):
|
|
try:
|
|
if not _is_gateway_request(req.url):
|
|
return
|
|
base = req.url.split("/gw/")[0] + "/gw" if "/gw/" in req.url else req.url
|
|
observed["gateways"].add(base)
|
|
a = req.headers.get("authorization") or req.headers.get("Authorization")
|
|
if a and a.lower().startswith("bearer "):
|
|
observed["token"] = a.split(" ", 1)[1].strip()
|
|
except Exception:
|
|
pass
|
|
|
|
page.on("request", on_request)
|
|
page.goto(MENU_PAGE, wait_until="domcontentloaded", timeout=45000)
|
|
# SPA lourde : on attend que les appels gateway (et un éventuel refresh) partent,
|
|
# plutôt que networkidle qui ne se déclenche jamais.
|
|
for _ in range(15):
|
|
page.wait_for_timeout(1000)
|
|
if observed["token"]:
|
|
page.wait_for_timeout(1000)
|
|
break
|
|
|
|
if not observed["token"]:
|
|
raise RuntimeError(
|
|
"Aucun bearer token capturé (session expirée ? refaire le login + re-sync). "
|
|
)
|
|
|
|
if roll_state:
|
|
try:
|
|
ctx.storage_state(path=str(STATE_PATH))
|
|
except Exception:
|
|
pass
|
|
|
|
result = {
|
|
"token": observed["token"],
|
|
"gateways": sorted(observed["gateways"]),
|
|
"captured_at": time.time(),
|
|
}
|
|
TOKEN_CACHE.write_text(json.dumps(result, indent=2), encoding="utf-8")
|
|
return result
|
|
|
|
|
|
def capture_token(force: bool = False) -> dict:
|
|
"""Renvoie un bearer token frais (et les hôtes gateway), via navigateur.
|
|
|
|
Préfère `storage_state.json` (cookies 60 j) → fonctionne en **headless** sur le homelab :
|
|
la SPA rafraîchit elle-même le token (contourne la protection anti-bot des endpoints
|
|
OAuth bruts). À défaut, retombe sur le profil persistant (login interactif).
|
|
"""
|
|
cached = _read_token_cache()
|
|
if cached and not force and (time.time() - cached.get("captured_at", 0)) < TOKEN_TTL_S:
|
|
return cached
|
|
|
|
SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
|
with sync_playwright() as pw:
|
|
if STATE_PATH.exists():
|
|
launch = {"headless": _headless()}
|
|
if _channel():
|
|
launch["channel"] = _channel()
|
|
browser = pw.chromium.launch(**launch)
|
|
ctx = browser.new_context(storage_state=str(STATE_PATH), locale="fr-FR")
|
|
try:
|
|
return _capture_from_context(ctx, roll_state=True)
|
|
finally:
|
|
ctx.close()
|
|
browser.close()
|
|
ctx = _open_context(pw)
|
|
try:
|
|
return _capture_from_context(ctx, roll_state=False)
|
|
finally:
|
|
ctx.close()
|
|
|
|
|
|
def _read_token_cache() -> dict | None:
|
|
if TOKEN_CACHE.exists():
|
|
try:
|
|
return json.loads(TOKEN_CACHE.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _jwt_exp(token: str) -> float:
|
|
"""Renvoie le timestamp d'expiration d'un JWT (0 si indéterminable)."""
|
|
try:
|
|
payload = token.split(".")[1]
|
|
payload += "=" * (-len(payload) % 4) # padding base64url
|
|
data = json.loads(base64.urlsafe_b64decode(payload))
|
|
return float(data.get("exp", 0))
|
|
except Exception:
|
|
return 0.0
|
|
|
|
|
|
def token_from_storage_state() -> str | None:
|
|
"""Extrait l'access_token du cookie `apiV2Auth` de .session/storage_state.json.
|
|
|
|
Permet de s'authentifier SANS navigateur (utile en headless sur le homelab),
|
|
tant que la session synchronisée n'a pas expiré. Renvoie None si absent/expiré.
|
|
"""
|
|
if not STATE_PATH.exists():
|
|
return None
|
|
try:
|
|
state = json.loads(STATE_PATH.read_text(encoding="utf-8"))
|
|
for c in state.get("cookies", []):
|
|
if c.get("name") == "apiV2Auth":
|
|
data = json.loads(urllib.parse.unquote(c["value"]))
|
|
tok = data.get("access_token")
|
|
if tok and _jwt_exp(tok) > time.time() + 60:
|
|
return tok
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _save_token(access_token: str, refresh_token: str | None = None,
|
|
gateways: list[str] | None = None) -> dict:
|
|
"""Met à jour `.session/token.json` (access token + refresh_token rotatif).
|
|
|
|
On préserve les `gateways`/`refresh_token` déjà connus si non fournis, pour ne
|
|
pas perdre le refresh_token courant lors d'un simple rafraîchissement d'access.
|
|
"""
|
|
prev = _read_token_cache() or {}
|
|
result = {
|
|
"token": access_token,
|
|
"gateways": gateways if gateways is not None else prev.get("gateways", [f"{BASE_URL}/gw"]),
|
|
"captured_at": time.time(),
|
|
}
|
|
rt = refresh_token or prev.get("refresh_token")
|
|
if rt:
|
|
result["refresh_token"] = rt
|
|
SESSION_DIR.mkdir(parents=True, exist_ok=True)
|
|
TOKEN_CACHE.write_text(json.dumps(result, indent=2), encoding="utf-8")
|
|
return result
|
|
|
|
|
|
def _refresh_token_from_storage_state() -> str | None:
|
|
"""Extrait le refresh_token du cookie `apiV2Auth` de storage_state.json.
|
|
|
|
Sert de bootstrap : c'est le refresh_token déjà présent dans la session capturée,
|
|
donc aucun nouveau login local n'est requis pour migrer vers le refresh HTTP.
|
|
"""
|
|
if not STATE_PATH.exists():
|
|
return None
|
|
try:
|
|
state = json.loads(STATE_PATH.read_text(encoding="utf-8"))
|
|
for c in state.get("cookies", []):
|
|
if c.get("name") == "apiV2Auth":
|
|
data = json.loads(urllib.parse.unquote(c["value"]))
|
|
return data.get("refresh_token") or None
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _get_refresh_token() -> str | None:
|
|
"""Refresh token courant : cache token.json (rotations) puis bootstrap cookie."""
|
|
cached = _read_token_cache()
|
|
if cached and cached.get("refresh_token"):
|
|
return cached["refresh_token"]
|
|
return _refresh_token_from_storage_state()
|
|
|
|
|
|
def _refresh_session() -> str | None:
|
|
"""Renouvelle la session via `POST /gw/refresh` (pur HTTP, sans navigateur).
|
|
|
|
Persiste le bundle frais : access_token + nouveau refresh_token (rotaté). Renvoie
|
|
l'access token, ou None si pas de refresh_token disponible / échec réseau ou HTTP.
|
|
"""
|
|
rt = _get_refresh_token()
|
|
if not rt:
|
|
return None
|
|
try:
|
|
resp = httpx.post(REFRESH_URL, json={"refresh_token": rt}, timeout=20.0,
|
|
headers={"Accept": "application/json"})
|
|
except Exception as e:
|
|
print(f"[auth] /gw/refresh réseau KO: {e}")
|
|
return None
|
|
if resp.status_code != 200:
|
|
print(f"[auth] /gw/refresh → HTTP {resp.status_code} ({resp.text[:160]})")
|
|
return None
|
|
data = resp.json()
|
|
access = data.get("access_token")
|
|
if not access:
|
|
return None
|
|
_save_token(access, refresh_token=data.get("refresh_token"))
|
|
return access
|
|
|
|
|
|
def get_token(force: bool = False) -> str:
|
|
"""Renvoie un bearer token valide, par ordre de préférence :
|
|
|
|
1. token en cache encore frais ;
|
|
2. cookie `apiV2Auth` de la session synchronisée (sans navigateur) ;
|
|
3. refresh via `/gw/refresh` (pur HTTP, sans navigateur) ;
|
|
4. capture via navigateur (filet de secours : auto-login HF_EMAIL/HF_PASSWORD
|
|
ou login interactif) — impossible en headless si la session est morte.
|
|
"""
|
|
if not force:
|
|
cached = _read_token_cache()
|
|
if cached and (time.time() - cached.get("captured_at", 0)) < TOKEN_TTL_S:
|
|
return cached["token"]
|
|
tok = token_from_storage_state()
|
|
if tok:
|
|
return tok
|
|
# Renouvellement nécessaire. On privilégie le HTTP pur, sérialisé pour ne pas
|
|
# brûler un refresh_token rotatif via deux appels concurrents.
|
|
with _refresh_lock:
|
|
cached = _read_token_cache() # un autre thread a pu rafraîchir entre-temps
|
|
if cached and (time.time() - cached.get("captured_at", 0)) < TOKEN_TTL_S:
|
|
return cached["token"]
|
|
tok = _refresh_session()
|
|
if tok:
|
|
return tok
|
|
# Dernier recours : navigateur. Session synchronisée → refresh headless ;
|
|
# sinon login interactif via le profil persistant (inutile en headless homelab).
|
|
if STATE_PATH.exists():
|
|
return capture_token(force=force)["token"]
|
|
ensure_logged_in()
|
|
return capture_token(force=force)["token"]
|
|
|
|
|
|
def _token_works(token: str) -> bool:
|
|
"""Vrai si le token est réellement accepté par l'API gateway (appel léger, 200 vs 401).
|
|
|
|
Vérité de terrain : c'est ce qui empêche de déclarer « connecté » une session morte.
|
|
En cas d'erreur réseau (résultat indéterminable), renvoie False par prudence.
|
|
"""
|
|
try:
|
|
resp = httpx.get(
|
|
VALIDATE_URL,
|
|
params={"country": "FR"},
|
|
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
|
|
timeout=15.0,
|
|
)
|
|
except Exception:
|
|
return False
|
|
return resp.status_code == 200
|
|
|
|
|
|
def auth_status() -> dict:
|
|
"""État de connexion VÉRIFIÉ par un vrai appel API — pas de faux positif.
|
|
|
|
N'ouvre jamais de navigateur : on prend le meilleur token disponible (cache frais,
|
|
puis cookie storage_state) et on le valide contre l'API. Si aucun token n'est
|
|
exploitable ou s'il est rejeté, renvoie logged_in=False avec une consigne de re-login.
|
|
"""
|
|
cached = _read_token_cache()
|
|
if cached and (time.time() - cached.get("captured_at", 0)) < TOKEN_TTL_S:
|
|
token = cached.get("token")
|
|
if token and _token_works(token):
|
|
age = int(time.time() - cached["captured_at"])
|
|
return {"logged_in": True, "source": "cache", "token_age_s": age,
|
|
"gateways": cached.get("gateways", [])}
|
|
tok = token_from_storage_state()
|
|
if tok and _token_works(tok):
|
|
return {"logged_in": True, "source": "storage_state"}
|
|
# Récupération autonome SANS navigateur : refresh via /gw/refresh.
|
|
with _refresh_lock:
|
|
tok = _refresh_session()
|
|
if tok and _token_works(tok):
|
|
return {"logged_in": True, "source": "gw_refresh"}
|
|
return {
|
|
"logged_in": False,
|
|
"error": "Session HelloFresh absente ou expirée (aucun token valide accepté par l'API). "
|
|
"Refaire le login en local (ANTICOCO_HEADLESS=0, auto-login via HF_EMAIL/"
|
|
"HF_PASSWORD ou manuel) puis re-sync .session/.",
|
|
}
|