"""CLI InkFlow (typer). Commandes : - parse : EPUB -> book.json + chapters/chNN.json - analyze : analyse Gemma d'un (ou de tous les) chapitre(s) -> analysis + cast - info : affiche la structure d'un livre deja parse """ from __future__ import annotations from typing import Optional import typer from rich.console import Console from rich.table import Table from .config import ensure_dirs from .epub.parser import load_book, load_chapter_text, parse_epub from .models import Cast from .store import artifacts app = typer.Typer(add_completion=False, help="InkFlow : EPUB -> livre audio (local, MLX).") console = Console() @app.command() def parse(epub_path: str, slug: Optional[str] = typer.Option(None, help="Slug interne (def: depuis le titre).")): """Parse un EPUB en structure normalisee.""" ensure_dirs() book = parse_epub(epub_path, slug=slug) console.print(f"[green]Parse:[/] {book.title} — slug=[cyan]{book.slug}[/]") console.print(f" {len(book.chapters)} items, {len(book.render_chapters)} a rendre.") _print_chapters(book) @app.command() def info(slug: str): """Affiche la structure d'un livre deja parse.""" _print_chapters(load_book(slug)) @app.command() def serve(host: str = "127.0.0.1", port: int = 8000): """Lance l'API + l'UI web (sert frontend/dist si build).""" import uvicorn ensure_dirs() console.print(f"[green]InkFlow[/] sur http://{host}:{port}") uvicorn.run("inkflow.api.app:app", host=host, port=port, log_level="info") @app.command() def analyze( slug: str, chapter: Optional[int] = typer.Option(None, help="Index de chapitre unique (def: tous)."), limit: Optional[int] = typer.Option(None, help="Limiter au N premiers chapitres rendus."), force: bool = typer.Option(False, help="Re-analyser meme si un artefact existe."), ): """Analyse Gemma : segments narration/dialogue + locuteurs + casting.""" from .analysis.gemma import Gemma from .analysis.segmenter import analyze_chapter from .settings import get_settings book = load_book(slug) gemma = Gemma() dedup_gemma = gemma if get_settings().dedup_use_gemma else None cast = artifacts.load_cast(slug) chars = list(cast.characters) targets = [c for c in book.render_chapters] if chapter is not None: targets = [c for c in book.chapters if c.index == chapter] elif limit: targets = targets[:limit] for ch in targets: if not force and artifacts.analysis_path(slug, ch.index).exists(): console.print(f"[dim]ch{ch.index:02d} deja analyse — ignore.[/]") continue ct = load_chapter_text(slug, ch) console.print(f"[blue]Analyse[/] ch{ch.index:02d} — {ch.title} ({ct.word_count} mots)…") try: # La dedup est faite dans analyze_chapter : `chars` recoit le cast # cumule reconcilie. analysis, chars = analyze_chapter( ch, ct, gemma, book_chars=chars, dedup_gemma=dedup_gemma) except Exception as exc: # noqa: BLE001 — un chapitre ne doit pas tout stopper console.print(f" [yellow]! echec, chapitre ignore: {exc}[/]") continue artifacts.save_analysis(slug, analysis) n_dlg = sum(1 for s in analysis.segments if s.type.value == "dialogue") console.print(f" -> {len(analysis.segments)} segments ({n_dlg} repliques), " f"{len(chars)} personnages cumules.") cast = Cast(narrator_voice_id=cast.narrator_voice_id, characters=chars) artifacts.save_cast(slug, cast) console.print(f"[green]Casting[/] : {len(chars)} personnages -> cast.json") @app.command() def pronounce( slug: str, chapter: Optional[int] = typer.Option(None, help="Index de chapitre (def: 1er rendu)."), ): """Propose des candidats de prononciation (Gemma) -> pronunciation.json.""" from .analysis.gemma import Gemma from .analysis.pronunciation import merge_pronunciations, propose_pronunciations book = load_book(slug) ch = (next((c for c in book.chapters if c.index == chapter), None) if chapter is not None else (book.render_chapters[0] if book.render_chapters else None)) if ch is None or not ch.text_file: console.print("[red]Chapitre introuvable.[/]"); raise typer.Exit(1) ct = load_chapter_text(slug, ch) gemma = Gemma() with console.status("Recherche des mots a risque…"): new = propose_pronunciations("\n".join(ct.paragraphs), gemma) pron = merge_pronunciations(artifacts.load_pronunciation(slug), new) artifacts.save_pronunciation(slug, pron) table = Table("terme", "prononciation", "note") for e in pron.entries: table.add_row(e.term, e.replacement, e.note or "") console.print(table) console.print(f"[green]{len(pron.entries)} entrees[/] -> pronunciation.json") @app.command() def cast( slug: str, rebuild_voicebank: bool = typer.Option(False, help="Regenere les clips de la voicebank."), dedup: bool = typer.Option(False, help="Deduplique d'abord les variantes de noms (heuristique)."), llm: bool = typer.Option(False, "--llm", help="Ajoute la passe Gemma a la dedup (moins sur)."), ): """Construit la voicebank (si besoin) et auto-assigne les voix au casting.""" from .casting.assign import assign_voices from .casting.voicebank import build_voicebank, load_voicebank cast = artifacts.load_cast(slug) if not cast.characters: console.print("[yellow]Aucun personnage — lance d'abord `analyze`.[/]") raise typer.Exit(1) if dedup: from .casting.dedup import dedup_cast from .models import Cast gemma = None if llm: from .analysis.gemma import Gemma gemma = Gemma() before = len(cast.characters) with console.status("Deduplication du casting…"): chars = dedup_cast(cast.characters, gemma) cast = Cast(narrator_voice_id=cast.narrator_voice_id, characters=chars) artifacts.save_cast(slug, cast) console.print(f"[green]Dedup[/] : {before} -> {len(chars)} personnages.") vb = load_voicebank() if rebuild_voicebank or not vb.entries or not any(e.ref_audio for e in vb.entries): with console.status("Generation des clips de la voicebank…"): vb = build_voicebank(regenerate=rebuild_voicebank) console.print(f"[green]Voicebank[/] : {len(vb.entries)} voix, clips generes.") cast = assign_voices(cast.characters, vb, narrator_voice_id=cast.narrator_voice_id) artifacts.save_cast(slug, cast) table = Table("personnage", "genre", "voix") table.add_row("[narrateur]", "", cast.narrator_voice_id or "") for ch in cast.characters: table.add_row(ch.name, ch.gender or "?", ch.voice_id or "") console.print(table) @app.command() def render( slug: str, chapter: int = typer.Argument(..., help="Index du chapitre a synthetiser."), backend: str = typer.Option("kokoro", help="Moteur TTS: kokoro | qwen3."), mono: bool = typer.Option(True, help="Mono-narrateur (sinon multi-voix via cast)."), max_paragraphs: Optional[int] = typer.Option(None, help="Limiter (test rapide)."), ): """Synthetise un chapitre en MP3 dans output//.""" from .pipeline.render import ( build_units_mono, build_units_multi, render_chapter_to_mp3, ) from .tts.base import VoiceSpec from .tts.factory import get_backend book = load_book(slug) ch = next((c for c in book.chapters if c.index == chapter), None) if ch is None or not ch.text_file: console.print(f"[red]Chapitre {chapter} introuvable ou non rendu.[/]") raise typer.Exit(1) ct = load_chapter_text(slug, ch) if max_paragraphs: ct.paragraphs = ct.paragraphs[:max_paragraphs] tts = get_backend(backend) pron = artifacts.load_pronunciation(slug) if mono: units = build_units_mono(ct, tts.default_voice()) else: from .casting.voicebank import load_voicebank, voice_spec_for from .pipeline.render import make_voice_resolver analysis = artifacts.load_analysis(slug, chapter) cast_data = artifacts.load_cast(slug) vb = load_voicebank() # Voix narrateur par defaut depuis la voicebank si disponible. narrator_entry = vb.by_id(cast_data.narrator_voice_id) if cast_data.narrator_voice_id else None default_voice = (voice_spec_for(narrator_entry, backend) if narrator_entry else tts.default_voice()) resolver = make_voice_resolver(cast_data, vb, backend) units = build_units_multi(analysis, resolver, default_voice) with console.status(f"Synthese de {len(units)} unites ({backend})…"): def _p(done, total): console.print(f" unite {done}/{total}", end="\r") track = (book.render_chapters.index(ch) + 1) if ch in book.render_chapters else None mp3 = render_chapter_to_mp3(book, ch, units, tts, pron=pron, track=track, progress=_p) console.print(f"\n[green]MP3:[/] {mp3}") def _print_chapters(book) -> None: table = Table(show_header=True, header_style="bold") for col in ("idx", "kind", "render", "pov", "mots", "sortie", "titre"): table.add_column(col) for c in book.chapters: table.add_row( str(c.index), c.kind.value, "✓" if c.render else "·", c.pov or "", str(c.word_count), c.output_name or "", c.title) console.print(table) if __name__ == "__main__": app()