"""Benchmark des modeles d'analyse contre les fichiers de reference. Les fichiers `data//reference/chNN.json` sont des verites terrain corrigees a la main (meme schema `ChapterAnalysis`). Ce module compare la sortie d'un modele (`hypothese`) a ces references et chiffre la qualite sur trois dimensions : 1. **Attribution du locuteur** (le point faible du petit modele local) ; 2. **Incises** (bornes start/end dans la replique) ; 3. **Type narration/dialogue** et flag `glued_to_prev` (garde-fou de regression). Le scoring (`score_chapter`/`aggregate`) est **pur** : aucune dependance MLX ni disque, teste comme `analysis.segmenter`. Le runner (`run_benchmark`) met plusieurs modeles en concurrence : il relance `analyze_chapter` en memoire (sans ecraser les artefacts) avec chaque `model_id`, score, libere le modele, enchaine. """ from __future__ import annotations import difflib import re import time from dataclasses import dataclass, field from typing import Callable, Optional from pydantic import BaseModel, Field from ..models import Cast, ChapterAnalysis, Incise, Segment, SegmentType # --- Normalisation ----------------------------------------------------------- _WS_RE = re.compile(r"\s+") def _norm_text(text: str) -> str: """Texte normalise pour l'alignement (insensible aux espaces/casse).""" return _WS_RE.sub(" ", text).strip().casefold() def _alias_map(cast: Optional[Cast]) -> dict[str, str]: """alias/nom (casefold) -> nom canonique, pour ne pas penaliser les variantes.""" mapping: dict[str, str] = {} if cast is None: return mapping for c in cast.characters: canon = c.name.strip() mapping[canon.casefold()] = canon for alias in c.aliases: mapping[alias.strip().casefold()] = canon return mapping def _norm_speaker(name: str, alias_map: dict[str, str]) -> str: key = (name or "").strip().casefold() return alias_map.get(key, key) # --- Comptes bruts (permettent une micro-moyenne sur plusieurs chapitres) ---- @dataclass class _Counts: seg_total: int = 0 seg_correct: int = 0 
# locuteur correct (tous segments) dlg_total: int = 0 dlg_correct: int = 0 # locuteur correct (dialogues seuls) type_total: int = 0 type_correct: int = 0 glued_total: int = 0 glued_correct: int = 0 inc_exact_tp: int = 0 inc_exact_fp: int = 0 inc_exact_fn: int = 0 inc_ov_tp: int = 0 inc_ov_fp: int = 0 inc_ov_fn: int = 0 errors: list["SpeakerError"] = field(default_factory=list) confusion: dict[str, dict[str, int]] = field(default_factory=dict) warnings: list[str] = field(default_factory=list) def add(self, other: "_Counts") -> None: self.seg_total += other.seg_total self.seg_correct += other.seg_correct self.dlg_total += other.dlg_total self.dlg_correct += other.dlg_correct self.type_total += other.type_total self.type_correct += other.type_correct self.glued_total += other.glued_total self.glued_correct += other.glued_correct self.inc_exact_tp += other.inc_exact_tp self.inc_exact_fp += other.inc_exact_fp self.inc_exact_fn += other.inc_exact_fn self.inc_ov_tp += other.inc_ov_tp self.inc_ov_fp += other.inc_ov_fp self.inc_ov_fn += other.inc_ov_fn self.errors.extend(other.errors) self.warnings.extend(other.warnings) for exp, gots in other.confusion.items(): dst = self.confusion.setdefault(exp, {}) for got, n in gots.items(): dst[got] = dst.get(got, 0) + n # --- Modeles de rapport (serialisables) -------------------------------------- class SpeakerError(BaseModel): index: int # index du segment dans le chapitre text_excerpt: str expected: str got: str class ChapterScore(BaseModel): index: int # -1 pour l'agregat n_segments: int = 0 n_dialogue: int = 0 # attribution du locuteur speaker_acc_all: float = 1.0 speaker_acc_dialogue: float = 1.0 # incises incise_exact_p: float = 1.0 incise_exact_r: float = 1.0 incise_exact_f1: float = 1.0 incise_overlap_p: float = 1.0 incise_overlap_r: float = 1.0 incise_overlap_f1: float = 1.0 # type / glued type_acc: float = 1.0 glued_acc: float = 1.0 # detail errors: list[SpeakerError] = Field(default_factory=list) confusion: dict[str, 
dict[str, int]] = Field(default_factory=dict) alignment_warnings: list[str] = Field(default_factory=list) class ModelScore(BaseModel): model_id: str elapsed_s: float = 0.0 error: Optional[str] = None # rempli si le modele a echoue (chargement, etc.) per_chapter: list[ChapterScore] = Field(default_factory=list) aggregate: Optional[ChapterScore] = None class BenchmarkReport(BaseModel): slug: str generated_at: str # horodatage pose par la couche I/O (CLI) chapters: list[int] = Field(default_factory=list) settings_snapshot: dict = Field(default_factory=dict) models: list[ModelScore] = Field(default_factory=list) # --- Metriques pures --------------------------------------------------------- def _prf(tp: int, fp: int, fn: int) -> tuple[float, float, float]: p = tp / (tp + fp) if (tp + fp) else 1.0 r = tp / (tp + fn) if (tp + fn) else 1.0 f1 = (2 * p * r / (p + r)) if (p + r) else 0.0 return p, r, f1 def _ratio(correct: int, total: int) -> float: return correct / total if total else 1.0 def _iou(a: Incise, b: Incise) -> float: inter = max(0, min(a.end, b.end) - max(a.start, b.start)) union = (a.end - a.start) + (b.end - b.start) - inter return inter / union if union else 0.0 def _match_incises(ref: list[Incise], hyp: list[Incise]) -> tuple[int, int, int, int, int, int]: """Compare deux listes de spans : (exact tp/fp/fn, overlap tp/fp/fn). Exact = memes (start, end). Overlap = appariement glouton IoU >= 0.5. 
""" ref_keys = [(i.start, i.end) for i in ref] hyp_keys = [(i.start, i.end) for i in hyp] # appariement exact 1:1 (pas de double comptage si doublons improbables) used = [False] * len(ref_keys) ex_tp = 0 for hk in hyp_keys: for j, rk in enumerate(ref_keys): if not used[j] and hk == rk: used[j] = True ex_tp += 1 break ex_fp = len(hyp_keys) - ex_tp ex_fn = len(ref_keys) - ex_tp used = [False] * len(ref) ov_tp = 0 for h in hyp: best_j, best_iou = -1, 0.0 for j, r in enumerate(ref): if used[j]: continue iou = _iou(r, h) if iou >= 0.5 and iou > best_iou: best_j, best_iou = j, iou if best_j >= 0: used[best_j] = True ov_tp += 1 ov_fp = len(hyp) - ov_tp ov_fn = len(ref) - ov_tp return ex_tp, ex_fp, ex_fn, ov_tp, ov_fp, ov_fn def align(ref: ChapterAnalysis, hyp: ChapterAnalysis) -> list[tuple[Optional[Segment], Optional[Segment]]]: """Aligne les segments hypothese sur la reference. Cas nominal (segmentation deterministe) : meme nombre + memes textes -> 1:1. Sinon, alignement par `difflib.SequenceMatcher` sur les textes normalises ; les segments orphelins ressortent en paires avec `None`. 
""" rt = [_norm_text(s.text) for s in ref.segments] ht = [_norm_text(s.text) for s in hyp.segments] if rt == ht: return list(zip(ref.segments, hyp.segments)) pairs: list[tuple[Optional[Segment], Optional[Segment]]] = [] sm = difflib.SequenceMatcher(a=rt, b=ht, autojunk=False) for tag, i1, i2, j1, j2 in sm.get_opcodes(): if tag == "equal": for k in range(i2 - i1): pairs.append((ref.segments[i1 + k], hyp.segments[j1 + k])) elif tag == "replace": for k in range(max(i2 - i1, j2 - j1)): r = ref.segments[i1 + k] if i1 + k < i2 else None h = hyp.segments[j1 + k] if j1 + k < j2 else None pairs.append((r, h)) elif tag == "delete": for k in range(i1, i2): pairs.append((ref.segments[k], None)) elif tag == "insert": for k in range(j1, j2): pairs.append((None, hyp.segments[k])) return pairs def _score_counts(ref: ChapterAnalysis, hyp: ChapterAnalysis, cast: Optional[Cast]) -> _Counts: amap = _alias_map(cast) c = _Counts() for r, h in align(ref, hyp): if r is None: c.warnings.append(f"segment hypothese sans correspondance: {h.text[:60]!r}") continue if h is None: c.warnings.append(f"segment reference non couvert: {r.text[:60]!r}") continue # type c.type_total += 1 if r.type == h.type: c.type_correct += 1 # glued c.glued_total += 1 if r.glued_to_prev == h.glued_to_prev: c.glued_correct += 1 # locuteur exp = _norm_speaker(r.speaker, amap) got = _norm_speaker(h.speaker, amap) c.seg_total += 1 ok = exp == got if ok: c.seg_correct += 1 if r.type is SegmentType.DIALOGUE: c.dlg_total += 1 if ok: c.dlg_correct += 1 else: c.errors.append(SpeakerError( index=r_index(ref, r), text_excerpt=r.text[:80], expected=r.speaker, got=h.speaker)) row = c.confusion.setdefault(r.speaker, {}) row[h.speaker] = row.get(h.speaker, 0) + 1 # incises (sur les dialogues de la reference) ex_tp, ex_fp, ex_fn, ov_tp, ov_fp, ov_fn = _match_incises(r.incises, h.incises) c.inc_exact_tp += ex_tp c.inc_exact_fp += ex_fp c.inc_exact_fn += ex_fn c.inc_ov_tp += ov_tp c.inc_ov_fp += ov_fp c.inc_ov_fn += ov_fn return c 
def r_index(analysis: ChapterAnalysis, seg: Segment) -> int:
    """Return the position of `seg` in the chapter (object identity).

    Returns -1 when no element of `analysis.segments` is `seg` itself.
    """
    for i, s in enumerate(analysis.segments):
        if s is seg:
            return i
    return -1


def _counts_to_score(index: int, c: _Counts) -> ChapterScore:
    """Turn raw counters into a `ChapterScore` labelled with `index`."""
    ex_p, ex_r, ex_f1 = _prf(c.inc_exact_tp, c.inc_exact_fp, c.inc_exact_fn)
    ov_p, ov_r, ov_f1 = _prf(c.inc_ov_tp, c.inc_ov_fp, c.inc_ov_fn)
    return ChapterScore(
        index=index,
        n_segments=c.seg_total,
        n_dialogue=c.dlg_total,
        speaker_acc_all=_ratio(c.seg_correct, c.seg_total),
        speaker_acc_dialogue=_ratio(c.dlg_correct, c.dlg_total),
        incise_exact_p=ex_p,
        incise_exact_r=ex_r,
        incise_exact_f1=ex_f1,
        incise_overlap_p=ov_p,
        incise_overlap_r=ov_r,
        incise_overlap_f1=ov_f1,
        type_acc=_ratio(c.type_correct, c.type_total),
        glued_acc=_ratio(c.glued_correct, c.glued_total),
        errors=c.errors,
        confusion=c.confusion,
        alignment_warnings=c.warnings,
    )


def score_chapter(ref: ChapterAnalysis, hyp: ChapterAnalysis,
                  cast: Optional[Cast] = None) -> ChapterScore:
    """Score a hypothesis against a reference for one chapter."""
    return _counts_to_score(ref.index, _score_counts(ref, hyp, cast))


def aggregate(scores: list[ChapterScore], counts: list[_Counts]) -> ChapterScore:
    """Micro-average (pooling of all segments) over several chapters.

    Only `counts` is used: ratios are recomputed from the summed raw
    counters. `scores` is accepted for interface compatibility and is not
    read. The result carries index -1.
    """
    total = _Counts()
    for c in counts:
        total.add(c)
    return _counts_to_score(-1, total)


# --- Multi-model runner ------------------------------------------------------

def _reference_chapters(slug: str, chapters: Optional[list[int]]) -> list[int]:
    """Indexes of the chapters that have a reference (filtered by `chapters`).

    The index is parsed from file names matching `ch<digits>.json` in the
    book's `reference` directory, in sorted file-name order. Returns an empty
    list when that directory does not exist.
    """
    from ..config import book_data_dir

    ref_dir = book_data_dir(slug) / "reference"
    found: list[int] = []
    if ref_dir.exists():
        for p in sorted(ref_dir.glob("ch*.json")):
            m = re.match(r"ch(\d+)\.json$", p.name)
            if m:
                found.append(int(m.group(1)))
    if chapters is not None:
        wanted = set(chapters)
        found = [i for i in found if i in wanted]
    return found


def _load_reference(slug: str, index: int) -> ChapterAnalysis:
    """Load and validate the reference file `reference/chNN.json` of a book."""
    from ..config import book_data_dir

    path = book_data_dir(slug) / "reference" / f"ch{index:02d}.json"
    return ChapterAnalysis.model_validate_json(path.read_text(encoding="utf-8"))


def _build_model_score(model_id: str, per_chapter: list[ChapterScore],
                       counts: list[_Counts], elapsed: float) -> ModelScore:
    """Assemble a `ModelScore`; the aggregate is None when no chapter was scored."""
    return ModelScore(
        model_id=model_id,
        elapsed_s=elapsed,
        per_chapter=per_chapter,
        aggregate=aggregate(per_chapter, counts) if per_chapter else None,
    )


def _stable_hash32(value: object) -> int:
    """Return a 32-bit fingerprint of `str(value)` that is stable across runs.

    The builtin `hash()` of a string is salted per process, so it cannot be
    used to compare fingerprints stored in different reports; CRC32 of the
    UTF-8 text is deterministic.
    """
    import zlib

    return zlib.crc32(str(value).encode("utf-8")) & 0xFFFFFFFF


def run_benchmark(slug: str, model_ids: list[str], *,
                  backend: Optional[str] = None,
                  chapters: Optional[list[int]] = None,
                  temperature: Optional[float] = None,
                  reasoning: Optional[bool] = None,
                  use_cached: bool = False,
                  progress: Optional[Callable[[str], None]] = None) -> BenchmarkReport:
    """Pit several models against each other on the reference chapters.

    `use_cached=True`: compares the existing `analysis/chNN.json` artifacts
    (no model is loaded; `model_ids` is ignored, a single result is produced).
    Otherwise, for each `model_id`, re-runs `analyze_chapter` in memory
    (without `save_analysis`) and scores it. `reset_llm_cache()` is called
    after each model, before the next one is loaded.

    `progress`: optional callback invoked at each step (loading, chapter
    analysed, chapter skipped, model finished) to follow a long run.

    Returns a `BenchmarkReport` whose `generated_at` is left empty. A model
    that raises is recorded with its `error` set and does not stop the others.

    Raises:
        ValueError: when no reference chapter is found for `slug`.
    """
    from ..epub.parser import load_book, load_chapter_text
    from ..settings import get_settings
    from ..store import artifacts

    emit = progress or (lambda _msg: None)
    targets = _reference_chapters(slug, chapters)
    if not targets:
        raise ValueError(
            f"Aucune reference trouvee pour {slug!r} "
            f"(data/{slug}/reference/chNN.json).")
    references = {i: _load_reference(slug, i) for i in targets}
    cast = artifacts.load_cast(slug)
    settings = get_settings()
    snapshot = {
        "gemma_temperature": temperature if temperature is not None else settings.gemma_temperature,
        "gemma_max_tokens": settings.gemma_max_tokens,
        "gemma_reasoning": reasoning if reasoning is not None else settings.gemma_reasoning,
        "dedup_use_gemma": settings.dedup_use_gemma,
        "retro_pass_use_gemma": settings.retro_pass_use_gemma,
        # Deterministic fingerprint, comparable between reports.
        "prompt_speakers_hash": _stable_hash32(settings.prompt_speakers),
    }
    report = BenchmarkReport(
        slug=slug, generated_at="", chapters=targets, settings_snapshot=snapshot)

    if use_cached:
        per_chapter, counts = [], []
        for i in targets:
            hyp = artifacts.load_analysis(slug, i)
            cnt = _score_counts(references[i], hyp, cast)
            counts.append(cnt)
            per_chapter.append(_counts_to_score(i, cnt))
        # NOTE(review): the cached result is stored under an empty model_id;
        # confirm this is the intended label.
        report.models.append(_build_model_score("", per_chapter, counts, 0.0))
        return report

    from .llm.client import LLM
    from .llm.factory import reset_llm_cache
    from .segmenter import analyze_chapter

    book = load_book(slug)
    by_index = {c.index: c for c in book.chapters}

    # Pin temperature/reasoning in memory (never save_settings -> no disk
    # write), restored on exit.
    original_temp = settings.gemma_temperature
    original_reasoning = settings.gemma_reasoning
    if temperature is not None:
        settings.gemma_temperature = temperature
    if reasoning is not None:
        settings.gemma_reasoning = reasoning
    try:
        for mi, model_id in enumerate(model_ids, 1):
            t0 = time.perf_counter()
            per_chapter, counts = [], []
            model_err: Optional[str] = None
            emit(f"[{mi}/{len(model_ids)}] {model_id} — chargement du modele…")
            try:
                gemma = LLM(model_id=model_id, backend=backend)
                for i in targets:
                    ch = by_index.get(i)
                    if ch is None:
                        # A reference exists but the book has no such
                        # chapter: report it instead of skipping silently.
                        emit(f" ch{i:02d} — absent du livre, ignore")
                        continue
                    emit(f" ch{i:02d} — analyse en cours…")
                    tc = time.perf_counter()
                    ct = load_chapter_text(slug, ch)
                    hyp, _ = analyze_chapter(
                        ch, ct, gemma, book_chars=list(cast.characters),
                        dedup_gemma=None)
                    cnt = _score_counts(references[i], hyp, cast)
                    counts.append(cnt)
                    cs = _counts_to_score(i, cnt)
                    per_chapter.append(cs)
                    emit(f" ch{i:02d} — OK en {time.perf_counter() - tc:.0f}s "
                         f"(locuteur dlg {cs.speaker_acc_dialogue:.0%}, "
                         f"{len(cs.errors)} erreurs)")
            except Exception as exc:  # noqa: BLE001 — a failing model must not stop the others
                model_err = f"{type(exc).__name__}: {exc}"
                emit(f" ! echec: {model_err[:120]}")
            finally:
                reset_llm_cache()  # release the model before the next one
            # Chapters scored before a failure are kept in the model's result.
            ms = _build_model_score(
                model_id, per_chapter, counts, time.perf_counter() - t0)
            ms.error = model_err
            report.models.append(ms)
            if not model_err and ms.aggregate is not None:
                emit(f"[{mi}/{len(model_ids)}] {model_id} — termine en "
                     f"{ms.elapsed_s:.0f}s (locuteur dlg "
                     f"{ms.aggregate.speaker_acc_dialogue:.1%})")
    finally:
        settings.gemma_temperature = original_temp
        settings.gemma_reasoning = original_reasoning
    return report