Files
Monitorink/backend/integrations/claude_usage.py
jerem 0f6286c154 claude_usage: backoff après échec de refresh OAuth (anti-429)
Token expiré -> refresh tenté à chaque rendu, échec non mémorisé -> on martelait
platform.claude.com et le 429 s'entretenait. On impose désormais un backoff de 5 min
après un refresh échoué (et sur la voie de refresh forcé 401), pendant lequel on sert
la dernière valeur connue au lieu de re-tenter.
2026-06-15 19:50:52 +02:00

306 lines
11 KiB
Python

"""Récupération de l'usage de l'abonnement Claude via l'endpoint OAuth `/usage`.
Validé empiriquement (compte Max 5x) :
GET https://api.anthropic.com/api/oauth/usage
headers: Authorization: Bearer <token>, anthropic-beta: oauth-2025-04-20,
User-Agent: claude-code/<version>, Content-Type: application/json
réponse: { five_hour:{utilization,resets_at}, seven_day:{...},
seven_day_opus, seven_day_sonnet, extra_usage }
⚠️ L'endpoint exige le scope OAuth `user:profile`. Le token `claude setup-token` ne l'a PAS
(403). On utilise donc les credentials d'un **login Claude isolé dédié** (CLAUDE_CONFIG_DIR
séparé) : structure `{ claudeAiOauth: { accessToken, refreshToken, expiresAt(ms) } }`.
Le token expire (~8 h) ; on le rafraîchit via platform.claude.com et on réécrit le fichier
isolé de façon atomique. On ne touche jamais le ~/.claude partagé.
"""
from __future__ import annotations
import asyncio
import json
import os
import subprocess
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime, timezone
import httpx
from config import config
USAGE_URL = "https://api.anthropic.com/api/oauth/usage"
REFRESH_URL = "https://platform.claude.com/v1/oauth/token"
CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" # client public Claude Code
EXPIRY_BUFFER_MS = 120_000 # rafraîchit 2 min avant expiration
_refresh_lock = asyncio.Lock()
# Backoff après échec du refresh OAuth (typiquement 429) : sans ça, le token expiré déclenche un
# refresh À CHAQUE rendu, l'échec n'étant pas mémorisé -> on martèle platform.claude.com et on
# entretient le rate-limit. Pendant la fenêtre on ne retente pas (on sert la dernière valeur connue).
REFRESH_BACKOFF_SECONDS = 300
_refresh_state: dict[str, float] = {"backoff_until": 0.0}
class _RefreshThrottled(Exception):
"""Refresh en backoff (anti-429), pas une vraie erreur réseau/auth."""
@dataclass
class Window:
"""Une fenêtre glissante d'usage (5h ou 7j)."""
utilization: float # 0..100
resets_at: datetime | None
@property
def remaining_pct(self) -> float:
return max(0.0, 100.0 - self.utilization)
@property
def resets_in_human(self) -> str:
if not self.resets_at:
return ""
delta = self.resets_at - datetime.now(timezone.utc)
secs = int(delta.total_seconds())
if secs <= 0:
return "bientôt"
h, m = divmod(secs // 60, 60)
if h >= 24:
return f"{h // 24}j {h % 24}h"
if h:
return f"{h}h{m:02d}"
return f"{m}min"
@dataclass
class ExtraUsage:
"""Crédits « extra usage » (pay-as-you-go au-delà de l'abonnement)."""
used: float
limit: float | None
currency: str = "EUR"
def _fmt(self, cents: float) -> str:
"""Montant en euros : les valeurs de l'API sont en centimes (136 -> 1,36€)."""
sym = "" if self.currency == "EUR" else f" {self.currency}"
v = cents / 100
s = f"{int(v)}" if v == int(v) else f"{v:.2f}".replace(".", ",")
return f"{s}{sym}"
@property
def label(self) -> str:
# L'API n'expose que l'extra dépensé sur le mois et le plafond mensuel
# (pas le solde de crédits prépayés). On affiche donc utilisé / plafond.
if self.limit:
return f"Extra : {self._fmt(self.used)} utilisé / {self._fmt(self.limit)} ce mois"
return f"Extra : {self._fmt(self.used)} utilisé"
@dataclass
class ClaudeUsage:
ok: bool
error: str | None = None
five_hour: Window | None = None
seven_day: Window | None = None
seven_day_opus: Window | None = None
seven_day_sonnet: Window | None = None
extra: ExtraUsage | None = None
burn_rate: float | None = None # tokens/min (ccusage, optionnel)
def _parse_window(raw: dict | None) -> Window | None:
if not raw:
return None
util = float(raw.get("utilization", 0) or 0)
resets = raw.get("resets_at")
dt = None
if resets:
try:
dt = datetime.fromisoformat(str(resets).replace("Z", "+00:00"))
except ValueError:
dt = None
return Window(utilization=util, resets_at=dt)
def _load_creds() -> dict:
with open(config.claude_creds_path) as f:
return json.load(f)
def _write_creds(data: dict) -> None:
"""Écriture atomique du fichier de credentials isolé (mode 0600)."""
path = config.claude_creds_path
fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
try:
with os.fdopen(fd, "w") as f:
json.dump(data, f)
os.chmod(tmp, 0o600)
os.replace(tmp, path)
except BaseException:
if os.path.exists(tmp):
os.unlink(tmp)
raise
async def _refresh(data: dict) -> str:
"""Rafraîchit l'access token, réécrit le fichier isolé, renvoie le nouvel access token."""
o = data["claudeAiOauth"]
body = {
"grant_type": "refresh_token",
"refresh_token": o["refreshToken"],
"client_id": CLIENT_ID,
}
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.post(
REFRESH_URL, json=body,
headers={"Content-Type": "application/json", "User-Agent": config.claude_ua},
)
resp.raise_for_status()
tok = resp.json()
o["accessToken"] = tok["access_token"]
o["refreshToken"] = tok["refresh_token"]
o["expiresAt"] = int(time.time() * 1000) + int(tok["expires_in"]) * 1000
_write_creds(data)
return o["accessToken"]
async def _valid_token() -> str:
"""Renvoie un access token valide, en rafraîchissant si nécessaire (sérialisé).
Après un échec de refresh (ex. 429), on impose un backoff et on lève _RefreshThrottled
pendant la pause : l'appelant sert alors la dernière valeur connue au lieu de re-marteler."""
data = _load_creds()
o = data["claudeAiOauth"]
if int(o.get("expiresAt", 0)) - int(time.time() * 1000) > EXPIRY_BUFFER_MS:
return o["accessToken"]
if time.time() < _refresh_state["backoff_until"]:
raise _RefreshThrottled()
async with _refresh_lock:
# Re-lecture après acquisition du lock : un autre coroutine a peut-être déjà rafraîchi.
data = _load_creds()
o = data["claudeAiOauth"]
if int(o.get("expiresAt", 0)) - int(time.time() * 1000) > EXPIRY_BUFFER_MS:
return o["accessToken"]
if time.time() < _refresh_state["backoff_until"]:
raise _RefreshThrottled()
try:
return await _refresh(data)
except httpx.HTTPError:
_refresh_state["backoff_until"] = time.time() + REFRESH_BACKOFF_SECONDS
raise
async def _get_usage(token: str) -> httpx.Response:
headers = {
"Authorization": f"Bearer {token}",
"anthropic-beta": "oauth-2025-04-20",
"User-Agent": config.claude_ua,
"Content-Type": "application/json",
}
async with httpx.AsyncClient(timeout=15) as client:
return await client.get(USAGE_URL, headers=headers)
def _burn_rate_from_ccusage() -> float | None:
"""Burn rate du bloc 5h actif via `ccusage blocks --json` (best-effort)."""
try:
proc = subprocess.run(
["ccusage", "blocks", "--active", "--json"],
capture_output=True, text=True, timeout=20,
)
if proc.returncode != 0:
return None
data = json.loads(proc.stdout)
for b in data.get("blocks") or []:
if b.get("isActive"):
bp = b.get("burnRate") or {}
rate = bp.get("tokensPerMinute") or bp.get("tokensPerMinuteForIndicator")
return float(rate) if rate is not None else None
except (subprocess.SubprocessError, json.JSONDecodeError, ValueError, FileNotFoundError):
return None
return None
# Dernier usage récupéré avec succès : sert de cache (throttle) ET de repli en cas d'erreur
# transitoire (429, réseau) pour ne pas afficher "HTTP 429" sur l'e-ink. Les libellés dynamiques
# (resets_in_human) restent corrects car recalculés à la volée depuis resets_at.
_usage_cache: dict[str, object] = {"value": None, "ts": 0.0}
async def fetch_usage() -> ClaudeUsage:
now = time.time()
cached = _usage_cache["value"]
if isinstance(cached, ClaudeUsage) and cached.ok and (now - float(_usage_cache["ts"])) < config.usage_ttl_seconds:
return cached
result = await _fetch_usage()
if result.ok:
_usage_cache["value"] = result
_usage_cache["ts"] = now
return result
# Erreur (429, réseau, auth transitoire) : on réaffiche la dernière valeur correcte connue
# plutôt qu'un message d'erreur, le temps que ça se rétablisse.
if isinstance(cached, ClaudeUsage) and cached.ok:
return cached
return result
async def _fetch_usage() -> ClaudeUsage:
if not os.path.exists(config.claude_creds_path):
return ClaudeUsage(ok=False, error="credentials Claude absents — login isolé requis")
try:
token = await _valid_token()
except _RefreshThrottled:
return ClaudeUsage(ok=False, error="refresh en pause (anti-429)")
except (OSError, KeyError, json.JSONDecodeError) as exc:
return ClaudeUsage(ok=False, error=f"credentials illisibles: {exc}")
except httpx.HTTPError as exc:
return ClaudeUsage(ok=False, error=f"refresh échoué: {exc}")
try:
resp = await _get_usage(token)
# Token rejeté malgré le buffer -> une tentative de refresh forcé (soumise au backoff).
if resp.status_code == 401:
try:
data = _load_creds()
async with _refresh_lock:
token = await _refresh(data)
except httpx.HTTPError:
_refresh_state["backoff_until"] = time.time() + REFRESH_BACKOFF_SECONDS
raise
resp = await _get_usage(token)
except httpx.HTTPError as exc:
return ClaudeUsage(ok=False, error=f"réseau: {exc}")
if resp.status_code == 401:
return ClaudeUsage(ok=False, error="auth invalide — relancer le login isolé")
if resp.status_code == 403:
return ClaudeUsage(ok=False, error="scope insuffisant (login complet requis)")
if resp.status_code != 200:
return ClaudeUsage(ok=False, error=f"HTTP {resp.status_code}")
data = resp.json()
burn = _burn_rate_from_ccusage() if config.ccusage_enabled else None
extra = None
eu = data.get("extra_usage") or {}
if eu.get("is_enabled") and eu.get("used_credits"):
extra = ExtraUsage(
used=float(eu.get("used_credits", 0)),
limit=float(eu["monthly_limit"]) if eu.get("monthly_limit") else None,
currency=eu.get("currency", "EUR"),
)
return ClaudeUsage(
ok=True,
five_hour=_parse_window(data.get("five_hour")),
seven_day=_parse_window(data.get("seven_day")),
seven_day_opus=_parse_window(data.get("seven_day_opus")),
seven_day_sonnet=_parse_window(data.get("seven_day_sonnet")),
extra=extra,
burn_rate=burn,
)