Files
Monitorink/backend/integrations/claude_usage.py
Jerem abaede799e Claude: persiste le backoff OAuth sur disque (un redéploiement ne re-sollicite plus le 429)
Le backoff anti-429 du refresh OAuth vivait uniquement en mémoire : chaque
redéploiement le remettait à zéro et re-sollicitait IMMÉDIATEMENT l'endpoint de
refresh rate-limité, entretenant le 429 qu'on cherche justement à laisser retomber.

Persiste backoff_until + le palier exponentiel (failures) sur /data
(claude_oauth_state.json), écriture atomique best-effort à la manière du cache
trackers. Chargé une fois par process en tête de fetch_usage, sauvé à chaque
échec et effacé à chaque succès. Un token frais court-circuite de toute façon le
backoff, donc un re-login isolé débloque immédiatement même si une fenêtre court.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 01:04:53 +02:00

468 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Récupération de l'usage de l'abonnement Claude via l'endpoint OAuth `/usage`.
Validé empiriquement (compte Max 5x) :
GET https://api.anthropic.com/api/oauth/usage
headers: Authorization: Bearer <token>, anthropic-beta: oauth-2025-04-20,
User-Agent: claude-code/<version>, Content-Type: application/json
réponse: { five_hour:{utilization,resets_at}, seven_day:{...},
seven_day_opus, seven_day_sonnet, extra_usage }
⚠️ L'endpoint exige le scope OAuth `user:profile`. Le token `claude setup-token` ne l'a PAS
(403). On utilise donc les credentials d'un **login Claude isolé dédié** (CLAUDE_CONFIG_DIR
séparé) : structure `{ claudeAiOauth: { accessToken, refreshToken, expiresAt(ms) } }`.
Le token expire (~8 h) ; on le rafraîchit via platform.claude.com et on réécrit le fichier
isolé de façon atomique. On ne touche jamais le ~/.claude partagé.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import subprocess
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime, timezone
import httpx
from config import config
log = logging.getLogger("monitorink.claude")
USAGE_URL = "https://api.anthropic.com/api/oauth/usage"
REFRESH_URL = "https://platform.claude.com/v1/oauth/token"
CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" # client public Claude Code
EXPIRY_BUFFER_MS = 120_000 # rafraîchit 2 min avant expiration
_refresh_lock = asyncio.Lock()
# Backoff après échec du refresh OAuth (typiquement 429) : sans ça, le token expiré déclenche un
# refresh À CHAQUE rendu, l'échec n'étant pas mémorisé -> on martèle platform.claude.com et on
# entretient le rate-limit. Pendant la fenêtre on ne retente pas (on sert la dernière valeur connue).
#
# Le backoff est EXPONENTIEL (10min, 20, 40... plafonné à 6h) et non plus fixe : un backoff court
# (5min) couplé à la cadence de rendu (~15min) faisait retenter le refresh ~4×/h en boucle, ce qui
# resaturait en permanence le rate-limit 429 et l'empêchait de se résorber (token figé >15h observé).
# Comme le token vit ~8h, espacer fortement les retentatives ne coûte rien et laisse le 429 retomber.
# On respecte aussi l'en-tête Retry-After de l'API quand il est plus long que notre palier.
REFRESH_BACKOFF_BASE = 600 # 1er palier après un échec : 10 min
REFRESH_BACKOFF_CAP = 6 * 3600 # plafond : 6 h
_refresh_state: dict[str, float] = {"backoff_until": 0.0, "failures": 0.0}
# Le backoff ci-dessus est PERSISTÉ sur disque (/data). En mémoire seule, un redéploiement le
# remettait à zéro et re-sollicitait l'endpoint rate-limité dès le 1er rendu -> 429 entretenu en
# boucle de rebuild. Le persister fait survivre la fenêtre (et le palier exponentiel #failures)
# aux redémarrages. Un token frais court-circuite de toute façon le backoff (cf. _valid_token),
# donc un re-login isolé débloque immédiatement même si une fenêtre persistée court encore.
_state_loaded = False # l'état de backoff n'est lu du disque qu'une fois par process
def _load_state() -> None:
"""Hydrate `_refresh_state` depuis le disque (une fois par process). Fichier absent/illisible
/corrompu -> backoff vide (on retentera : comportement sûr au pire)."""
global _state_loaded
if _state_loaded:
return
_state_loaded = True
try:
with open(config.claude_state_file, encoding="utf-8") as fh:
data = json.load(fh)
_refresh_state["backoff_until"] = float(data["backoff_until"])
_refresh_state["failures"] = float(data["failures"])
except (OSError, ValueError, KeyError, TypeError):
pass
def _save_state() -> None:
"""Persiste `_refresh_state` de façon atomique (tmp + os.replace). Best-effort : une erreur
d'écriture ne doit jamais casser le rendu (au pire on perd le backoff au prochain reboot)."""
path = config.claude_state_file
try:
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
tmp = f"{path}.tmp"
with open(tmp, "w", encoding="utf-8") as fh:
json.dump({"backoff_until": _refresh_state["backoff_until"],
"failures": _refresh_state["failures"]}, fh)
os.replace(tmp, path)
except OSError:
pass
def _retry_after_seconds(exc: Exception) -> float | None:
"""Secondes de l'en-tête Retry-After d'une réponse HTTP d'erreur, si présent et numérique."""
resp = getattr(exc, "response", None)
if resp is None:
return None
raw = resp.headers.get("retry-after")
if not raw:
return None
try:
return float(raw)
except ValueError:
return None # forme date-HTTP : on ignore, le palier exponentiel suffit
def _note_refresh_failure(exc: Exception) -> None:
"""Enregistre un échec de refresh et arme le backoff exponentiel (borné, ≥ Retry-After)."""
n = int(_refresh_state["failures"]) + 1
_refresh_state["failures"] = n
delay = min(REFRESH_BACKOFF_BASE * (2 ** (n - 1)), REFRESH_BACKOFF_CAP)
retry_after = _retry_after_seconds(exc)
if retry_after is not None:
delay = max(delay, retry_after)
_refresh_state["backoff_until"] = time.time() + delay
_save_state() # la fenêtre doit survivre à un redéploiement (sinon 429 re-sollicité au reboot)
log.warning("refresh OAuth Claude échoué (échec #%d) — backoff %ds : %s", n, int(delay), exc)
def _note_refresh_success() -> None:
"""Refresh réussi : on réarme à zéro (palier + fenêtre de backoff)."""
_refresh_state["failures"] = 0
_refresh_state["backoff_until"] = 0.0
_save_state() # efface le backoff persisté : le prochain reboot repart propre
class _RefreshThrottled(Exception):
"""Refresh en backoff (anti-429), pas une vraie erreur réseau/auth."""
class _RefreshFatal(Exception):
"""Refresh refusé DÉFINITIVEMENT (invalid_grant / invalid_request / 401) : le refresh token
est mort, seul un nouveau login isolé répare. À distinguer absolument d'un 429/réseau (transitoire) :
inutile de backoff/retenter, et surtout il ne faut PAS re-soumettre ce token en boucle."""
# Un login isolé est requis (refresh révoqué). Armé sur erreur fatale, désarmé dès qu'un refresh
# réussit OU qu'on relit un token redevenu frais (= l'utilisateur a relancé le login CLI). Sert à
# court-circuiter les refresh (on sait le token mort) et à afficher une alerte claire à l'écran.
_login_required: dict[str, object] = {"flag": False, "detail": ""}
def _set_login_required(detail: str) -> None:
_login_required["flag"] = True
_login_required["detail"] = detail
def _clear_login_required() -> None:
_login_required["flag"] = False
_login_required["detail"] = ""
@dataclass
class Window:
"""Une fenêtre glissante d'usage (5h ou 7j)."""
utilization: float # 0..100
resets_at: datetime | None
@property
def remaining_pct(self) -> float:
return max(0.0, 100.0 - self.utilization)
@property
def resets_in_human(self) -> str:
if not self.resets_at:
return ""
delta = self.resets_at - datetime.now(timezone.utc)
secs = int(delta.total_seconds())
if secs <= 0:
return "bientôt"
h, m = divmod(secs // 60, 60)
if h >= 24:
return f"{h // 24}j {h % 24}h"
if h:
return f"{h}h{m:02d}"
return f"{m}min"
@dataclass
class ExtraUsage:
"""Crédits « extra usage » (pay-as-you-go au-delà de l'abonnement)."""
used: float
limit: float | None
currency: str = "EUR"
def _fmt(self, cents: float) -> str:
"""Montant en euros : les valeurs de l'API sont en centimes (136 -> 1,36€)."""
sym = "" if self.currency == "EUR" else f" {self.currency}"
v = cents / 100
s = f"{int(v)}" if v == int(v) else f"{v:.2f}".replace(".", ",")
return f"{s}{sym}"
@property
def label(self) -> str:
# L'API n'expose que l'extra dépensé sur le mois et le plafond mensuel
# (pas le solde de crédits prépayés). On affiche donc utilisé / plafond.
if self.limit:
return f"Extra : {self._fmt(self.used)} utilisé / {self._fmt(self.limit)} ce mois"
return f"Extra : {self._fmt(self.used)} utilisé"
@dataclass
class ClaudeUsage:
ok: bool
error: str | None = None
# True quand l'erreur exige une action humaine (relancer le login isolé). Empêche le repli
# silencieux sur le cache : on veut que le message remonte à l'écran.
fatal: bool = False
five_hour: Window | None = None
seven_day: Window | None = None
seven_day_opus: Window | None = None
seven_day_sonnet: Window | None = None
extra: ExtraUsage | None = None
burn_rate: float | None = None # tokens/min (ccusage, optionnel)
def _parse_window(raw: dict | None) -> Window | None:
if not raw:
return None
util = float(raw.get("utilization", 0) or 0)
resets = raw.get("resets_at")
dt = None
if resets:
try:
dt = datetime.fromisoformat(str(resets).replace("Z", "+00:00"))
except ValueError:
dt = None
return Window(utilization=util, resets_at=dt)
def _load_creds() -> dict:
with open(config.claude_creds_path) as f:
return json.load(f)
def _write_creds(data: dict) -> None:
"""Écriture atomique du fichier de credentials isolé (mode 0600)."""
path = config.claude_creds_path
fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
try:
with os.fdopen(fd, "w") as f:
json.dump(data, f)
os.chmod(tmp, 0o600)
os.replace(tmp, path)
except BaseException:
if os.path.exists(tmp):
os.unlink(tmp)
raise
# Codes d'erreur OAuth qui signifient « ce refresh token est mort » (≠ rate-limit/réseau).
_FATAL_REFRESH_ERRORS = {"invalid_grant", "invalid_request", "invalid_client"}
async def _refresh(data: dict) -> str:
"""Rafraîchit l'access token, réécrit le fichier isolé, renvoie le nouvel access token.
Le refresh token est ROTATIF : un refresh réussi le remplace et invalide l'ancien. On NE
retente JAMAIS en interne avec le même token (pas de boucle for / tenacity ici) : re-soumettre
un token déjà consommé peut déclencher la reuse-detection côté Anthropic et révoquer toute la
famille -> login forcé. L'espacement des tentatives vient du backoff de l'appelant."""
o = data["claudeAiOauth"]
body = {
"grant_type": "refresh_token",
"refresh_token": o["refreshToken"],
"client_id": CLIENT_ID,
}
async with httpx.AsyncClient(timeout=config.claude_refresh_timeout) as client:
resp = await client.post(
REFRESH_URL, json=body,
headers={"Content-Type": "application/json", "User-Agent": config.claude_ua},
)
# 400 avec un code d'erreur OAuth fatal (ou 401) -> le refresh token est mort : seul un nouveau
# login isolé répare. On arme l'alerte et on lève _RefreshFatal SANS backoff (le 429, lui, est
# transitoire et part dans raise_for_status ci-dessous).
if resp.status_code in (400, 401):
err = ""
try:
err = str((resp.json() or {}).get("error", ""))
except ValueError:
pass
if err in _FATAL_REFRESH_ERRORS or resp.status_code == 401:
_set_login_required(err or f"HTTP {resp.status_code}")
log.error("LOGIN CLAUDE REQUIS — relancer le login isolé "
"(CLAUDE_CONFIG_DIR=… claude auth login) ; refresh rejeté: %s",
err or resp.status_code)
raise _RefreshFatal(err or f"HTTP {resp.status_code}")
resp.raise_for_status() # 429/5xx/autre -> httpx.HTTPError, traité en transitoire (backoff)
tok = resp.json()
o["accessToken"] = tok["access_token"]
o["refreshToken"] = tok["refresh_token"]
o["expiresAt"] = int(time.time() * 1000) + int(tok["expires_in"]) * 1000
try:
_write_creds(data)
except OSError:
# Token frais en mémoire mais non persisté : au prochain démarrage le disque garde l'ANCIEN
# token (déjà consommé) -> rejoue la cause n°1 des reconnexions. On le signale fort.
log.error("nouveau refresh token NON persisté (échec écriture %s)", config.claude_creds_path)
raise
_note_refresh_success()
_clear_login_required() # un refresh réussi prouve que l'auth est de nouveau bonne
log.info("refresh OAuth Claude réussi — token valide jusqu'à %s",
datetime.fromtimestamp(o["expiresAt"] / 1000, timezone.utc).isoformat())
return o["accessToken"]
async def _valid_token() -> str:
"""Renvoie un access token valide, en rafraîchissant PROACTIVEMENT bien avant l'expiration.
- Refresh dès qu'il reste moins de `claude_refresh_lead_ms` (≈2 h) sur le token (~8 h de vie) :
un échec transitoire (429) a alors des heures de marge pour réussir avant que l'access token
meure, et le refresh token rotatif tourne tôt (loin du chemin critique).
- Échec transitoire -> backoff + _RefreshThrottled : l'appelant sert la dernière valeur connue.
- Refresh token mort (invalid_grant/401) -> _RefreshFatal, sans backoff ni re-soumission en
boucle. L'alerte ne retombe qu'au prochain login isolé (token redevenu frais sur disque)."""
# Plancher défensif : même mal configuré (LEAD_MIN=0), on rafraîchit au moins juste avant l'expiry.
lead = max(config.claude_refresh_lead_ms, EXPIRY_BUFFER_MS)
data = _load_creds()
o = data["claudeAiOauth"]
if int(o.get("expiresAt", 0)) - int(time.time() * 1000) > lead:
_clear_login_required() # token frais : si une alerte était armée, un re-login l'a réparée
return o["accessToken"]
if _login_required["flag"]:
raise _RefreshFatal(str(_login_required["detail"])) # token mort connu : pas d'appel réseau
if time.time() < _refresh_state["backoff_until"]:
raise _RefreshThrottled()
async with _refresh_lock:
# Re-lecture après acquisition du lock : un autre coroutine a peut-être déjà rafraîchi, ou
# l'utilisateur a relancé le login isolé entre-temps (token redevenu frais -> on désarme).
data = _load_creds()
o = data["claudeAiOauth"]
if int(o.get("expiresAt", 0)) - int(time.time() * 1000) > lead:
_clear_login_required()
return o["accessToken"]
if _login_required["flag"]:
raise _RefreshFatal(str(_login_required["detail"]))
if time.time() < _refresh_state["backoff_until"]:
raise _RefreshThrottled()
try:
return await _refresh(data)
except httpx.HTTPError as exc:
# Transitoire (429/5xx/réseau/timeout). _RefreshFatal, lui, n'est PAS un httpx.HTTPError :
# il remonte tel quel, sans backoff.
_note_refresh_failure(exc)
raise
async def _get_usage(token: str) -> httpx.Response:
headers = {
"Authorization": f"Bearer {token}",
"anthropic-beta": "oauth-2025-04-20",
"User-Agent": config.claude_ua,
"Content-Type": "application/json",
}
async with httpx.AsyncClient(timeout=15) as client:
return await client.get(USAGE_URL, headers=headers)
def _burn_rate_from_ccusage() -> float | None:
"""Burn rate du bloc 5h actif via `ccusage blocks --json` (best-effort)."""
try:
proc = subprocess.run(
["ccusage", "blocks", "--active", "--json"],
capture_output=True, text=True, timeout=20,
)
if proc.returncode != 0:
return None
data = json.loads(proc.stdout)
for b in data.get("blocks") or []:
if b.get("isActive"):
bp = b.get("burnRate") or {}
rate = bp.get("tokensPerMinute") or bp.get("tokensPerMinuteForIndicator")
return float(rate) if rate is not None else None
except (subprocess.SubprocessError, json.JSONDecodeError, ValueError, FileNotFoundError):
return None
return None
# Dernier usage récupéré avec succès : sert de cache (throttle) ET de repli en cas d'erreur
# transitoire (429, réseau) pour ne pas afficher "HTTP 429" sur l'e-ink. Les libellés dynamiques
# (resets_in_human) restent corrects car recalculés à la volée depuis resets_at.
_usage_cache: dict[str, object] = {"value": None, "ts": 0.0}
async def fetch_usage() -> ClaudeUsage:
_load_state() # hydrate le backoff persisté avant toute logique de refresh (1×/process)
now = time.time()
cached = _usage_cache["value"]
if isinstance(cached, ClaudeUsage) and cached.ok and (now - float(_usage_cache["ts"])) < config.usage_ttl_seconds:
return cached
result = await _fetch_usage()
if result.ok:
_usage_cache["value"] = result
_usage_cache["ts"] = now
return result
# Erreur TRANSITOIRE (429, réseau, auth temporaire) : on réaffiche la dernière valeur correcte
# connue plutôt qu'un message d'erreur, le temps que ça se rétablisse. En revanche une erreur
# FATALE (login isolé à relancer) doit passer en clair à l'écran : pas de repli cache.
if not result.fatal and isinstance(cached, ClaudeUsage) and cached.ok:
return cached
return result
async def _fetch_usage() -> ClaudeUsage:
if not os.path.exists(config.claude_creds_path):
return ClaudeUsage(ok=False, error="credentials Claude absents — login isolé requis")
try:
token = await _valid_token()
except _RefreshThrottled:
return ClaudeUsage(ok=False, error="refresh en pause (anti-429)")
except _RefreshFatal:
return ClaudeUsage(ok=False, fatal=True, error="Reconnexion Claude requise")
except (OSError, KeyError, json.JSONDecodeError) as exc:
return ClaudeUsage(ok=False, error=f"credentials illisibles: {exc}")
except httpx.HTTPError as exc:
return ClaudeUsage(ok=False, error=f"refresh échoué: {exc}")
try:
resp = await _get_usage(token)
# Token rejeté malgré la marge -> une tentative de refresh forcé (soumise au backoff).
if resp.status_code == 401:
try:
data = _load_creds()
async with _refresh_lock:
token = await _refresh(data)
except _RefreshFatal:
return ClaudeUsage(ok=False, fatal=True, error="Reconnexion Claude requise")
except httpx.HTTPError as exc:
_note_refresh_failure(exc)
raise
resp = await _get_usage(token)
except httpx.HTTPError as exc:
return ClaudeUsage(ok=False, error=f"réseau: {exc}")
if resp.status_code == 401:
return ClaudeUsage(ok=False, error="auth invalide — relancer le login isolé")
if resp.status_code == 403:
return ClaudeUsage(ok=False, error="scope insuffisant (login complet requis)")
if resp.status_code != 200:
return ClaudeUsage(ok=False, error=f"HTTP {resp.status_code}")
data = resp.json()
burn = _burn_rate_from_ccusage() if config.ccusage_enabled else None
extra = None
eu = data.get("extra_usage") or {}
if eu.get("is_enabled") and eu.get("used_credits"):
extra = ExtraUsage(
used=float(eu.get("used_credits", 0)),
limit=float(eu["monthly_limit"]) if eu.get("monthly_limit") else None,
currency=eu.get("currency", "EUR"),
)
return ClaudeUsage(
ok=True,
five_hour=_parse_window(data.get("five_hour")),
seven_day=_parse_window(data.get("seven_day")),
seven_day_opus=_parse_window(data.get("seven_day_opus")),
seven_day_sonnet=_parse_window(data.get("seven_day_sonnet")),
extra=extra,
burn_rate=burn,
)