"""Récupération de l'usage de l'abonnement Claude via l'endpoint OAuth `/usage`. Validé empiriquement (compte Max 5x) : GET https://api.anthropic.com/api/oauth/usage headers: Authorization: Bearer , anthropic-beta: oauth-2025-04-20, User-Agent: claude-code/, Content-Type: application/json réponse: { five_hour:{utilization,resets_at}, seven_day:{...}, seven_day_opus, seven_day_sonnet, extra_usage } ⚠️ L'endpoint exige le scope OAuth `user:profile`. Le token `claude setup-token` ne l'a PAS (403). On utilise donc les credentials d'un **login Claude isolé dédié** (CLAUDE_CONFIG_DIR séparé) : structure `{ claudeAiOauth: { accessToken, refreshToken, expiresAt(ms) } }`. Le token expire (~8 h) ; on le rafraîchit via platform.claude.com et on réécrit le fichier isolé de façon atomique. On ne touche jamais le ~/.claude partagé. """ from __future__ import annotations import asyncio import json import logging import os import subprocess import tempfile import time from dataclasses import dataclass from datetime import datetime, timezone import httpx from config import config log = logging.getLogger("monitorink.claude") USAGE_URL = "https://api.anthropic.com/api/oauth/usage" REFRESH_URL = "https://platform.claude.com/v1/oauth/token" CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" # client public Claude Code EXPIRY_BUFFER_MS = 120_000 # rafraîchit 2 min avant expiration _refresh_lock = asyncio.Lock() # Backoff après échec du refresh OAuth (typiquement 429) : sans ça, le token expiré déclenche un # refresh À CHAQUE rendu, l'échec n'étant pas mémorisé -> on martèle platform.claude.com et on # entretient le rate-limit. Pendant la fenêtre on ne retente pas (on sert la dernière valeur connue). # # Le backoff est EXPONENTIEL (10min, 20, 40... plafonné à 6h) et non plus fixe : un backoff court # (5min) couplé à la cadence de rendu (~15min) faisait retenter le refresh ~4×/h en boucle, ce qui # resaturait en permanence le rate-limit 429 et l'empêchait de se résorber (token figé >15h observé). # Comme le token vit ~8h, espacer fortement les retentatives ne coûte rien et laisse le 429 retomber. # On respecte aussi l'en-tête Retry-After de l'API quand il est plus long que notre palier. REFRESH_BACKOFF_BASE = 600 # 1er palier après un échec : 10 min REFRESH_BACKOFF_CAP = 6 * 3600 # plafond : 6 h _refresh_state: dict[str, float] = {"backoff_until": 0.0, "failures": 0.0} # Le backoff ci-dessus est PERSISTÉ sur disque (/data). En mémoire seule, un redéploiement le # remettait à zéro et re-sollicitait l'endpoint rate-limité dès le 1er rendu -> 429 entretenu en # boucle de rebuild. Le persister fait survivre la fenêtre (et le palier exponentiel #failures) # aux redémarrages. Un token frais court-circuite de toute façon le backoff (cf. _valid_token), # donc un re-login isolé débloque immédiatement même si une fenêtre persistée court encore. _state_loaded = False # l'état de backoff n'est lu du disque qu'une fois par process def _load_state() -> None: """Hydrate `_refresh_state` depuis le disque (une fois par process). Fichier absent/illisible /corrompu -> backoff vide (on retentera : comportement sûr au pire).""" global _state_loaded if _state_loaded: return _state_loaded = True try: with open(config.claude_state_file, encoding="utf-8") as fh: data = json.load(fh) _refresh_state["backoff_until"] = float(data["backoff_until"]) _refresh_state["failures"] = float(data["failures"]) except (OSError, ValueError, KeyError, TypeError): pass def _save_state() -> None: """Persiste `_refresh_state` de façon atomique (tmp + os.replace). Best-effort : une erreur d'écriture ne doit jamais casser le rendu (au pire on perd le backoff au prochain reboot).""" path = config.claude_state_file try: os.makedirs(os.path.dirname(path) or ".", exist_ok=True) tmp = f"{path}.tmp" with open(tmp, "w", encoding="utf-8") as fh: json.dump({"backoff_until": _refresh_state["backoff_until"], "failures": _refresh_state["failures"]}, fh) os.replace(tmp, path) except OSError: pass def _retry_after_seconds(exc: Exception) -> float | None: """Secondes de l'en-tête Retry-After d'une réponse HTTP d'erreur, si présent et numérique.""" resp = getattr(exc, "response", None) if resp is None: return None raw = resp.headers.get("retry-after") if not raw: return None try: return float(raw) except ValueError: return None # forme date-HTTP : on ignore, le palier exponentiel suffit def _note_refresh_failure(exc: Exception) -> None: """Enregistre un échec de refresh et arme le backoff exponentiel (borné, ≥ Retry-After).""" n = int(_refresh_state["failures"]) + 1 _refresh_state["failures"] = n delay = min(REFRESH_BACKOFF_BASE * (2 ** (n - 1)), REFRESH_BACKOFF_CAP) retry_after = _retry_after_seconds(exc) if retry_after is not None: delay = max(delay, retry_after) _refresh_state["backoff_until"] = time.time() + delay _save_state() # la fenêtre doit survivre à un redéploiement (sinon 429 re-sollicité au reboot) log.warning("refresh OAuth Claude échoué (échec #%d) — backoff %ds : %s", n, int(delay), exc) def _note_refresh_success() -> None: """Refresh réussi : on réarme à zéro (palier + fenêtre de backoff).""" _refresh_state["failures"] = 0 _refresh_state["backoff_until"] = 0.0 _save_state() # efface le backoff persisté : le prochain reboot repart propre class _RefreshThrottled(Exception): """Refresh en backoff (anti-429), pas une vraie erreur réseau/auth.""" class _RefreshFatal(Exception): """Refresh refusé DÉFINITIVEMENT (invalid_grant / invalid_request / 401) : le refresh token est mort, seul un nouveau login isolé répare. À distinguer absolument d'un 429/réseau (transitoire) : inutile de backoff/retenter, et surtout il ne faut PAS re-soumettre ce token en boucle.""" # Un login isolé est requis (refresh révoqué). Armé sur erreur fatale, désarmé dès qu'un refresh # réussit OU qu'on relit un token redevenu frais (= l'utilisateur a relancé le login CLI). Sert à # court-circuiter les refresh (on sait le token mort) et à afficher une alerte claire à l'écran. _login_required: dict[str, object] = {"flag": False, "detail": ""} def _set_login_required(detail: str) -> None: _login_required["flag"] = True _login_required["detail"] = detail def _clear_login_required() -> None: _login_required["flag"] = False _login_required["detail"] = "" @dataclass class Window: """Une fenêtre glissante d'usage (5h ou 7j).""" utilization: float # 0..100 resets_at: datetime | None @property def remaining_pct(self) -> float: return max(0.0, 100.0 - self.utilization) @property def resets_in_human(self) -> str: if not self.resets_at: return "" delta = self.resets_at - datetime.now(timezone.utc) secs = int(delta.total_seconds()) if secs <= 0: return "bientôt" h, m = divmod(secs // 60, 60) if h >= 24: return f"{h // 24}j {h % 24}h" if h: return f"{h}h{m:02d}" return f"{m}min" @dataclass class ExtraUsage: """Crédits « extra usage » (pay-as-you-go au-delà de l'abonnement).""" used: float limit: float | None currency: str = "EUR" def _fmt(self, cents: float) -> str: """Montant en euros : les valeurs de l'API sont en centimes (136 -> 1,36€).""" sym = "€" if self.currency == "EUR" else f" {self.currency}" v = cents / 100 s = f"{int(v)}" if v == int(v) else f"{v:.2f}".replace(".", ",") return f"{s}{sym}" @property def label(self) -> str: # L'API n'expose que l'extra dépensé sur le mois et le plafond mensuel # (pas le solde de crédits prépayés). On affiche donc utilisé / plafond. if self.limit: return f"Extra : {self._fmt(self.used)} utilisé / {self._fmt(self.limit)} ce mois" return f"Extra : {self._fmt(self.used)} utilisé" @dataclass class ClaudeUsage: ok: bool error: str | None = None # True quand l'erreur exige une action humaine (relancer le login isolé). Empêche le repli # silencieux sur le cache : on veut que le message remonte à l'écran. fatal: bool = False five_hour: Window | None = None seven_day: Window | None = None seven_day_opus: Window | None = None seven_day_sonnet: Window | None = None extra: ExtraUsage | None = None burn_rate: float | None = None # tokens/min (ccusage, optionnel) def _parse_window(raw: dict | None) -> Window | None: if not raw: return None util = float(raw.get("utilization", 0) or 0) resets = raw.get("resets_at") dt = None if resets: try: dt = datetime.fromisoformat(str(resets).replace("Z", "+00:00")) except ValueError: dt = None return Window(utilization=util, resets_at=dt) def _load_creds() -> dict: with open(config.claude_creds_path) as f: return json.load(f) def _write_creds(data: dict) -> None: """Écriture atomique du fichier de credentials isolé (mode 0600).""" path = config.claude_creds_path fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".") try: with os.fdopen(fd, "w") as f: json.dump(data, f) os.chmod(tmp, 0o600) os.replace(tmp, path) except BaseException: if os.path.exists(tmp): os.unlink(tmp) raise # Codes d'erreur OAuth qui signifient « ce refresh token est mort » (≠ rate-limit/réseau). _FATAL_REFRESH_ERRORS = {"invalid_grant", "invalid_request", "invalid_client"} async def _refresh(data: dict) -> str: """Rafraîchit l'access token, réécrit le fichier isolé, renvoie le nouvel access token. Le refresh token est ROTATIF : un refresh réussi le remplace et invalide l'ancien. On NE retente JAMAIS en interne avec le même token (pas de boucle for / tenacity ici) : re-soumettre un token déjà consommé peut déclencher la reuse-detection côté Anthropic et révoquer toute la famille -> login forcé. L'espacement des tentatives vient du backoff de l'appelant.""" o = data["claudeAiOauth"] body = { "grant_type": "refresh_token", "refresh_token": o["refreshToken"], "client_id": CLIENT_ID, } async with httpx.AsyncClient(timeout=config.claude_refresh_timeout) as client: resp = await client.post( REFRESH_URL, json=body, headers={"Content-Type": "application/json", "User-Agent": config.claude_ua}, ) # 400 avec un code d'erreur OAuth fatal (ou 401) -> le refresh token est mort : seul un nouveau # login isolé répare. On arme l'alerte et on lève _RefreshFatal SANS backoff (le 429, lui, est # transitoire et part dans raise_for_status ci-dessous). if resp.status_code in (400, 401): err = "" try: err = str((resp.json() or {}).get("error", "")) except ValueError: pass if err in _FATAL_REFRESH_ERRORS or resp.status_code == 401: _set_login_required(err or f"HTTP {resp.status_code}") log.error("LOGIN CLAUDE REQUIS — relancer le login isolé " "(CLAUDE_CONFIG_DIR=… claude auth login) ; refresh rejeté: %s", err or resp.status_code) raise _RefreshFatal(err or f"HTTP {resp.status_code}") resp.raise_for_status() # 429/5xx/autre -> httpx.HTTPError, traité en transitoire (backoff) tok = resp.json() o["accessToken"] = tok["access_token"] o["refreshToken"] = tok["refresh_token"] o["expiresAt"] = int(time.time() * 1000) + int(tok["expires_in"]) * 1000 try: _write_creds(data) except OSError: # Token frais en mémoire mais non persisté : au prochain démarrage le disque garde l'ANCIEN # token (déjà consommé) -> rejoue la cause n°1 des reconnexions. On le signale fort. log.error("nouveau refresh token NON persisté (échec écriture %s)", config.claude_creds_path) raise _note_refresh_success() _clear_login_required() # un refresh réussi prouve que l'auth est de nouveau bonne log.info("refresh OAuth Claude réussi — token valide jusqu'à %s", datetime.fromtimestamp(o["expiresAt"] / 1000, timezone.utc).isoformat()) return o["accessToken"] async def _valid_token() -> str: """Renvoie un access token valide, en rafraîchissant PROACTIVEMENT bien avant l'expiration. - Refresh dès qu'il reste moins de `claude_refresh_lead_ms` (≈2 h) sur le token (~8 h de vie) : un échec transitoire (429) a alors des heures de marge pour réussir avant que l'access token meure, et le refresh token rotatif tourne tôt (loin du chemin critique). - Échec transitoire -> backoff + _RefreshThrottled : l'appelant sert la dernière valeur connue. - Refresh token mort (invalid_grant/401) -> _RefreshFatal, sans backoff ni re-soumission en boucle. L'alerte ne retombe qu'au prochain login isolé (token redevenu frais sur disque).""" # Plancher défensif : même mal configuré (LEAD_MIN=0), on rafraîchit au moins juste avant l'expiry. lead = max(config.claude_refresh_lead_ms, EXPIRY_BUFFER_MS) data = _load_creds() o = data["claudeAiOauth"] if int(o.get("expiresAt", 0)) - int(time.time() * 1000) > lead: _clear_login_required() # token frais : si une alerte était armée, un re-login l'a réparée return o["accessToken"] if _login_required["flag"]: raise _RefreshFatal(str(_login_required["detail"])) # token mort connu : pas d'appel réseau if time.time() < _refresh_state["backoff_until"]: raise _RefreshThrottled() async with _refresh_lock: # Re-lecture après acquisition du lock : un autre coroutine a peut-être déjà rafraîchi, ou # l'utilisateur a relancé le login isolé entre-temps (token redevenu frais -> on désarme). data = _load_creds() o = data["claudeAiOauth"] if int(o.get("expiresAt", 0)) - int(time.time() * 1000) > lead: _clear_login_required() return o["accessToken"] if _login_required["flag"]: raise _RefreshFatal(str(_login_required["detail"])) if time.time() < _refresh_state["backoff_until"]: raise _RefreshThrottled() try: return await _refresh(data) except httpx.HTTPError as exc: # Transitoire (429/5xx/réseau/timeout). _RefreshFatal, lui, n'est PAS un httpx.HTTPError : # il remonte tel quel, sans backoff. _note_refresh_failure(exc) raise async def _get_usage(token: str) -> httpx.Response: headers = { "Authorization": f"Bearer {token}", "anthropic-beta": "oauth-2025-04-20", "User-Agent": config.claude_ua, "Content-Type": "application/json", } async with httpx.AsyncClient(timeout=15) as client: return await client.get(USAGE_URL, headers=headers) def _burn_rate_from_ccusage() -> float | None: """Burn rate du bloc 5h actif via `ccusage blocks --json` (best-effort).""" try: proc = subprocess.run( ["ccusage", "blocks", "--active", "--json"], capture_output=True, text=True, timeout=20, ) if proc.returncode != 0: return None data = json.loads(proc.stdout) for b in data.get("blocks") or []: if b.get("isActive"): bp = b.get("burnRate") or {} rate = bp.get("tokensPerMinute") or bp.get("tokensPerMinuteForIndicator") return float(rate) if rate is not None else None except (subprocess.SubprocessError, json.JSONDecodeError, ValueError, FileNotFoundError): return None return None # Dernier usage récupéré avec succès : sert de cache (throttle) ET de repli en cas d'erreur # transitoire (429, réseau) pour ne pas afficher "HTTP 429" sur l'e-ink. Les libellés dynamiques # (resets_in_human) restent corrects car recalculés à la volée depuis resets_at. _usage_cache: dict[str, object] = {"value": None, "ts": 0.0} async def fetch_usage() -> ClaudeUsage: _load_state() # hydrate le backoff persisté avant toute logique de refresh (1×/process) now = time.time() cached = _usage_cache["value"] if isinstance(cached, ClaudeUsage) and cached.ok and (now - float(_usage_cache["ts"])) < config.usage_ttl_seconds: return cached result = await _fetch_usage() if result.ok: _usage_cache["value"] = result _usage_cache["ts"] = now return result # Erreur TRANSITOIRE (429, réseau, auth temporaire) : on réaffiche la dernière valeur correcte # connue plutôt qu'un message d'erreur, le temps que ça se rétablisse. En revanche une erreur # FATALE (login isolé à relancer) doit passer en clair à l'écran : pas de repli cache. if not result.fatal and isinstance(cached, ClaudeUsage) and cached.ok: return cached return result async def _fetch_usage() -> ClaudeUsage: if not os.path.exists(config.claude_creds_path): return ClaudeUsage(ok=False, error="credentials Claude absents — login isolé requis") try: token = await _valid_token() except _RefreshThrottled: return ClaudeUsage(ok=False, error="refresh en pause (anti-429)") except _RefreshFatal: return ClaudeUsage(ok=False, fatal=True, error="Reconnexion Claude requise") except (OSError, KeyError, json.JSONDecodeError) as exc: return ClaudeUsage(ok=False, error=f"credentials illisibles: {exc}") except httpx.HTTPError as exc: return ClaudeUsage(ok=False, error=f"refresh échoué: {exc}") try: resp = await _get_usage(token) # Token rejeté malgré la marge -> une tentative de refresh forcé (soumise au backoff). if resp.status_code == 401: try: data = _load_creds() async with _refresh_lock: token = await _refresh(data) except _RefreshFatal: return ClaudeUsage(ok=False, fatal=True, error="Reconnexion Claude requise") except httpx.HTTPError as exc: _note_refresh_failure(exc) raise resp = await _get_usage(token) except httpx.HTTPError as exc: return ClaudeUsage(ok=False, error=f"réseau: {exc}") if resp.status_code == 401: return ClaudeUsage(ok=False, error="auth invalide — relancer le login isolé") if resp.status_code == 403: return ClaudeUsage(ok=False, error="scope insuffisant (login complet requis)") if resp.status_code != 200: return ClaudeUsage(ok=False, error=f"HTTP {resp.status_code}") data = resp.json() burn = _burn_rate_from_ccusage() if config.ccusage_enabled else None extra = None eu = data.get("extra_usage") or {} if eu.get("is_enabled") and eu.get("used_credits"): extra = ExtraUsage( used=float(eu.get("used_credits", 0)), limit=float(eu["monthly_limit"]) if eu.get("monthly_limit") else None, currency=eu.get("currency", "EUR"), ) return ClaudeUsage( ok=True, five_hour=_parse_window(data.get("five_hour")), seven_day=_parse_window(data.get("seven_day")), seven_day_opus=_parse_window(data.get("seven_day_opus")), seven_day_sonnet=_parse_window(data.get("seven_day_sonnet")), extra=extra, burn_rate=burn, )