"""Backfill — génère un historique de biais IA pour backtester la stratégie. Parcourt l'historique des bougies à une cadence donnée et, à chaque pas de temps, demande à Claude un biais en n'utilisant QUE les données disponibles jusqu'à ce moment-là (pas de fuite du futur). Écrit chaque biais dans l'historique CSV avec l'horodatage de la bougie correspondante. ⚠️ COÛTEUX EN QUOTA D'ABONNEMENT : 1 appel Claude par pas de temps. 6 mois × cadence 24 h ≈ 180 appels. Choisis une cadence raisonnable. Usage : python backfill.py --start 20260101 --step-hours 24 python backfill.py --start 20260101 --end 20260301 --step-hours 12 --pairs BTC/USDT,ETH/USDT """ from __future__ import annotations import argparse import os from datetime import datetime, timedelta, timezone import ccxt from claude_client import ClaudeClient from market_data import snapshot_from_candles from signal_store import append_history EXCHANGE = os.environ.get("ANALYZER_EXCHANGE", "binance") MODEL = os.environ.get("ANALYZER_MODEL", "claude-sonnet-4-6") HISTORY_DIR = os.environ.get( "ANALYZER_HISTORY_DIR", os.path.join(os.path.dirname(os.path.dirname(__file__)), "freqtrade", "user_data", "ai_bias_history"), ) WINDOW = 100 # nb de bougies fournies à Claude à chaque pas def _parse_day(s: str) -> datetime: return datetime.strptime(s, "%Y%m%d").replace(tzinfo=timezone.utc) def _fetch_full(exchange, pair: str, timeframe: str, since_ms: int) -> list: """Récupère tout l'OHLCV depuis `since_ms` (pagination ccxt).""" out: list = [] cursor = since_ms while True: batch = exchange.fetch_ohlcv(pair, timeframe=timeframe, since=cursor, limit=1000) if not batch: break out += batch cursor = batch[-1][0] + 1 if len(batch) < 1000: break return out def main() -> None: p = argparse.ArgumentParser(description="MidasBot — backfill historique des biais IA") p.add_argument("--pairs", default="BTC/USDT,ETH/USDT,SOL/USDT,BNB/USDT") p.add_argument("--timeframe", default="1h") p.add_argument("--start", required=True, help="AAAAMMJJ") p.add_argument("--end", default=None, help="AAAAMMJJ (défaut: maintenant)") p.add_argument("--step-hours", type=int, default=24, help="cadence d'analyse") p.add_argument("--yes", action="store_true", help="ne pas demander confirmation") args = p.parse_args() pairs = [x.strip() for x in args.pairs.split(",") if x.strip()] tf = args.timeframe start = _parse_day(args.start) end = _parse_day(args.end) if args.end else datetime.now(timezone.utc) step = timedelta(hours=args.step_hours) n_steps = int((end - start) / step) print(f"Backfill {pairs} {tf} | {start.date()} → {end.date()} | " f"pas {args.step_hours} h | ~{n_steps} appels Claude ({MODEL})") if not args.yes: if input("Continuer ? Ça consomme ton quota d'abonnement [y/N] ").strip().lower() != "y": print("Annulé.") return exchange = getattr(ccxt, EXCHANGE)({"enableRateLimit": True}) since_ms = int(start.timestamp() * 1000) # Récupère tout l'historique une fois par paire (puis on tranche par pas de temps). full = {pair: _fetch_full(exchange, pair, tf, since_ms) for pair in pairs} client = ClaudeClient(model=MODEL) t = start done = 0 while t <= end: t_ms = int(t.timestamp() * 1000) snaps = [] for pair in pairs: candles = [c for c in full[pair] if c[0] <= t_ms][-WINDOW:] snap = snapshot_from_candles(pair, tf, candles) if snap: snaps.append(snap) if snaps: try: batch = client.get_biases(snaps) for bias in batch.biases: append_history(bias, HISTORY_DIR, ts=t) done += 1 print(f" {t.isoformat()} → {len(batch.biases)} biais") except Exception as exc: # noqa: BLE001 print(f" {t.isoformat()} → erreur: {exc}") t += step print(f"Terminé : {done} pas écrits dans {HISTORY_DIR}") if __name__ == "__main__": main()