"""CLI InkFlow (typer). Commandes : - parse : EPUB -> book.json + chapters/chNN.json - analyze : analyse Gemma d'un (ou de tous les) chapitre(s) -> analysis + cast - info : affiche la structure d'un livre deja parse """ from __future__ import annotations from typing import Optional import typer from rich.console import Console from rich.table import Table from .config import book_data_dir, ensure_dirs from .epub.parser import load_book, load_chapter_text, parse_epub from .models import Cast from .store import artifacts app = typer.Typer(add_completion=False, help="InkFlow : EPUB -> livre audio (local, MLX).") console = Console() @app.command() def parse(epub_path: str, slug: Optional[str] = typer.Option(None, help="Slug interne (def: depuis le titre).")): """Parse un EPUB en structure normalisee.""" ensure_dirs() book = parse_epub(epub_path, slug=slug) console.print(f"[green]Parse:[/] {book.title} — slug=[cyan]{book.slug}[/]") console.print(f" {len(book.chapters)} items, {len(book.render_chapters)} a rendre.") _print_chapters(book) @app.command() def info(slug: str): """Affiche la structure d'un livre deja parse.""" _print_chapters(load_book(slug)) @app.command() def serve(host: str = "127.0.0.1", port: int = 8000): """Lance l'API + l'UI web (sert frontend/dist si build).""" import uvicorn ensure_dirs() console.print(f"[green]InkFlow[/] sur http://{host}:{port}") uvicorn.run("inkflow.api.app:app", host=host, port=port, log_level="info") @app.command() def analyze( slug: str, chapter: Optional[int] = typer.Option(None, help="Index de chapitre unique (def: tous)."), limit: Optional[int] = typer.Option(None, help="Limiter au N premiers chapitres rendus."), force: bool = typer.Option(False, help="Re-analyser meme si un artefact existe."), backend: Optional[str] = typer.Option(None, help="Moteur LLM: mlx ou lmstudio (def: reglages)."), model: Optional[str] = typer.Option(None, help="Identifiant de modele (def: reglages)."), ): """Analyse Gemma : segments narration/dialogue + locuteurs + casting.""" from .analysis.llm.client import LLM from .analysis.segmenter import analyze_chapter from .settings import get_settings book = load_book(slug) gemma = LLM(model_id=model, backend=backend) dedup_gemma = gemma if get_settings().dedup_use_gemma else None cast = artifacts.load_cast(slug) chars = list(cast.characters) targets = [c for c in book.render_chapters] if chapter is not None: targets = [c for c in book.chapters if c.index == chapter] elif limit: targets = targets[:limit] for ch in targets: if not force and artifacts.analysis_path(slug, ch.index).exists(): console.print(f"[dim]ch{ch.index:02d} deja analyse — ignore.[/]") continue ct = load_chapter_text(slug, ch) console.print(f"[blue]Analyse[/] ch{ch.index:02d} — {ch.title} ({ct.word_count} mots)…") try: # La dedup est faite dans analyze_chapter : `chars` recoit le cast # cumule reconcilie. analysis, chars = analyze_chapter( ch, ct, gemma, book_chars=chars, dedup_gemma=dedup_gemma) except Exception as exc: # noqa: BLE001 — un chapitre ne doit pas tout stopper console.print(f" [yellow]! echec, chapitre ignore: {exc}[/]") continue artifacts.save_analysis(slug, analysis) n_dlg = sum(1 for s in analysis.segments if s.type.value == "dialogue") console.print(f" -> {len(analysis.segments)} segments ({n_dlg} repliques), " f"{len(chars)} personnages cumules.") cast = Cast(narrator_voice_id=cast.narrator_voice_id, characters=chars) artifacts.save_cast(slug, cast) console.print(f"[green]Casting[/] : {len(chars)} personnages -> cast.json") @app.command() def benchmark( slug: str, models: Optional[str] = typer.Option( None, help="Modeles a comparer, separes par des virgules (def: modele courant)."), backend: Optional[str] = typer.Option( None, help="Moteur LLM: mlx ou lmstudio (def: reglages)."), chapter: Optional[int] = typer.Option( None, help="Restreindre a un chapitre (def: tous ceux avec reference)."), temperature: Optional[float] = typer.Option( None, help="Epingle la temperature Gemma (repro). Ex: 0.0."), reasoning: bool = typer.Option( False, "--reasoning", help="Modeles a raisonnement : retire la pensee + budget tokens accru."), use_cached: bool = typer.Option( False, "--use-cached", help="Compare les analysis/chNN.json existants (pas de modele)."), stream: bool = typer.Option( False, "--stream", help="Affiche les tokens generes en temps reel (pensee + reponse)."), ): """Met des modeles en concurrence sur les chapitres de reference (vs reference/).""" import sys from datetime import datetime from .analysis.llm import client as _llm from .analysis.benchmark import run_benchmark from .settings import get_settings settings = get_settings() backend_name = backend or settings.gemma_backend default_model = (settings.lmstudio_model if backend_name == "lmstudio" else settings.gemma_model) model_ids = ([m.strip() for m in models.split(",") if m.strip()] if models else [default_model]) chapters = [chapter] if chapter is not None else None label = "artefacts en cache" if use_cached else f"{len(model_ids)} modele(s)" console.print(f"[blue]Benchmark[/] {slug} ({label}) — suivi par chapitre :") # Suivi en clair (lignes persistantes), avec horodatage pour voir l'avancement # d'un run long. On evite console.status (spinner) qui n'imprime rien. def _progress(msg: str) -> None: from datetime import datetime as _dt console.print(f"[dim]{_dt.now():%H:%M:%S}[/] {msg}") # Streaming des tokens : ecriture brute sur stdout (sans markup rich) pour # voir defiler pensee et reponse. Necessite stdout non bufferise cote shell. if stream: def _sink(piece: str) -> None: sys.stdout.write(piece) sys.stdout.flush() _llm.set_token_sink(_sink) try: report = run_benchmark( slug, model_ids, backend=backend_name, chapters=chapters, temperature=temperature, reasoning=reasoning if reasoning else None, use_cached=use_cached, progress=_progress) finally: if stream: _llm.set_token_sink(None) report.generated_at = datetime.now().isoformat(timespec="seconds") # Table comparative : une ligne par modele (agregat micro-moyenne). table = Table(title=f"Benchmark {slug} — chapitres {report.chapters}") table.add_column("modele") for col in ("speaker_dlg", "speaker_all", "incise_f1", "type", "glued", "temps(s)"): table.add_column(col, justify="right") for ms in report.models: if ms.error or ms.aggregate is None: table.add_row(ms.model_id, f"[red]{ms.error or 'aucun chapitre'}[/]") continue a = ms.aggregate table.add_row( ms.model_id, f"{a.speaker_acc_dialogue:.1%}", f"{a.speaker_acc_all:.1%}", f"{a.incise_overlap_f1:.2f}", f"{a.type_acc:.1%}", f"{a.glued_acc:.1%}", f"{ms.elapsed_s:.0f}", ) console.print(table) # Detail des erreurs d'attribution (les pires) par modele. for ms in report.models: errs = [e for cs in ms.per_chapter for e in cs.errors] if not errs: continue console.print(f"\n[bold]{ms.model_id}[/] — {len(errs)} erreur(s) de locuteur:") for e in errs[:15]: console.print( f" ch·seg{e.index:>3} attendu=[green]{e.expected}[/] " f"obtenu=[red]{e.got}[/] — {e.text_excerpt!r}") if len(errs) > 15: console.print(f" [dim]… +{len(errs) - 15} autres[/]") # Rapport JSON horodate. out_dir = book_data_dir(slug) / "benchmark" out_dir.mkdir(parents=True, exist_ok=True) stamp = report.generated_at.replace(":", "").replace("-", "") out_path = out_dir / f"{stamp}.json" out_path.write_text(report.model_dump_json(indent=2), encoding="utf-8") console.print(f"\n[green]Rapport[/] -> {out_path}") @app.command() def pronounce( slug: str, chapter: Optional[int] = typer.Option(None, help="Index de chapitre (def: 1er rendu)."), backend: Optional[str] = typer.Option(None, help="Moteur LLM: mlx ou lmstudio (def: reglages)."), model: Optional[str] = typer.Option(None, help="Identifiant de modele (def: reglages)."), ): """Propose des candidats de prononciation (Gemma) -> pronunciation.json.""" from .analysis.llm.client import LLM from .analysis.pronunciation import merge_pronunciations, propose_pronunciations book = load_book(slug) ch = (next((c for c in book.chapters if c.index == chapter), None) if chapter is not None else (book.render_chapters[0] if book.render_chapters else None)) if ch is None or not ch.text_file: console.print("[red]Chapitre introuvable.[/]"); raise typer.Exit(1) ct = load_chapter_text(slug, ch) gemma = LLM(model_id=model, backend=backend) with console.status("Recherche des mots a risque…"): new = propose_pronunciations("\n".join(ct.paragraphs), gemma) pron = merge_pronunciations(artifacts.load_pronunciation(slug), new) artifacts.save_pronunciation(slug, pron) table = Table("terme", "prononciation", "note") for e in pron.entries: table.add_row(e.term, e.replacement, e.note or "") console.print(table) console.print(f"[green]{len(pron.entries)} entrees[/] -> pronunciation.json") @app.command() def cast( slug: str, rebuild_voicebank: bool = typer.Option(False, help="Regenere les clips de la voicebank."), dedup: bool = typer.Option(False, help="Deduplique d'abord les variantes de noms (heuristique)."), llm: bool = typer.Option(False, "--llm", help="Ajoute la passe Gemma a la dedup (moins sur)."), backend: Optional[str] = typer.Option(None, help="Moteur LLM pour --llm: mlx ou lmstudio (def: reglages)."), model: Optional[str] = typer.Option(None, help="Identifiant de modele pour --llm (def: reglages)."), ): """Construit la voicebank (si besoin) et auto-assigne les voix au casting.""" from .casting.assign import assign_voices from .casting.voicebank import build_voicebank, load_voicebank cast = artifacts.load_cast(slug) if not cast.characters: console.print("[yellow]Aucun personnage — lance d'abord `analyze`.[/]") raise typer.Exit(1) if dedup: from .casting.dedup import dedup_cast from .models import Cast gemma = None if llm: from .analysis.llm.client import LLM gemma = LLM(model_id=model, backend=backend) before = len(cast.characters) with console.status("Deduplication du casting…"): chars = dedup_cast(cast.characters, gemma) cast = Cast(narrator_voice_id=cast.narrator_voice_id, characters=chars) artifacts.save_cast(slug, cast) console.print(f"[green]Dedup[/] : {before} -> {len(chars)} personnages.") vb = load_voicebank() if rebuild_voicebank or not vb.entries or not any(e.ref_audio for e in vb.entries): with console.status("Generation des clips de la voicebank…"): vb = build_voicebank(regenerate=rebuild_voicebank) console.print(f"[green]Voicebank[/] : {len(vb.entries)} voix, clips generes.") cast = assign_voices(cast.characters, vb, narrator_voice_id=cast.narrator_voice_id) artifacts.save_cast(slug, cast) table = Table("personnage", "genre", "voix") table.add_row("[narrateur]", "", cast.narrator_voice_id or "") for ch in cast.characters: table.add_row(ch.name, ch.gender or "?", ch.voice_id or "") console.print(table) @app.command() def render( slug: str, chapter: int = typer.Argument(..., help="Index du chapitre a synthetiser."), backend: str = typer.Option("kokoro", help="Moteur TTS: kokoro | qwen3."), mono: bool = typer.Option(True, help="Mono-narrateur (sinon multi-voix via cast)."), max_paragraphs: Optional[int] = typer.Option(None, help="Limiter (test rapide)."), ): """Synthetise un chapitre en MP3 dans output//.""" from .pipeline.render import ( build_units_mono, build_units_multi, render_chapter_to_mp3, ) from .tts.base import VoiceSpec from .tts.factory import get_backend book = load_book(slug) ch = next((c for c in book.chapters if c.index == chapter), None) if ch is None or not ch.text_file: console.print(f"[red]Chapitre {chapter} introuvable ou non rendu.[/]") raise typer.Exit(1) ct = load_chapter_text(slug, ch) if max_paragraphs: ct.paragraphs = ct.paragraphs[:max_paragraphs] tts = get_backend(backend) pron = artifacts.load_pronunciation(slug) if mono: units = build_units_mono(ct, tts.default_voice()) else: from .casting.voicebank import load_voicebank, voice_spec_for from .pipeline.render import make_voice_resolver analysis = artifacts.load_analysis(slug, chapter) cast_data = artifacts.load_cast(slug) vb = load_voicebank() # Voix narrateur par defaut depuis la voicebank si disponible. narrator_entry = vb.by_id(cast_data.narrator_voice_id) if cast_data.narrator_voice_id else None default_voice = (voice_spec_for(narrator_entry, backend) if narrator_entry else tts.default_voice()) resolver = make_voice_resolver(cast_data, vb, backend) units = build_units_multi(analysis, resolver, default_voice) with console.status(f"Synthese de {len(units)} unites ({backend})…"): def _p(done, total): console.print(f" unite {done}/{total}", end="\r") track = (book.render_chapters.index(ch) + 1) if ch in book.render_chapters else None mp3 = render_chapter_to_mp3(book, ch, units, tts, pron=pron, track=track, progress=_p) console.print(f"\n[green]MP3:[/] {mp3}") def _print_chapters(book) -> None: table = Table(show_header=True, header_style="bold") for col in ("idx", "kind", "render", "pov", "mots", "sortie", "titre"): table.add_column(col) for c in book.chapters: table.add_row( str(c.index), c.kind.value, "✓" if c.render else "·", c.pov or "", str(c.word_count), c.output_name or "", c.title) console.print(table) if __name__ == "__main__": app()