Files
Monitorink/backend/integrations/claude_usage.py
Jerem 53fefd4654 Claude: refresh OAuth proactif + erreurs fatales distinctes (fini les reconnexions manuelles)
Le refresh token est rotatif : la chaîne se régénère seule indéfiniment tant que
le nouveau token est persisté. La reconnexion manuelle n'était requise que lorsque
cet invariant cassait. Trois correctifs :

- Refresh PROACTIF : on rafraîchit dès qu'il reste < 2h sur le token (~8h de vie)
  au lieu des 2 dernières minutes. Un échec transitoire a des heures de marge avant
  que l'access token meure ; la fenêtre où un kill/timeout perd le token rotatif
  fraîchement rotaté passe de ~8h à quelques ms. Réglable via
  MONITORINK_CLAUDE_REFRESH_LEAD_MIN (défaut 120).
- Distinction FATAL vs TRANSITOIRE : 400 invalid_grant / 401 sur l'endpoint token
  -> _RefreshFatal, sans backoff ni re-soumission en boucle (évite la reuse-detection
  qui révoque toute la famille). 429/5xx/réseau gardent le backoff exponentiel.
- Visibilité + auto-réparation : le cas fatal affiche "Reconnexion Claude requise"
  (pas de repli cache silencieux) et l'alerte se referme seule dès qu'un token frais
  réapparaît sur disque (re-login isolé), sans redémarrer le conteneur.

Timeout du POST de refresh porté à 45s (réglable, MONITORINK_CLAUDE_REFRESH_TIMEOUT)
pour réduire la fenêtre de perte du token après rotation serveur.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 10:46:21 +02:00

