Remplace la voicebank générée par Kokoro (timbre anglais sur français phonémisé -> accent que Qwen3 clonait) par 41 vraies voix FR issues de CML-TTS (livres audio studio) : 1 narrateur dédié, 18F/14M nommées, 4F/4M anonymes réservées. - scripts/import_voices.py : import multi-shards parquet, 1 clip/locuteur (le plus propre via levenshtein), genre estimé par F0 (YIN, anti-octave), filtre débit de parole (ref_text aligné sur l'audio). - VoiceEntry.anonymous + assign_voices : les figurants « anonyme (...) » tirent dans un pool réservé, jamais mélangé avec les voix nommées ; narrateur dédié (fr_narrator remplace fr_f_siwis). - dedup._anon_attrs : genre/âge déduits du nom anonyme (bon genre de voix). - tts/qwen3.py : garde-fou anti-dérive (rejette/réessaie les sorties en boucle ou coupées en estimant la durée plausible du chunk). Limite connue : Qwen3 ne sait pas synthétiser les fragments d'1-2 mots (incises, titres) -> trous ; à traiter (repli Kokoro ou fusion des incises). Inclut aussi du travail en cours antérieur (refacto backend LLM pluggable mlx/lmstudio, benchmark, ajustements frontend/API). Claude-Session: https://claude.ai/code/session_01XSVvcy1mfb4k1xDgib9vVU
365 lines
15 KiB
Python
365 lines
15 KiB
Python
"""Orchestrateur : execute les etapes du pipeline en tache de fond, piste l'etat
|
|
et diffuse l'etat complet a l'UI a chaque changement.
|
|
|
|
- Un seul worker thread execute les jobs en serie (un Mac = une charge MLX a la
|
|
fois). Les jobs sont enfiles et rendent la main immediatement a l'API.
|
|
- L'etat (ProjectState) est persiste dans data/<slug>/state.json -> reprenable.
|
|
- La diffusion passe par un `broadcaster` injecte par la couche API (pour rester
|
|
independant de FastAPI). Il recoit (slug, dict_etat).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import queue
|
|
import threading
|
|
import traceback
|
|
from pathlib import Path
|
|
from typing import Callable, Optional
|
|
|
|
from ..config import book_data_dir, book_output_dir
|
|
from ..epub.parser import load_book, load_chapter_text
|
|
from ..models import ChapterRenderState, ProjectState, StageStatus
|
|
from ..store import artifacts
|
|
|
|
# Callback used to push state changes outward; it is called with
# (slug, state serialized as a dict).
Broadcaster = Callable[[str, dict], None]
|
|
|
|
|
|
def state_path(slug: str) -> Path:
    """Return the location of the persisted ``state.json`` for book ``slug``."""
    data_dir = book_data_dir(slug)
    return data_dir / "state.json"
|
|
|
|
|
|
def load_state(slug: str) -> ProjectState:
    """Load the project state of ``slug`` and reconcile it with the disk.

    When ``state.json`` exists it is parsed as a ``ProjectState``; otherwise a
    fresh state is created from the book title with the "parse" stage marked
    done. In both cases the result goes through ``_reconcile``.
    """
    path = state_path(slug)
    if not path.exists():
        # No persisted state yet: start from the parsed book.
        book = load_book(slug)
        fresh = ProjectState(
            slug=slug,
            title=book.title,
            stages={"parse": StageStatus.DONE},
        )
        return _reconcile(slug, fresh)

    raw = path.read_text(encoding="utf-8")
    return _reconcile(slug, ProjectState.model_validate_json(raw))
|
|
|
|
|
|
def _reconcile(slug: str, state: ProjectState) -> ProjectState:
    """Bring ``state`` in line with the artifacts present on disk.

    Lets the UI reflect work that was already done (including through the CLI
    or before a restart) without replaying any stage. ``state`` is updated in
    place and returned.
    """
    book = load_book(slug)
    chapters = book.render_chapters
    state.stages.setdefault("parse", StageStatus.DONE)

    # Analysis: chapters that own an analysis artifact.
    on_disk = []
    for chapter in chapters:
        if artifacts.analysis_path(slug, chapter.index).exists():
            on_disk.append(chapter.index)
    if on_disk:
        for chapter_index in on_disk:
            if chapter_index in state.analyzed_chapters:
                continue
            state.analyzed_chapters.append(chapter_index)
        # Only a still-pending stage is overridden; any other status is kept.
        if state.stage("analyze") == StageStatus.PENDING:
            if len(on_disk) == len(chapters):
                state.stages["analyze"] = StageStatus.DONE
            else:
                state.stages["analyze"] = StageStatus.RUNNING

    # Casting: at least one voice has been assigned.
    cast = artifacts.load_cast(slug)
    has_voice = bool(cast.narrator_voice_id) or any(
        character.voice_id for character in cast.characters
    )
    if has_voice:
        state.stages.setdefault("cast", StageStatus.DONE)

    # Pronunciation: at least one entry.
    pronunciation = artifacts.load_pronunciation(slug)
    if pronunciation.entries:
        state.stages.setdefault("pronounce", StageStatus.DONE)

    # Rendering: mp3 files present in the output directory.
    out_dir = book_output_dir(book.title)
    for chapter in chapters:
        previous = state.render.get(chapter.index)
        already_tracked = bool(previous and previous.mp3)
        if already_tracked or not chapter.output_name:
            continue
        if not (out_dir / chapter.output_name).exists():
            continue
        state.render[chapter.index] = ChapterRenderState(
            index=chapter.index,
            status=StageStatus.DONE,
            progress=1.0,
            mp3=chapter.output_name,
        )
    return state
|
|
|
|
|
|
class Orchestrator:
    """Runs pipeline stages as queued jobs on a single background worker.

    Each ``run_*`` method builds a ``job`` closure and enqueues it; the call
    returns immediately. Jobs load the project state, update it as they
    progress, and persist + broadcast it through ``_save_and_emit``.
    """

    def __init__(self) -> None:
        # Queue of (slug, job) pairs consumed by the worker thread.
        self._q: "queue.Queue[tuple[str, Callable[[], None]]]" = queue.Queue()
        self._worker: Optional[threading.Thread] = None
        self._broadcaster: Optional[Broadcaster] = None
        # Guards the creation of the worker thread (see _ensure_worker).
        self._lock = threading.Lock()
        # Slug of the job currently being executed, None when idle.
        self.busy_slug: Optional[str] = None

    # --- infra ---------------------------------------------------------------
    def set_broadcaster(self, fn: Broadcaster) -> None:
        """Register the callback that receives (slug, state dict) updates."""
        self._broadcaster = fn

    def _ensure_worker(self) -> None:
        """Start the worker thread if none is alive.

        The check-and-start is done under ``self._lock`` so that concurrent
        callers cannot each observe "no worker" and start two threads, which
        would break the one-job-at-a-time guarantee.
        """
        with self._lock:
            if self._worker is None or not self._worker.is_alive():
                self._worker = threading.Thread(target=self._loop, daemon=True)
                self._worker.start()

    def _loop(self) -> None:
        """Worker loop: run queued jobs one after another, forever.

        A failing job has its traceback printed and does not stop the loop.
        """
        while True:
            slug, job = self._q.get()
            self.busy_slug = slug
            try:
                job()
            except Exception:  # noqa: BLE001
                traceback.print_exc()
            finally:
                self.busy_slug = None
                self._q.task_done()

    def _save_and_emit(self, state: ProjectState) -> None:
        """Persist ``state`` to its state file, then broadcast it if possible.

        The JSON is written to a sibling temporary file and moved over the
        target with ``Path.replace`` so that a reader (or a crash) never sees
        a partially written ``state.json``.
        """
        path = state_path(state.slug)
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = path.with_name(path.name + ".tmp")
        tmp_path.write_text(state.model_dump_json(indent=2), encoding="utf-8")
        tmp_path.replace(path)
        if self._broadcaster:
            self._broadcaster(state.slug, state.model_dump(mode="json"))

    def enqueue(self, slug: str, job: Callable[[], None]) -> None:
        """Queue ``job`` for ``slug``, starting the worker thread if needed."""
        self._ensure_worker()
        self._q.put((slug, job))

    # --- stages --------------------------------------------------------------
    def run_analyze(self, slug: str, chapter_indexes: Optional[list[int]] = None) -> None:
        """Enqueue the analysis of the book's chapters.

        Args:
            slug: Book identifier.
            chapter_indexes: Restrict the run to these chapter indexes; ``None``
                means every render chapter.
        """
        def job() -> None:
            from ..analysis.llm.client import LLM
            from ..analysis.segmenter import analyze_chapter
            from ..models import Cast
            from ..settings import get_settings

            state = load_state(slug)
            book = load_book(slug)
            targets = [c for c in book.render_chapters
                       if chapter_indexes is None or c.index in chapter_indexes]
            state.stages["analyze"] = StageStatus.RUNNING
            state.active_stage = "analyze"
            self._save_and_emit(state)

            gemma = LLM()
            dedup_gemma = gemma if get_settings().dedup_use_gemma else None
            cast = artifacts.load_cast(slug)
            chars = list(cast.characters)
            total = len(targets)
            for i, ch in enumerate(targets):
                state.active_detail = f"Analyse {ch.title}"
                state.active_progress = i / max(total, 1)
                self._save_and_emit(state)
                ct = load_chapter_text(slug, ch)
                try:
                    # Deduplication happens inside analyze_chapter: `chars`
                    # receives the reconciled cumulative cast.
                    analysis, chars = analyze_chapter(
                        ch, ct, gemma, book_chars=chars, dedup_gemma=dedup_gemma)
                except Exception:  # noqa: BLE001 — chapter skipped, keep going
                    traceback.print_exc()
                    continue
                artifacts.save_analysis(slug, analysis)
                if ch.index not in state.analyzed_chapters:
                    state.analyzed_chapters.append(ch.index)
                self._save_and_emit(state)

            artifacts.save_cast(slug, Cast(
                narrator_voice_id=cast.narrator_voice_id, characters=chars))
            state.stages["analyze"] = StageStatus.DONE
            self._finish(state)
        self.enqueue(slug, job)

    def run_cast(self, slug: str) -> None:
        """Enqueue the voice assignment for the book's cast.

        The voicebank is rebuilt when it has no entries or no entry carries a
        reference audio.
        """
        def job() -> None:
            from ..casting.assign import assign_voices
            from ..casting.voicebank import build_voicebank, load_voicebank

            state = load_state(slug)
            state.stages["cast"] = StageStatus.RUNNING
            state.active_stage = "cast"
            state.active_detail = "Preparation de la voicebank"
            self._save_and_emit(state)

            vb = load_voicebank()
            if not vb.entries or not any(e.ref_audio for e in vb.entries):
                vb = build_voicebank()
            cast = artifacts.load_cast(slug)
            cast = assign_voices(cast.characters, vb,
                                 narrator_voice_id=cast.narrator_voice_id)
            artifacts.save_cast(slug, cast)
            state.stages["cast"] = StageStatus.DONE
            self._finish(state)
        self.enqueue(slug, job)

    def run_cast_analyze(self, slug: str, chapter_indexes: Optional[list[int]] = None) -> None:
        """(Re)extract the characters of one or more chapters and reconcile them.

        Lighter than `run_analyze`: it does not re-segment (existing analysis
        artifacts are left untouched). Serves "per-chapter" casting while
        keeping the book consistent (deduplication).
        """
        def job() -> None:
            from ..analysis.llm.client import LLM
            from ..analysis.segmenter import extract_characters
            from ..casting.dedup import reconcile_characters
            from ..models import Cast
            from ..settings import get_settings

            state = load_state(slug)
            book = load_book(slug)
            targets = [c for c in book.render_chapters
                       if chapter_indexes is None or c.index in chapter_indexes]
            state.active_stage = "cast"
            self._save_and_emit(state)

            gemma = LLM()
            dedup_gemma = gemma if get_settings().dedup_use_gemma else None
            cast = artifacts.load_cast(slug)
            chars = list(cast.characters)
            total = len(targets)
            for i, ch in enumerate(targets):
                state.active_detail = f"Casting — {ch.title}"
                state.active_progress = i / max(total, 1)
                self._save_and_emit(state)
                ct = load_chapter_text(slug, ch)
                try:
                    found = extract_characters("\n".join(ct.paragraphs), gemma)
                    # Speaker names come from the chapter's analysis artifact
                    # when one exists; otherwise the list stays empty.
                    speakers: list[str] = []
                    if artifacts.analysis_path(slug, ch.index).exists():
                        analysis = artifacts.load_analysis(slug, ch.index)
                        speakers = [s.speaker for s in analysis.segments]
                    chars, _ = reconcile_characters(
                        chars, found, dedup_gemma, speaker_names=speakers)
                except Exception:  # noqa: BLE001 — chapter skipped, keep going
                    traceback.print_exc()
                    continue
                artifacts.save_cast(slug, Cast(
                    narrator_voice_id=cast.narrator_voice_id, characters=chars))
                self._save_and_emit(state)
            self._finish(state)
        self.enqueue(slug, job)

    def run_dedup_cast(self, slug: str) -> None:
        """Fold duplicates of an already built cast (Holden/James Holden...)."""
        def job() -> None:
            from ..analysis.llm.client import LLM
            from ..casting.dedup import dedup_cast
            from ..models import Cast
            from ..settings import get_settings

            state = load_state(slug)
            state.active_stage = "cast"
            state.active_detail = "Deduplication du casting"
            self._save_and_emit(state)

            cast = artifacts.load_cast(slug)
            gemma = LLM() if get_settings().dedup_use_gemma else None
            chars = dedup_cast(cast.characters, gemma)
            artifacts.save_cast(slug, Cast(
                narrator_voice_id=cast.narrator_voice_id, characters=chars))
            self._finish(state)
        self.enqueue(slug, job)

    def run_pronounce(self, slug: str) -> None:
        """Enqueue the pronunciation proposals for the book.

        Only the first three render chapters are scanned (a sample); the
        proposals are merged into the stored pronunciation data.
        """
        def job() -> None:
            from ..analysis.llm.client import LLM
            from ..analysis.pronunciation import (
                merge_pronunciations,
                propose_pronunciations,
            )

            state = load_state(slug)
            book = load_book(slug)
            state.stages["pronounce"] = StageStatus.RUNNING
            state.active_stage = "pronounce"
            self._save_and_emit(state)

            gemma = LLM()
            pron = artifacts.load_pronunciation(slug)
            targets = book.render_chapters[:3]  # sample of chapters
            for i, ch in enumerate(targets):
                state.active_detail = f"Mots a risque — {ch.title}"
                state.active_progress = i / max(len(targets), 1)
                self._save_and_emit(state)
                ct = load_chapter_text(slug, ch)
                pron = merge_pronunciations(
                    pron, propose_pronunciations("\n".join(ct.paragraphs), gemma))
            artifacts.save_pronunciation(slug, pron)
            state.stages["pronounce"] = StageStatus.DONE
            self._finish(state)
        self.enqueue(slug, job)

    def run_render(self, slug: str, chapter_indexes: list[int],
                   backend: Optional[str] = None, mono: bool = False) -> None:
        """Enqueue the audio rendering of the given chapters.

        Args:
            slug: Book identifier.
            chapter_indexes: Indexes of the chapters to render.
            backend: TTS backend name; falls back to the configured default
                backend when falsy.
            mono: When true, every chapter is rendered with the backend's
                default voice, regardless of any available analysis.
        """
        from ..settings import get_settings
        # Resolved at call time, so the job keeps the backend chosen here.
        backend = backend or get_settings().default_backend

        def job() -> None:
            from ..casting.voicebank import load_voicebank, voice_spec_for
            from ..pipeline.render import (
                build_units_mono,
                build_units_multi,
                make_voice_resolver,
                render_chapter_to_mp3,
            )
            from ..tts.factory import get_backend

            state = load_state(slug)
            book = load_book(slug)
            state.stages["render"] = StageStatus.RUNNING
            state.active_stage = "render"
            self._save_and_emit(state)

            tts = get_backend(backend)
            pron = artifacts.load_pronunciation(slug)
            cast = artifacts.load_cast(slug)
            vb = load_voicebank()
            render_list = [c for c in book.render_chapters if c.index in chapter_indexes]

            for ch in render_list:
                rs = state.render.get(ch.index) or ChapterRenderState(index=ch.index)
                rs.status = StageStatus.RUNNING
                rs.progress = 0.0
                rs.backend = backend
                state.render[ch.index] = rs
                state.active_detail = f"Synthese — {ch.title}"
                self._save_and_emit(state)
                try:
                    ct = load_chapter_text(slug, ch)
                    # Chapters without analysis fall back to single-voice units.
                    if mono or ch.index not in state.analyzed_chapters:
                        units = build_units_mono(ct, tts.default_voice())
                    else:
                        analysis = artifacts.load_analysis(slug, ch.index)
                        narr = vb.by_id(cast.narrator_voice_id) if cast.narrator_voice_id else None
                        default_voice = (voice_spec_for(narr, backend)
                                         if narr else tts.default_voice())
                        resolver = make_voice_resolver(cast, vb, backend)
                        units = build_units_multi(analysis, resolver, default_voice)

                    # Default arguments bind this chapter's render state to
                    # the progress callback.
                    def _p(done: int, total: int, _rs=rs, _state=state) -> None:
                        _rs.progress = done / max(total, 1)
                        _state.active_progress = _rs.progress
                        self._save_and_emit(_state)

                    # 1-based position of the chapter among the render chapters.
                    track = book.render_chapters.index(ch) + 1
                    mp3 = render_chapter_to_mp3(book, ch, units, tts, pron=pron,
                                                track=track, progress=_p)
                    rs.status = StageStatus.DONE
                    rs.progress = 1.0
                    rs.mp3 = mp3.name
                except Exception as exc:  # noqa: BLE001
                    # A failing chapter is recorded and the loop moves on.
                    rs.status = StageStatus.ERROR
                    rs.error = str(exc)
                self._save_and_emit(state)

            state.stages["render"] = StageStatus.DONE
            self._finish(state)
        self.enqueue(slug, job)

    def _finish(self, state: ProjectState) -> None:
        """Clear the "active" fields of ``state`` and persist/broadcast it."""
        state.active_stage = None
        state.active_detail = None
        state.active_progress = 0.0
        self._save_and_emit(state)
|
|
|
|
|
|
# Singleton shared by the API.
orchestrator = Orchestrator()
|