427 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Récupération de l'usage de l'abonnement Claude via l'endpoint OAuth `/usage`.
Validé empiriquement (compte Max 5x) :
GET https://api.anthropic.com/api/oauth/usage
headers: Authorization: Bearer <token>, anthropic-beta: oauth-2025-04-20,
User-Agent: claude-code/<version>, Content-Type: application/json
réponse: { five_hour:{utilization,resets_at}, seven_day:{...},
seven_day_opus, seven_day_sonnet, extra_usage }
⚠️ L'endpoint exige le scope OAuth `user:profile`. Le token `claude setup-token` ne l'a PAS
(403). On utilise donc les credentials d'un **login Claude isolé dédié** (CLAUDE_CONFIG_DIR
séparé) : structure `{ claudeAiOauth: { accessToken, refreshToken, expiresAt(ms) } }`.
Le token expire (~8 h) ; on le rafraîchit via platform.claude.com et on réécrit le fichier
isolé de façon atomique. On ne touche jamais le ~/.claude partagé.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import subprocess
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime, timezone
import httpx
from config import config
log = logging.getLogger("monitorink.claude")
USAGE_URL = "https://api.anthropic.com/api/oauth/usage"
REFRESH_URL = "https://platform.claude.com/v1/oauth/token"
CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" # client public Claude Code
EXPIRY_BUFFER_MS = 120_000 # rafraîchit 2 min avant expiration
_refresh_lock = asyncio.Lock()
# Backoff après échec du refresh OAuth (typiquement 429) : sans ça, le token expiré déclenche un
# refresh À CHAQUE rendu, l'échec n'étant pas mémorisé -> on martèle platform.claude.com et on
# entretient le rate-limit. Pendant la fenêtre on ne retente pas (on sert la dernière valeur connue).
#
# Le backoff est EXPONENTIEL (10min, 20, 40... plafonné à 6h) et non plus fixe : un backoff court
# (5min) couplé à la cadence de rendu (~15min) faisait retenter le refresh ~4×/h en boucle, ce qui
# resaturait en permanence le rate-limit 429 et l'empêchait de se résorber (token figé >15h observé).
# Comme le token vit ~8h, espacer fortement les retentatives ne coûte rien et laisse le 429 retomber.
# On respecte aussi l'en-tête Retry-After de l'API quand il est plus long que notre palier.
REFRESH_BACKOFF_BASE = 600 # 1er palier après un échec : 10 min
REFRESH_BACKOFF_CAP = 6 * 3600 # plafond : 6 h
_refresh_state: dict[str, float] = {"backoff_until": 0.0, "failures": 0.0}
def _retry_after_seconds(exc: Exception) -> float | None:
"""Secondes de l'en-tête Retry-After d'une réponse HTTP d'erreur, si présent et numérique."""
resp = getattr(exc, "response", None)
if resp is None:
return None
raw = resp.headers.get("retry-after")
if not raw:
return None
try:
return float(raw)
except ValueError:
return None # forme date-HTTP : on ignore, le palier exponentiel suffit
def _note_refresh_failure(exc: Exception) -> None:
"""Enregistre un échec de refresh et arme le backoff exponentiel (borné, ≥ Retry-After)."""
n = int(_refresh_state["failures"]) + 1
_refresh_state["failures"] = n
delay = min(REFRESH_BACKOFF_BASE * (2 ** (n - 1)), REFRESH_BACKOFF_CAP)
retry_after = _retry_after_seconds(exc)
if retry_after is not None:
delay = max(delay, retry_after)
_refresh_state["backoff_until"] = time.time() + delay
log.warning("refresh OAuth Claude échoué (échec #%d) — backoff %ds : %s", n, int(delay), exc)
def _note_refresh_success() -> None:
"""Refresh réussi : on réarme à zéro (palier + fenêtre de backoff)."""
_refresh_state["failures"] = 0
_refresh_state["backoff_until"] = 0.0
class _RefreshThrottled(Exception):
"""Refresh en backoff (anti-429), pas une vraie erreur réseau/auth."""
class _RefreshFatal(Exception):
"""Refresh refusé DÉFINITIVEMENT (invalid_grant / invalid_request / 401) : le refresh token
est mort, seul un nouveau login isolé répare. À distinguer absolument d'un 429/réseau (transitoire) :
inutile de backoff/retenter, et surtout il ne faut PAS re-soumettre ce token en boucle."""
# Un login isolé est requis (refresh révoqué). Armé sur erreur fatale, désarmé dès qu'un refresh
# réussit OU qu'on relit un token redevenu frais (= l'utilisateur a relancé le login CLI). Sert à
# court-circuiter les refresh (on sait le token mort) et à afficher une alerte claire à l'écran.
_login_required: dict[str, object] = {"flag": False, "detail": ""}
def _set_login_required(detail: str) -> None:
_login_required["flag"] = True
_login_required["detail"] = detail
def _clear_login_required() -> None:
_login_required["flag"] = False
_login_required["detail"] = ""
@dataclass
class Window:
"""Une fenêtre glissante d'usage (5h ou 7j)."""
utilization: float # 0..100
resets_at: datetime | None
@property
def remaining_pct(self) -> float:
return max(0.0, 100.0 - self.utilization)
@property
def resets_in_human(self) -> str:
if not self.resets_at:
return ""
delta = self.resets_at - datetime.now(timezone.utc)
secs = int(delta.total_seconds())
if secs <= 0:
return "bientôt"
h, m = divmod(secs // 60, 60)
if h >= 24:
return f"{h // 24}j {h % 24}h"
if h:
return f"{h}h{m:02d}"
return f"{m}min"
@dataclass
class ExtraUsage:
"""Crédits « extra usage » (pay-as-you-go au-delà de l'abonnement)."""
used: float
limit: float | None
currency: str = "EUR"
def _fmt(self, cents: float) -> str:
"""Montant en euros : les valeurs de l'API sont en centimes (136 -> 1,36€)."""
sym = "" if self.currency == "EUR" else f" {self.currency}"
v = cents / 100
s = f"{int(v)}" if v == int(v) else f"{v:.2f}".replace(".", ",")
return f"{s}{sym}"
@property
def label(self) -> str:
# L'API n'expose que l'extra dépensé sur le mois et le plafond mensuel
# (pas le solde de crédits prépayés). On affiche donc utilisé / plafond.
if self.limit:
return f"Extra : {self._fmt(self.used)} utilisé / {self._fmt(self.limit)} ce mois"
return f"Extra : {self._fmt(self.used)} utilisé"
@dataclass
class ClaudeUsage:
ok: bool
error: str | None = None
# True quand l'erreur exige une action humaine (relancer le login isolé). Empêche le repli
# silencieux sur le cache : on veut que le message remonte à l'écran.
fatal: bool = False
five_hour: Window | None = None
seven_day: Window | None = None
seven_day_opus: Window | None = None
seven_day_sonnet: Window | None = None
extra: ExtraUsage | None = None
burn_rate: float | None = None # tokens/min (ccusage, optionnel)
def _parse_window(raw: dict | None) -> Window | None:
if not raw:
return None
util = float(raw.get("utilization", 0) or 0)
resets = raw.get("resets_at")
dt = None
if resets:
try:
dt = datetime.fromisoformat(str(resets).replace("Z", "+00:00"))
except ValueError:
dt = None
return Window(utilization=util, resets_at=dt)
def _load_creds() -> dict:
with open(config.claude_creds_path) as f:
return json.load(f)
def _write_creds(data: dict) -> None:
"""Écriture atomique du fichier de credentials isolé (mode 0600)."""
path = config.claude_creds_path
fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
try:
with os.fdopen(fd, "w") as f:
json.dump(data, f)
os.chmod(tmp, 0o600)
os.replace(tmp, path)
except BaseException:
if os.path.exists(tmp):
os.unlink(tmp)
raise
# Codes d'erreur OAuth qui signifient « ce refresh token est mort » (≠ rate-limit/réseau).
_FATAL_REFRESH_ERRORS = {"invalid_grant", "invalid_request", "invalid_client"}
async def _refresh(data: dict) -> str:
"""Rafraîchit l'access token, réécrit le fichier isolé, renvoie le nouvel access token.
Le refresh token est ROTATIF : un refresh réussi le remplace et invalide l'ancien. On NE
retente JAMAIS en interne avec le même token (pas de boucle for / tenacity ici) : re-soumettre
un token déjà consommé peut déclencher la reuse-detection côté Anthropic et révoquer toute la
famille -> login forcé. L'espacement des tentatives vient du backoff de l'appelant."""
o = data["claudeAiOauth"]
body = {
"grant_type": "refresh_token",
"refresh_token": o["refreshToken"],
"client_id": CLIENT_ID,
}
async with httpx.AsyncClient(timeout=config.claude_refresh_timeout) as client:
resp = await client.post(
REFRESH_URL, json=body,
headers={"Content-Type": "application/json", "User-Agent": config.claude_ua},
)
# 400 avec un code d'erreur OAuth fatal (ou 401) -> le refresh token est mort : seul un nouveau
# login isolé répare. On arme l'alerte et on lève _RefreshFatal SANS backoff (le 429, lui, est
# transitoire et part dans raise_for_status ci-dessous).
if resp.status_code in (400, 401):
err = ""
try:
err = str((resp.json() or {}).get("error", ""))
except ValueError:
pass
if err in _FATAL_REFRESH_ERRORS or resp.status_code == 401:
_set_login_required(err or f"HTTP {resp.status_code}")
log.error("LOGIN CLAUDE REQUIS — relancer le login isolé "
"(CLAUDE_CONFIG_DIR=… claude auth login) ; refresh rejeté: %s",
err or resp.status_code)
raise _RefreshFatal(err or f"HTTP {resp.status_code}")
resp.raise_for_status() # 429/5xx/autre -> httpx.HTTPError, traité en transitoire (backoff)
tok = resp.json()
o["accessToken"] = tok["access_token"]
o["refreshToken"] = tok["refresh_token"]
o["expiresAt"] = int(time.time() * 1000) + int(tok["expires_in"]) * 1000
try:
_write_creds(data)
except OSError:
# Token frais en mémoire mais non persisté : au prochain démarrage le disque garde l'ANCIEN
# token (déjà consommé) -> rejoue la cause n°1 des reconnexions. On le signale fort.
log.error("nouveau refresh token NON persisté (échec écriture %s)", config.claude_creds_path)
raise
_note_refresh_success()
_clear_login_required() # un refresh réussi prouve que l'auth est de nouveau bonne
log.info("refresh OAuth Claude réussi — token valide jusqu'à %s",
datetime.fromtimestamp(o["expiresAt"] / 1000, timezone.utc).isoformat())
return o["accessToken"]
async def _valid_token() -> str:
"""Renvoie un access token valide, en rafraîchissant PROACTIVEMENT bien avant l'expiration.
- Refresh dès qu'il reste moins de `claude_refresh_lead_ms` (≈2 h) sur le token (~8 h de vie) :
un échec transitoire (429) a alors des heures de marge pour réussir avant que l'access token
meure, et le refresh token rotatif tourne tôt (loin du chemin critique).
- Échec transitoire -> backoff + _RefreshThrottled : l'appelant sert la dernière valeur connue.
- Refresh token mort (invalid_grant/401) -> _RefreshFatal, sans backoff ni re-soumission en
boucle. L'alerte ne retombe qu'au prochain login isolé (token redevenu frais sur disque)."""
# Plancher défensif : même mal configuré (LEAD_MIN=0), on rafraîchit au moins juste avant l'expiry.
lead = max(config.claude_refresh_lead_ms, EXPIRY_BUFFER_MS)
data = _load_creds()
o = data["claudeAiOauth"]
if int(o.get("expiresAt", 0)) - int(time.time() * 1000) > lead:
_clear_login_required() # token frais : si une alerte était armée, un re-login l'a réparée
return o["accessToken"]
if _login_required["flag"]:
raise _RefreshFatal(str(_login_required["detail"])) # token mort connu : pas d'appel réseau
if time.time() < _refresh_state["backoff_until"]:
raise _RefreshThrottled()
async with _refresh_lock:
# Re-lecture après acquisition du lock : un autre coroutine a peut-être déjà rafraîchi, ou
# l'utilisateur a relancé le login isolé entre-temps (token redevenu frais -> on désarme).
data = _load_creds()
o = data["claudeAiOauth"]
if int(o.get("expiresAt", 0)) - int(time.time() * 1000) > lead:
_clear_login_required()
return o["accessToken"]
if _login_required["flag"]:
raise _RefreshFatal(str(_login_required["detail"]))
if time.time() < _refresh_state["backoff_until"]:
raise _RefreshThrottled()
try:
return await _refresh(data)
except httpx.HTTPError as exc:
# Transitoire (429/5xx/réseau/timeout). _RefreshFatal, lui, n'est PAS un httpx.HTTPError :
# il remonte tel quel, sans backoff.
_note_refresh_failure(exc)
raise
async def _get_usage(token: str) -> httpx.Response:
headers = {
"Authorization": f"Bearer {token}",
"anthropic-beta": "oauth-2025-04-20",
"User-Agent": config.claude_ua,
"Content-Type": "application/json",
}
async with httpx.AsyncClient(timeout=15) as client:
return await client.get(USAGE_URL, headers=headers)
def _burn_rate_from_ccusage() -> float | None:
"""Burn rate du bloc 5h actif via `ccusage blocks --json` (best-effort)."""
try:
proc = subprocess.run(
["ccusage", "blocks", "--active", "--json"],
capture_output=True, text=True, timeout=20,
)
if proc.returncode != 0:
return None
data = json.loads(proc.stdout)
for b in data.get("blocks") or []:
if b.get("isActive"):
bp = b.get("burnRate") or {}
rate = bp.get("tokensPerMinute") or bp.get("tokensPerMinuteForIndicator")
return float(rate) if rate is not None else None
except (subprocess.SubprocessError, json.JSONDecodeError, ValueError, FileNotFoundError):
return None
return None
# Dernier usage récupéré avec succès : sert de cache (throttle) ET de repli en cas d'erreur
# transitoire (429, réseau) pour ne pas afficher "HTTP 429" sur l'e-ink. Les libellés dynamiques
# (resets_in_human) restent corrects car recalculés à la volée depuis resets_at.
_usage_cache: dict[str, object] = {"value": None, "ts": 0.0}
async def fetch_usage() -> ClaudeUsage:
now = time.time()
cached = _usage_cache["value"]
if isinstance(cached, ClaudeUsage) and cached.ok and (now - float(_usage_cache["ts"])) < config.usage_ttl_seconds:
return cached
result = await _fetch_usage()
if result.ok:
_usage_cache["value"] = result
_usage_cache["ts"] = now
return result
# Erreur TRANSITOIRE (429, réseau, auth temporaire) : on réaffiche la dernière valeur correcte
# connue plutôt qu'un message d'erreur, le temps que ça se rétablisse. En revanche une erreur
# FATALE (login isolé à relancer) doit passer en clair à l'écran : pas de repli cache.
if not result.fatal and isinstance(cached, ClaudeUsage) and cached.ok:
return cached
return result
async def _fetch_usage() -> ClaudeUsage:
if not os.path.exists(config.claude_creds_path):
return ClaudeUsage(ok=False, error="credentials Claude absents — login isolé requis")
try:
token = await _valid_token()
except _RefreshThrottled:
return ClaudeUsage(ok=False, error="refresh en pause (anti-429)")
except _RefreshFatal:
return ClaudeUsage(ok=False, fatal=True, error="Reconnexion Claude requise")
except (OSError, KeyError, json.JSONDecodeError) as exc:
return ClaudeUsage(ok=False, error=f"credentials illisibles: {exc}")
except httpx.HTTPError as exc:
return ClaudeUsage(ok=False, error=f"refresh échoué: {exc}")
try:
resp = await _get_usage(token)
# Token rejeté malgré la marge -> une tentative de refresh forcé (soumise au backoff).
if resp.status_code == 401:
try:
data = _load_creds()
async with _refresh_lock:
token = await _refresh(data)
except _RefreshFatal:
return ClaudeUsage(ok=False, fatal=True, error="Reconnexion Claude requise")
except httpx.HTTPError as exc:
_note_refresh_failure(exc)
raise
resp = await _get_usage(token)
except httpx.HTTPError as exc:
return ClaudeUsage(ok=False, error=f"réseau: {exc}")
if resp.status_code == 401:
return ClaudeUsage(ok=False, error="auth invalide — relancer le login isolé")
if resp.status_code == 403:
return ClaudeUsage(ok=False, error="scope insuffisant (login complet requis)")
if resp.status_code != 200:
return ClaudeUsage(ok=False, error=f"HTTP {resp.status_code}")
data = resp.json()
burn = _burn_rate_from_ccusage() if config.ccusage_enabled else None
extra = None
eu = data.get("extra_usage") or {}
if eu.get("is_enabled") and eu.get("used_credits"):
extra = ExtraUsage(
used=float(eu.get("used_credits", 0)),
limit=float(eu["monthly_limit"]) if eu.get("monthly_limit") else None,
currency=eu.get("currency", "EUR"),
)
return ClaudeUsage(
ok=True,
five_hour=_parse_window(data.get("five_hour")),
seven_day=_parse_window(data.get("seven_day")),
seven_day_opus=_parse_window(data.get("seven_day_opus")),
seven_day_sonnet=_parse_window(data.get("seven_day_sonnet")),
extra=extra,
burn_rate=burn,
)