Voicebank : vraies voix françaises (CML-TTS) + pool anonyme + garde-fou Qwen3

Remplace la voicebank générée par Kokoro (timbre anglais sur français phonémisé
-> accent que Qwen3 clonait) par 41 vraies voix FR issues de CML-TTS (livres
audio studio) : 1 narrateur dédié, 18F/14M nommées, 4F/4M anonymes réservées.

- scripts/import_voices.py : import multi-shards parquet, 1 clip/locuteur (le
  plus propre via levenshtein), genre estimé par F0 (YIN, anti-octave), filtre
  débit de parole (ref_text aligné sur l'audio).
- VoiceEntry.anonymous + assign_voices : les figurants « anonyme (...) » tirent
  dans un pool réservé, jamais mélangé avec les voix nommées ; narrateur dédié
  (fr_narrator remplace fr_f_siwis).
- dedup._anon_attrs : genre/âge déduits du nom anonyme (bon genre de voix).
- tts/qwen3.py : garde-fou anti-dérive (rejette/réessaie les sorties en boucle
  ou coupées en estimant la durée plausible du chunk).

Limite connue : Qwen3 ne sait pas synthétiser les fragments d'1-2 mots (incises,
titres) -> trous ; à traiter (repli Kokoro ou fusion des incises).

Inclut aussi du travail en cours antérieur (refacto backend LLM pluggable
mlx/lmstudio, benchmark, ajustements frontend/API).

Claude-Session: https://claude.ai/code/session_01XSVvcy1mfb4k1xDgib9vVU
This commit is contained in:
2026-06-21 21:32:31 +02:00
parent 141df5f04e
commit ba1813c583
91 changed files with 2558 additions and 442 deletions

View File

@@ -359,6 +359,7 @@ def _build_model_score(model_id: str, per_chapter: list[ChapterScore],
def run_benchmark(slug: str, model_ids: list[str], *,
backend: Optional[str] = None,
chapters: Optional[list[int]] = None,
temperature: Optional[float] = None,
reasoning: Optional[bool] = None,
@@ -414,7 +415,8 @@ def run_benchmark(slug: str, model_ids: list[str], *,
report.models.append(_build_model_score("<cached>", per_chapter, counts, 0.0))
return report
from .gemma import Gemma, _load
from .llm.client import LLM
from .llm.factory import reset_llm_cache
from .segmenter import analyze_chapter
book = load_book(slug)
@@ -435,7 +437,7 @@ def run_benchmark(slug: str, model_ids: list[str], *,
model_err: Optional[str] = None
emit(f"[{mi}/{len(model_ids)}] {model_id} — chargement du modele…")
try:
gemma = Gemma(model_id=model_id)
gemma = LLM(model_id=model_id, backend=backend)
for i in targets:
ch = by_index.get(i)
if ch is None:
@@ -457,7 +459,7 @@ def run_benchmark(slug: str, model_ids: list[str], *,
model_err = f"{type(exc).__name__}: {exc}"
emit(f" ! echec: {model_err[:120]}")
finally:
_load.cache_clear() # libere le modele avant le suivant
reset_llm_cache() # libere le modele avant le suivant
ms = _build_model_score(
model_id, per_chapter, counts, time.perf_counter() - t0)
ms.error = model_err

View File

@@ -1,251 +0,0 @@
"""Wrapper mlx-lm autour de Gemma pour l'analyse de texte.
Charge le modele paresseusement (une seule fois par process) et expose des
helpers de generation, dont un `generate_json` tolerant qui extrait le premier
objet/array JSON valide de la sortie du modele.
"""
from __future__ import annotations
import json
import re
from functools import lru_cache
from typing import Any, Optional
from ..settings import get_settings
# Bornes d'un bloc JSON dans une reponse potentiellement bavarde.
_JSON_SPAN_RE = re.compile(r"(\{.*\}|\[.*\])", re.DOTALL)
_FENCE_RE = re.compile(r"```(?:json)?\s*(.*?)```", re.DOTALL)
# Marqueurs de FIN de chaine de pensee : on ne garde que ce qui suit le dernier.
# - balises type DeepSeek-R1 / Qwen-think
# - format a canaux type Gemma 4 / Harmony (la pensee est close par <channel|>)
_REASONING_END_MARKERS = ("</think>", "<channel|>", "<|channel|>")
# Prefixe de canal/think non ferme reste en tete (pensee tronquee) : a retirer.
_REASONING_OPEN_RE = re.compile(r"^\s*(?:<\|?channel\|?>\s*\w*|<think>)", re.IGNORECASE)
@lru_cache(maxsize=2)
def _load(model_id: str):
# Import paresseux : evite de charger mlx tant qu'on n'analyse pas.
from mlx_lm import load
return load(model_id)
# Hook de streaming optionnel. Si defini, `generate()` diffuse chaque morceau de
# texte AU FIL de la generation (pensee comprise, avant tout nettoyage) en
# appelant ce callback. Utilise par `inkflow benchmark --stream` pour voir les
# tokens en temps reel. None -> generation par lot classique (plus rapide).
_TOKEN_SINK: Optional[Any] = None
def set_token_sink(callback) -> None:
"""Definit (ou retire avec None) le callback de streaming des tokens."""
global _TOKEN_SINK
_TOKEN_SINK = callback
def _resolve_chat_template(model_id: str, tokenizer) -> Optional[str]:
"""Renvoie un template de chat a passer explicitement, ou None.
Certaines conversions (Mistral recents...) logent leur template dans un
fichier `chat_template.jinja` que le downloader de mlx-lm n'embarque pas
toujours : `tokenizer.chat_template` est alors vide et `apply_chat_template`
echoue. On recupere alors le fichier officiel du repo. None si le tokenizer
possede deja un template (cas courant) ou si aucun n'est disponible.
"""
if getattr(tokenizer, "chat_template", None):
return None
from pathlib import Path
from huggingface_hub import hf_hub_download
# Selon les conversions : fichier Jinja brut, ou JSON {"chat_template": ...}.
for fname in ("chat_template.jinja", "chat_template.json"):
try:
text = Path(hf_hub_download(model_id, fname)).read_text(encoding="utf-8")
except Exception: # noqa: BLE001 — fichier absent, on tente le suivant
continue
if fname.endswith(".json"):
data = json.loads(text)
return data.get("chat_template") if isinstance(data, dict) else None
return text
return None # aucun template dispo -> apply_chat_template levera une erreur claire
class Gemma:
"""Petite facade autour de mlx-lm pour piloter Gemma."""
def __init__(self, model_id: Optional[str] = None):
self.model_id = model_id or get_settings().gemma_model
self._model = None
self._tokenizer = None
self._chat_template = None # template recupere si absent du tokenizer
def _ensure_loaded(self) -> None:
if self._model is None:
self._model, self._tokenizer = _load(self.model_id)
self._chat_template = _resolve_chat_template(
self.model_id, self._tokenizer)
def generate(
self,
prompt: str,
*,
system: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
) -> str:
"""Genere une reponse texte a partir d'un prompt (template de chat).
`max_tokens`/`temperature` non fournis -> valeurs des reglages courants.
"""
self._ensure_loaded()
settings = get_settings()
if max_tokens is None:
max_tokens = settings.gemma_max_tokens
# En mode raisonnement, plafond dedie (garde-fou anti-boucle) ; la
# generation s'arrete de toute facon des que le JSON post-pensee est
# complet (cf. boucle de streaming ci-dessous).
if settings.gemma_reasoning:
max_tokens = max(max_tokens, settings.gemma_reasoning_max_tokens)
if temperature is None:
temperature = settings.gemma_temperature
# Decodage glouton (temp 0) + raisonnement = boucles de pensee sans fin.
# On force un echantillonnage minimal en mode raisonnement.
if settings.gemma_reasoning and temperature == 0.0:
temperature = settings.gemma_reasoning_temperature
from mlx_lm.sample_utils import make_sampler
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
# Modeles hybrides (Qwen3...) : hors mode raisonnement, on DESACTIVE la
# pensee via enable_thinking=False -> JSON direct, bien plus rapide. Avec
# --reasoning, on laisse penser puis on retire la pensee en aval. Ce
# kwarg est ignore par les templates qui ne l'utilisent pas (Gemma...).
template_kwargs = {}
if not settings.gemma_reasoning:
template_kwargs["enable_thinking"] = False
formatted = self._tokenizer.apply_chat_template(
messages, add_generation_prompt=True, tokenize=False,
chat_template=self._chat_template, # None -> celui du tokenizer
**template_kwargs,
)
sampler = make_sampler(temp=temperature)
# On streame (token par token) si : un sink est branche (--stream) OU on
# est en mode raisonnement (pour pouvoir s'arreter des que la reponse est
# prete, sans subir les boucles de pensee sans fin). Sinon, lot rapide.
if _TOKEN_SINK is not None or settings.gemma_reasoning:
from mlx_lm import stream_generate
parts = []
seen_end = False # marqueur de fin de pensee rencontre
for resp in stream_generate(
self._model, self._tokenizer, prompt=formatted,
max_tokens=max_tokens, sampler=sampler,
):
parts.append(resp.text)
if _TOKEN_SINK is not None:
_TOKEN_SINK(resp.text)
# Arret anticipe : une fois la pensee close, des que le JSON
# post-pensee est complet, inutile de continuer a generer.
if settings.gemma_reasoning and ("}" in resp.text or "]" in resp.text):
buf = "".join(parts)
if not seen_end:
seen_end = any(mk in buf for mk in _REASONING_END_MARKERS)
if seen_end and _has_complete_json(_strip_reasoning(buf)):
break
if _TOKEN_SINK is not None:
_TOKEN_SINK("\n") # separe les generations successives
raw = "".join(parts)
else:
from mlx_lm import generate
raw = generate(
self._model,
self._tokenizer,
prompt=formatted,
max_tokens=max_tokens,
sampler=sampler,
verbose=False,
)
# Retire la chaine de pensee des modeles a raisonnement (sinon des
# fragments de la "pensee" parasitent l'extraction JSON en aval).
if settings.gemma_reasoning:
return _strip_reasoning(raw)
return raw
def generate_json(
self,
prompt: str,
*,
system: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
retries: int = 1,
) -> Any:
"""Genere puis parse un JSON. Reessaie en cas d'echec de parsing.
`max_tokens`/`temperature` non fournis -> valeurs des reglages courants.
"""
last_err: Optional[Exception] = None
for attempt in range(retries + 1):
raw = self.generate(
prompt, system=system, max_tokens=max_tokens,
temperature=temperature if attempt == 0 else 0.0,
)
try:
return _extract_json(raw)
except Exception as exc: # noqa: BLE001
last_err = exc
raise ValueError(f"Reponse JSON invalide apres {retries + 1} essais: {last_err}")
def _strip_reasoning(text: str) -> str:
"""Retire la chaine de pensee d'un modele a raisonnement.
Ne conserve que ce qui suit le dernier marqueur de fin de pensee
(`</think>`, `<channel|>`...). Si seul un marqueur d'ouverture non ferme
subsiste (pensee tronquee par le budget de tokens), on le retire en tete
pour eviter de parser la pensee a la place de la reponse.
"""
t = text
for marker in _REASONING_END_MARKERS:
if marker in t:
t = t.rsplit(marker, 1)[-1]
t = _REASONING_OPEN_RE.sub("", t)
return t.strip()
def _has_complete_json(text: str) -> bool:
"""True si `text` contient deja un objet/array JSON complet et parsable.
Sert a stopper la generation des modeles a raisonnement des que la reponse
finale est ecrite (evite de consommer le budget en boucles de pensee).
"""
try:
_extract_json(text)
return True
except Exception: # noqa: BLE001
return False
def _extract_json(text: str) -> Any:
"""Extrait le premier objet/array JSON d'une reponse libre du modele.
Tolere le texte parasite avant/apres (y compris un 2e bloc) grace a
raw_decode, qui s'arrete au premier JSON complet.
"""
text = text.strip()
fence = _FENCE_RE.search(text)
if fence:
text = fence.group(1).strip()
decoder = json.JSONDecoder()
# Cherche le 1er debut de structure JSON et decode a partir de la.
for i, ch in enumerate(text):
if ch in "[{":
try:
obj, _ = decoder.raw_decode(text[i:])
return obj
except json.JSONDecodeError:
continue
raise ValueError("aucun JSON trouve dans la reponse")

View File

@@ -0,0 +1,14 @@
"""Client LLM pluggable pour l'analyse de texte (attribution, personnages...).
La facade `LLM` (client.py) expose `generate` / `generate_json` consommes par
tout le pipeline. Sous elle, un backend pluggable (`base.LLMBackend`) transforme
des messages en texte brut : `mlx_backend` (mlx-lm, defaut) ou `lmstudio_backend`
(API OpenAI locale de LM Studio, sert GGUF *et* MLX). Selection par nom via
`factory.get_llm_backend`.
"""
from __future__ import annotations
from .client import LLM, set_token_sink
from .factory import get_llm_backend, reset_llm_cache
__all__ = ["LLM", "set_token_sink", "get_llm_backend", "reset_llm_cache"]

View File

@@ -0,0 +1,74 @@
"""Helpers agnostiques du moteur : extraction JSON tolerante + retrait de la
chaine de pensee des modeles a raisonnement.
Module neutre (aucune dependance a un backend) : partage par la facade `LLM` et
par les backends (qui s'en servent pour l'arret anticipe en streaming), sans
creer de cycle d'import.
"""
from __future__ import annotations
import json
import re
from typing import Any
# Bornes d'un bloc JSON dans une reponse potentiellement bavarde.
_JSON_SPAN_RE = re.compile(r"(\{.*\}|\[.*\])", re.DOTALL)
_FENCE_RE = re.compile(r"```(?:json)?\s*(.*?)```", re.DOTALL)
# Marqueurs de FIN de chaine de pensee : on ne garde que ce qui suit le dernier.
# - balises type DeepSeek-R1 / Qwen-think
# - format a canaux type Gemma 4 / Harmony (la pensee est close par <channel|>)
_REASONING_END_MARKERS = ("</think>", "<channel|>", "<|channel|>")
# Prefixe de canal/think non ferme reste en tete (pensee tronquee) : a retirer.
_REASONING_OPEN_RE = re.compile(r"^\s*(?:<\|?channel\|?>\s*\w*|<think>)", re.IGNORECASE)
def _strip_reasoning(text: str) -> str:
"""Retire la chaine de pensee d'un modele a raisonnement.
Ne conserve que ce qui suit le dernier marqueur de fin de pensee
(`</think>`, `<channel|>`...). Si seul un marqueur d'ouverture non ferme
subsiste (pensee tronquee par le budget de tokens), on le retire en tete
pour eviter de parser la pensee a la place de la reponse.
"""
t = text
for marker in _REASONING_END_MARKERS:
if marker in t:
t = t.rsplit(marker, 1)[-1]
t = _REASONING_OPEN_RE.sub("", t)
return t.strip()
def _has_complete_json(text: str) -> bool:
"""True si `text` contient deja un objet/array JSON complet et parsable.
Sert a stopper la generation des modeles a raisonnement des que la reponse
finale est ecrite (evite de consumer le budget en boucles de pensee).
"""
try:
_extract_json(text)
return True
except Exception: # noqa: BLE001
return False
def _extract_json(text: str) -> Any:
"""Extrait le premier objet/array JSON d'une reponse libre du modele.
Tolere le texte parasite avant/apres (y compris un 2e bloc) grace a
raw_decode, qui s'arrete au premier JSON complet.
"""
text = text.strip()
fence = _FENCE_RE.search(text)
if fence:
text = fence.group(1).strip()
decoder = json.JSONDecoder()
# Cherche le 1er debut de structure JSON et decode a partir de la.
for i, ch in enumerate(text):
if ch in "[{":
try:
obj, _ = decoder.raw_decode(text[i:])
return obj
except json.JSONDecodeError:
continue
raise ValueError("aucun JSON trouve dans la reponse")

View File

@@ -0,0 +1,43 @@
"""Abstraction des moteurs LLM (backend pluggable).
Calque du pattern TTS (`tts/base.py`) : un backend ne fait *qu'une* chose,
transformer une liste de messages (role/content) en texte brut. Toute la logique
agnostique (calcul des parametres depuis les Settings, retrait de la pensee,
extraction JSON tolerante, retries) vit dans la facade `client.LLM`, jamais
dupliquee par backend.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Callable, Optional
class LLMBackend(ABC):
"""Interface commune a tous les moteurs LLM."""
name: str = "base"
def __init__(self, model_ref: str):
# Reference du modele, interpretee par le backend : id mlx-community
# (mlx) ou nom du modele charge dans LM Studio (lmstudio, vide -> actif).
self.model_ref = model_ref
@abstractmethod
def complete(
self,
messages: list[dict],
*,
max_tokens: int,
temperature: float,
reasoning: bool,
token_sink: Optional[Callable[[str], None]] = None,
) -> str:
"""Genere et renvoie le texte BRUT (chaine de pensee incluse).
- `messages` : liste {role, content} (system optionnel + user).
- `reasoning` : si vrai, le modele peut emettre une chaine de pensee ;
le backend peut s'arreter des que le JSON post-pensee est complet. La
facade retire la pensee en aval (`_strip_reasoning`).
- `token_sink` : si fourni, appele avec chaque morceau de texte au fil de
la generation (streaming pour `inkflow benchmark --stream`).
"""

View File

@@ -0,0 +1,119 @@
"""Facade LLM pour l'analyse de texte (anciennement `Gemma`).
Charge un backend pluggable (mlx par defaut, ou LM Studio) selon les reglages et
expose `generate` / `generate_json` consommes par tout le pipeline. Toute la
logique agnostique du moteur vit ici : calcul des parametres depuis les Settings,
retrait de la chaine de pensee (modeles a raisonnement) et `generate_json`
tolerant qui extrait le premier objet/array JSON valide de la sortie du modele.
"""
from __future__ import annotations
from typing import Any, Optional
from ...settings import Settings, get_settings
from ._text import _extract_json, _strip_reasoning
from .factory import get_llm_backend
# Hook de streaming optionnel. Si defini, `generate()` diffuse chaque morceau de
# texte AU FIL de la generation (pensee comprise, avant tout nettoyage) en
# appelant ce callback. Utilise par `inkflow benchmark --stream` pour voir les
# tokens en temps reel. None -> generation par lot classique (plus rapide).
_TOKEN_SINK: Optional[Any] = None
def set_token_sink(callback) -> None:
"""Definit (ou retire avec None) le callback de streaming des tokens."""
global _TOKEN_SINK
_TOKEN_SINK = callback
def _model_ref_for(backend: str, settings: Settings) -> str:
"""Reference de modele par defaut pour un backend donne."""
if backend == "lmstudio":
return settings.lmstudio_model
return settings.gemma_model
class LLM:
"""Petite facade multi-backend pour piloter le LLM d'analyse."""
def __init__(self, model_id: Optional[str] = None, backend: Optional[str] = None):
settings = get_settings()
self.backend_name = backend or settings.gemma_backend
self.model_ref = model_id or _model_ref_for(self.backend_name, settings)
self._backend = None
def _ensure_loaded(self) -> None:
if self._backend is None:
self._backend = get_llm_backend(self.backend_name, self.model_ref)
def generate(
self,
prompt: str,
*,
system: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
) -> str:
"""Genere une reponse texte a partir d'un prompt (template de chat).
`max_tokens`/`temperature` non fournis -> valeurs des reglages courants.
"""
self._ensure_loaded()
settings = get_settings()
if max_tokens is None:
max_tokens = settings.gemma_max_tokens
# En mode raisonnement, plafond dedie (garde-fou anti-boucle) ; la
# generation s'arrete de toute facon des que le JSON post-pensee est
# complet (cf. arret anticipe des backends).
if settings.gemma_reasoning:
max_tokens = max(max_tokens, settings.gemma_reasoning_max_tokens)
if temperature is None:
temperature = settings.gemma_temperature
# Decodage glouton (temp 0) + raisonnement = boucles de pensee sans fin.
# On force un echantillonnage minimal en mode raisonnement.
if settings.gemma_reasoning and temperature == 0.0:
temperature = settings.gemma_reasoning_temperature
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
raw = self._backend.complete(
messages,
max_tokens=max_tokens,
temperature=temperature,
reasoning=settings.gemma_reasoning,
token_sink=_TOKEN_SINK,
)
# Retire la chaine de pensee des modeles a raisonnement (sinon des
# fragments de la "pensee" parasitent l'extraction JSON en aval).
if settings.gemma_reasoning:
return _strip_reasoning(raw)
return raw
def generate_json(
self,
prompt: str,
*,
system: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
retries: int = 1,
) -> Any:
"""Genere puis parse un JSON. Reessaie en cas d'echec de parsing.
`max_tokens`/`temperature` non fournis -> valeurs des reglages courants.
"""
last_err: Optional[Exception] = None
for attempt in range(retries + 1):
raw = self.generate(
prompt, system=system, max_tokens=max_tokens,
temperature=temperature if attempt == 0 else 0.0,
)
try:
return _extract_json(raw)
except Exception as exc: # noqa: BLE001
last_err = exc
raise ValueError(f"Reponse JSON invalide apres {retries + 1} essais: {last_err}")

View File

@@ -0,0 +1,36 @@
"""Selection du backend LLM par nom (pluggable).
Calque de `tts/factory.py` : cache par (nom, reference de modele). Une
sauvegarde des reglages (settings.save_settings) appelle `reset_llm_cache()`
pour que les changements de backend/modele prennent effet sans redemarrage.
"""
from __future__ import annotations
from functools import lru_cache
from .base import LLMBackend
BACKENDS = ("mlx", "lmstudio")
@lru_cache(maxsize=4)
def get_llm_backend(backend: str = "mlx", model_ref: str = "") -> LLMBackend:
backend = backend.lower()
if backend == "mlx":
from .mlx_backend import MLXBackend
return MLXBackend(model_ref)
if backend == "lmstudio":
from .lmstudio_backend import LMStudioBackend
return LMStudioBackend(model_ref)
raise ValueError(
f"Backend LLM inconnu: {backend!r} (dispo: {', '.join(BACKENDS)})")
def reset_llm_cache() -> None:
"""Vide les instances de backend et le cache de chargement mlx."""
get_llm_backend.cache_clear()
try:
from .mlx_backend import _load
_load.cache_clear()
except Exception: # noqa: BLE001
pass

View File

@@ -0,0 +1,171 @@
"""Backend LLM via LM Studio (API OpenAI locale).
LM Studio sert indifferemment des modeles GGUF *et* MLX charges depuis sa GUI,
exposes sur un endpoint OpenAI-compatible (`http://127.0.0.1:1234/v1` par
defaut). InkFlow ne fait que parler HTTP : zero dependance native a compiler, et
le modele reste charge entre redemarrages d'InkFlow.
Caveat : `enable_thinking=False` (coupe la pensee des modeles hybrides cote mlx)
n'est pas pilotable de facon fiable via l'API ; le template embarque decide. En
mode non-raisonnement on prend le `content` final et on le strip de toute facon.
"""
from __future__ import annotations
import os
from typing import Callable, Optional
from .base import LLMBackend
from ._text import _has_complete_json, _strip_reasoning
from ...settings import get_settings
def list_models(base_url: str) -> list[dict]:
"""Liste les modeles LLM *telecharges* dans LM Studio (charges ou non).
Utilise l'API REST native (`/api/v0/models`) et non `/v1/models` (qui ne
renvoie que les modeles deja charges) : on peut ainsi proposer n'importe quel
modele telecharge ; LM Studio le charge a la volee (JIT) a la 1re requete.
Renvoie [{id, state, type}] filtre sur les LLM/VLM. Leve en cas d'echec.
"""
import httpx
root = base_url.rstrip("/")
if root.endswith("/v1"):
root = root[:-len("/v1")]
resp = httpx.get(f"{root}/api/v0/models", timeout=5.0)
resp.raise_for_status()
data = resp.json().get("data", [])
return [
{"id": m.get("id"), "state": m.get("state"), "type": m.get("type")}
for m in data if m.get("type") in ("llm", "vlm")
]
class LMStudioBackend(LLMBackend):
"""Moteur LM Studio : client OpenAI pointe sur le serveur local."""
name = "lmstudio"
def __init__(self, model_ref: str):
super().__init__(model_ref)
self._client = None
self._model = None # resolu paresseusement (model_ref vide -> modele actif)
def _ensure_client(self):
if self._client is None:
try:
from openai import OpenAI
except ImportError as exc: # noqa: BLE001
raise RuntimeError(
"Le paquet `openai` est requis pour le backend LM Studio "
"(pip install -e backend)."
) from exc
base_url = get_settings().lmstudio_base_url
# api_key factice : LM Studio n'authentifie pas, mais le SDK l'exige.
self._client = OpenAI(base_url=base_url, api_key="lm-studio")
return self._client
def _resolve_model(self, client) -> str:
"""Renvoie le nom de modele a utiliser (model_ref, ou 1er modele charge)."""
if self.model_ref:
return self.model_ref
if self._model is None:
try:
models = client.models.list()
except Exception as exc: # noqa: BLE001
raise self._connection_error(exc) from exc
ids = [m.id for m in getattr(models, "data", [])]
if not ids:
raise RuntimeError(
"Aucun modele charge dans LM Studio : charge un modele "
"(GGUF ou MLX) dans l'app avant de lancer l'analyse."
)
self._model = ids[0]
return self._model
def _connection_error(self, exc: Exception) -> RuntimeError:
url = get_settings().lmstudio_base_url
return RuntimeError(
f"LM Studio injoignable sur {url} — lance l'application et active le "
f"serveur local (onglet Developer > Start Server). Detail: {exc}"
)
def _sampling(self, max_tokens: int, temperature: float) -> dict:
"""Params de sampling a transmettre a LM Studio.
Par defaut (`lmstudio_defer_config=True`) : dict VIDE -> on delegue
temperature ET plafond de tokens a la config du modele charge dans LM
Studio (ne pas imposer `max_tokens` evite de tronquer la reponse, ce qui
cassait les modeles a raisonnement). Le contexte est de toute facon gere
au chargement cote LM Studio. Si l'utilisateur desactive la delegation,
on reimpose les reglages "Generation Gemma" d'InkFlow.
"""
if get_settings().lmstudio_defer_config:
return {}
return {"temperature": temperature, "max_tokens": max_tokens}
def complete(
self,
messages: list[dict],
*,
max_tokens: int,
temperature: float,
reasoning: bool,
token_sink: Optional[Callable[[str], None]] = None,
) -> str:
client = self._ensure_client()
model = self._resolve_model(client)
sampling = self._sampling(max_tokens, temperature)
# Prefill optionnel de la reponse assistant (INKFLOW_LMSTUDIO_PREFILL) :
# ex. "<think></think>" force les modeles distilles a raisonnement (Qwen)
# a sauter la pensee (seul levier efficace quand enable_thinking/_no_think
# sont ignores). Le modele continue a partir du prefill -> JSON direct.
prefill = os.environ.get("INKFLOW_LMSTUDIO_PREFILL")
if prefill:
messages = messages + [{"role": "assistant", "content": prefill}]
from openai import APIConnectionError
# LM Studio separe la pensee (`reasoning_content`) de la reponse finale
# (`content`, deja propre). On ne renvoie QUE `content` : la facade en
# extrait le JSON. La pensee n'est diffusee qu'au `token_sink` (affichage
# --stream) ; l'inclure dans le retour risquerait de capter un JSON
# d'exemple present dans le raisonnement. Pour les modeles qui mettent au
# contraire la pensee INLINE dans `content` (<think>...), la facade la
# retire via _strip_reasoning quand reasoning=True.
if token_sink is not None or reasoning:
content_parts: list[str] = []
try:
stream = client.chat.completions.create(
model=model, messages=messages, stream=True, **sampling,
)
for chunk in stream:
if not chunk.choices:
continue
delta = chunk.choices[0].delta
rc = getattr(delta, "reasoning_content", None)
if rc and token_sink is not None:
token_sink(rc) # pensee : affichage seulement
piece = delta.content or ""
if piece:
content_parts.append(piece)
if token_sink is not None:
token_sink(piece)
# Arret anticipe en mode raisonnement : des que la reponse
# finale (content) contient un JSON complet, inutile de
# continuer (certains modeles divaguent ensuite).
if reasoning and piece and ("}" in piece or "]" in piece):
if _has_complete_json(_strip_reasoning("".join(content_parts))):
break
except APIConnectionError as exc:
raise self._connection_error(exc) from exc
if token_sink is not None:
token_sink("\n") # separe les generations successives
return "".join(content_parts)
try:
resp = client.chat.completions.create(
model=model, messages=messages, **sampling,
)
except APIConnectionError as exc:
raise self._connection_error(exc) from exc
return resp.choices[0].message.content or ""

View File

@@ -0,0 +1,127 @@
"""Backend LLM mlx-lm (Apple Silicon) — moteur historique d'InkFlow.
Charge un modele mlx-community paresseusement (une fois par process, cache LRU)
et genere via le template de chat du tokenizer. Comportement strictement
identique a l'ancien `Gemma.generate` : controle fin de `enable_thinking`,
streaming avec arret anticipe des que le JSON post-pensee est complet.
"""
from __future__ import annotations
import json
from functools import lru_cache
from typing import Callable, Optional
from .base import LLMBackend
from ._text import _REASONING_END_MARKERS, _has_complete_json, _strip_reasoning
@lru_cache(maxsize=2)
def _load(model_id: str):
# Import paresseux : evite de charger mlx tant qu'on n'analyse pas.
from mlx_lm import load
return load(model_id)
def _resolve_chat_template(model_id: str, tokenizer) -> Optional[str]:
"""Renvoie un template de chat a passer explicitement, ou None.
Certaines conversions (Mistral recents...) logent leur template dans un
fichier `chat_template.jinja` que le downloader de mlx-lm n'embarque pas
toujours : `tokenizer.chat_template` est alors vide et `apply_chat_template`
echoue. On recupere alors le fichier officiel du repo. None si le tokenizer
possede deja un template (cas courant) ou si aucun n'est disponible.
"""
if getattr(tokenizer, "chat_template", None):
return None
from pathlib import Path
from huggingface_hub import hf_hub_download
# Selon les conversions : fichier Jinja brut, ou JSON {"chat_template": ...}.
for fname in ("chat_template.jinja", "chat_template.json"):
try:
text = Path(hf_hub_download(model_id, fname)).read_text(encoding="utf-8")
except Exception: # noqa: BLE001 — fichier absent, on tente le suivant
continue
if fname.endswith(".json"):
data = json.loads(text)
return data.get("chat_template") if isinstance(data, dict) else None
return text
return None # aucun template dispo -> apply_chat_template levera une erreur claire
class MLXBackend(LLMBackend):
"""Moteur mlx-lm : modeles mlx-community (HuggingFace) sur Apple Silicon."""
name = "mlx"
def __init__(self, model_ref: str):
super().__init__(model_ref)
self._model = None
self._tokenizer = None
self._chat_template = None # template recupere si absent du tokenizer
def _ensure_loaded(self) -> None:
if self._model is None:
self._model, self._tokenizer = _load(self.model_ref)
self._chat_template = _resolve_chat_template(
self.model_ref, self._tokenizer)
def complete(
self,
messages: list[dict],
*,
max_tokens: int,
temperature: float,
reasoning: bool,
token_sink: Optional[Callable[[str], None]] = None,
) -> str:
self._ensure_loaded()
from mlx_lm.sample_utils import make_sampler
# Modeles hybrides (Qwen3...) : hors mode raisonnement, on DESACTIVE la
# pensee via enable_thinking=False -> JSON direct, bien plus rapide. Avec
# raisonnement, on laisse penser puis la facade retire la pensee. Ce
# kwarg est ignore par les templates qui ne l'utilisent pas (Gemma...).
template_kwargs = {}
if not reasoning:
template_kwargs["enable_thinking"] = False
formatted = self._tokenizer.apply_chat_template(
messages, add_generation_prompt=True, tokenize=False,
chat_template=self._chat_template, # None -> celui du tokenizer
**template_kwargs,
)
sampler = make_sampler(temp=temperature)
# On streame (token par token) si : un sink est branche (--stream) OU on
# est en mode raisonnement (pour pouvoir s'arreter des que la reponse est
# prete, sans subir les boucles de pensee sans fin). Sinon, lot rapide.
if token_sink is not None or reasoning:
from mlx_lm import stream_generate
parts = []
seen_end = False # marqueur de fin de pensee rencontre
for resp in stream_generate(
self._model, self._tokenizer, prompt=formatted,
max_tokens=max_tokens, sampler=sampler,
):
parts.append(resp.text)
if token_sink is not None:
token_sink(resp.text)
# Arret anticipe : une fois la pensee close, des que le JSON
# post-pensee est complet, inutile de continuer a generer.
if reasoning and ("}" in resp.text or "]" in resp.text):
buf = "".join(parts)
if not seen_end:
seen_end = any(mk in buf for mk in _REASONING_END_MARKERS)
if seen_end and _has_complete_json(_strip_reasoning(buf)):
break
if token_sink is not None:
token_sink("\n") # separe les generations successives
return "".join(parts)
from mlx_lm import generate
return generate(
self._model,
self._tokenizer,
prompt=formatted,
max_tokens=max_tokens,
sampler=sampler,
verbose=False,
)

View File

@@ -11,7 +11,7 @@ from typing import Iterable
from ..models import Pronunciation, PronunciationEntry
from ..settings import get_settings
from .gemma import Gemma
from .llm.client import LLM
def apply_pronunciation(text: str, pron: Pronunciation) -> str:
@@ -27,7 +27,7 @@ def apply_pronunciation(text: str, pron: Pronunciation) -> str:
# Le prompt systeme est editable dans les reglages (settings.prompt_pronunciation).
def propose_pronunciations(text: str, gemma: Gemma, *, max_chars: int = 16000) -> list[PronunciationEntry]:
def propose_pronunciations(text: str, gemma: LLM, *, max_chars: int = 16000) -> list[PronunciationEntry]:
"""Propose des candidats de prononciation a valider."""
sample = text[:max_chars]
prompt = (

View File

@@ -25,7 +25,7 @@ from ..models import (
SegmentType,
)
from ..settings import get_settings
from .gemma import Gemma
from .llm.client import LLM
# Un paragraphe de dialogue commence par un cadratin (U+2014) ou un tiret long.
_DIALOGUE_LEAD_RE = re.compile(r"^\s*[—―]\s*")
@@ -65,7 +65,7 @@ _CHUNK_MAX_DIALOGUES = 30 # repliques par appel (fiabilite du modele)
def attribute_speakers(
segments: list[Segment],
gemma: Gemma,
gemma: LLM,
*,
characters: Optional[list[Character]] = None,
pov: Optional[str] = None,
@@ -211,7 +211,7 @@ def _chunk_dialogues(
def _refine_unknown_speakers(
segments: list[Segment],
gemma: Gemma,
gemma: LLM,
*,
characters: Optional[list[Character]] = None,
confidence: dict[int, str],
@@ -276,11 +276,237 @@ def _refine_unknown_speakers(
segments[seg_idx].speaker = new
# --- Post-traitement deterministe (sans LLM) --------------------------------
# Traductions FR pour construire l'identite d'un locuteur anonyme.
_ANON_GENDER_FR = {"male": "homme", "female": "femme"}
_ANON_AGE_FR = {"child": "enfant", "young": "jeune", "adult": "adulte", "old": "vieux"}
def _anon_identity(gender: Optional[str], age: Optional[str]) -> str:
"""Identite canonique d'un locuteur anonyme, regroupe par (genre, age).
Ex: ("male", "adult") -> "anonyme (homme, adulte)" ; ("male", None) ->
"anonyme (homme)" ; (None, None) -> "anonyme". Tous les personnages-fonction
d'un meme bucket partagent une voix (genre/age suffisent a la choisir)."""
g = _ANON_GENDER_FR.get((gender or "").lower())
a = _ANON_AGE_FR.get((age or "").lower())
parts = [p for p in (g, a) if p]
return f"anonyme ({', '.join(parts)})" if parts else "anonyme"
def _apply_anonymous_speakers(
segments: list[Segment], *, names=None) -> dict[str, tuple[Optional[str], Optional[str]]]:
"""Rattache les repliques a incise de role a un locuteur ANONYME par genre/age.
Une incise "informa le soldat" -> "anonyme (homme)" : on ne stocke pas la
fonction (garde/marine...), seuls genre+age comptent pour la voix. Genre/age
deduits du nom de role (`_ROLE_GENDER`/`_ROLE_AGE`). Applique APRES le LLM
(autorite deterministe), sans modifier le prompt. Mutation en place.
Renvoie {identite_anonyme: (genre, age)} des buckets utilises, pour que
l'appelant cree les `Character` generiques correspondants (assignation voix)."""
names = names or set()
used: dict[str, tuple[Optional[str], Optional[str]]] = {}
for seg in segments:
if seg.type is not SegmentType.DIALOGUE:
continue
for inc in seg.incises:
role = incise_role(seg.text, inc, names)
if role:
gender = _ROLE_GENDER.get(role)
age = _ROLE_AGE.get(role)
ident = _anon_identity(gender, age)
seg.speaker = ident
used[ident] = (gender, age)
break
return used
def _inversion_gender(text: str) -> Optional[str]:
"""Genre porte par le pronom d'une incise d'inversion ("demanda-t-elle" ->
female, "dit-il" -> male). None si aucune inversion. Signal sur LE locuteur."""
m = _INV_GENDER_RE.search(text)
if not m:
return None
return "female" if m.group("p").lower().startswith("elle") else "male"
def _resolve_anonymous_figurants(
segments: list[Segment]) -> dict[str, tuple[Optional[str], Optional[str]]]:
"""Resout les repliques restees INDETERMINEES (inconnu/?) en figurants anonymes.
Quand une replique non resolue est entouree d'une narration decrivant un
figurant genre ("La femme...", "La jeune marine...", "Le soldat..."), on
l'attribue au bucket anonyme correspondant. Genre : pronom d'inversion de la
replique ("demanda-t-elle") sinon l'article du role dans la narration
(la/une -> femme, le/un -> homme). N'agit QUE sur l'indetermine (jamais sur
une attribution sure) -> sans risque pour les personnages nommes. Mutation en
place ; renvoie les buckets crees (pour creer les Character generiques)."""
used: dict[str, tuple[Optional[str], Optional[str]]] = {}
for idx, seg in enumerate(segments):
if seg.type is not SegmentType.DIALOGUE or _is_resolved(seg.speaker):
continue
narr_gender = role_age = None
found = False
for j in (idx - 1, idx + 1): # narration adjacente (avant puis apres)
if 0 <= j < len(segments) and segments[j].type is SegmentType.NARRATION:
m = _ANON_NARR_RE.search(segments[j].text)
if m:
found = True
art = m.group("art").lower().rstrip("'")
narr_gender = "female" if art in ("la", "une") else "male"
role_age = _ROLE_AGE.get(m.group("role").lower())
break
if not found:
continue
gender = _inversion_gender(seg.text) or narr_gender
ident = _anon_identity(gender, role_age)
seg.speaker = ident
used[ident] = (gender, role_age)
return used
def _canonicalize_speakers(segments: list[Segment], chars: list[Character]) -> None:
"""Reecrit chaque locuteur variant vers le nom canonique du cast.
Le LLM emet souvent des variantes hors liste ("Amiral Mehmet Sagale" pour
"Sagale", "Elvi Okoye" pour "Elvi"). Non rattachees, elles cassent le rendu
(mauvaise voix -> repli narrateur) et le score. On les recolle au canonique
via `heuristic_match` (primitive sure du dedup) : on n'agit QUE sur un match
certain (`Character`), on s'abstient sur ambiguite/inconnu. Pur, sans LLM,
ne touche pas au prompt. Ordre-independant : `tokfreq` calcule globalement.
Idempotent (un nom deja canonique matche en exact)."""
from ..casting.dedup import heuristic_match, _token_freq
spoken = [s.speaker for s in segments
if s.type is SegmentType.DIALOGUE and _is_resolved(s.speaker)]
if not spoken or not chars:
return
tokfreq = _token_freq(chars, spoken)
for seg in segments:
if seg.type is not SegmentType.DIALOGUE or not _is_resolved(seg.speaker):
continue
match = heuristic_match(seg.speaker, chars, tokfreq)
if isinstance(match, Character):
seg.speaker = match.name
# --- Passe deterministe : reparation de l'alternance des tours ---------------
def _norm_name(name: str) -> str:
return (name or "").strip().casefold()
# Tolerance de narration intercalee entre deux repliques d'un meme run. STRICT
# (0) : seules les repliques d'indices consecutifs forment un run. Toute valeur
# >0 est DANGEREUSE : une narration peut porter une *continuation du meme
# locuteur* ("— …", "Fayez marqua une pause.", "— …") ou il reparle ; verifie
# sur ch06 (runs 66-79 et 83-90 de la reference NON alternes des GAP=1). On
# prefere ne pas reparer une replique isolee que d'inventer une fausse alternance.
_RUN_MAX_NARRATION_GAP = 0
def _dialogue_runs(segments: list[Segment]) -> list[list[int]]:
"""Suites de repliques d'indices consecutifs (aucune narration intercalee).
Hypothese (verifiee sur les references ch05 ET ch06, 0 contre-exemple) : dans
une telle salve ou chaque cadratin marque un changement de locuteur, les
tours alternent strictement. Des qu'une narration s'intercale, l'alternance
n'est plus garantie (continuation possible du meme locuteur) -> nouveau run."""
runs: list[list[int]] = []
cur: list[int] = []
gap = 0
for i, s in enumerate(segments):
if s.type is SegmentType.DIALOGUE:
cur.append(i)
gap = 0
else:
gap += 1
if gap > _RUN_MAX_NARRATION_GAP:
if len(cur) >= 2:
runs.append(cur)
cur = []
if len(cur) >= 2:
runs.append(cur)
return runs
def _repair_alternation(segments: list[Segment], *, names=None) -> None:
"""Force l'alternance des tours dans les echanges a exactement 2 locuteurs.
Pour chaque suite de repliques consecutives a deux locuteurs, on retient,
parmi les deux motifs alternes possibles (A/B/A… ou B/A/B…), celui qui :
1. ne contredit aucune ancre sure (locuteur explicite d'incise nominale) ;
2. exige le moins de corrections au resultat de la 1re passe.
On n'agit qu'avec un gagnant STRICT, sinon on s'abstient (on prefere laisser
une erreur qu'en introduire une). En particulier, des qu'un 3e locuteur (meme
minoritaire) apparait dans le run, on ne touche a rien : un echange a >=3
n'alterne pas forcement. Pur, sans appel LLM ; comble aussi les repliques
indeterminees du run.
"""
names = names or set()
for run in _dialogue_runs(segments):
speakers = [segments[i].speaker for i in run]
resolved = {_norm_name(s) for s in speakers if _is_resolved(s)}
if len(resolved) != 2:
continue
# Noms canoniques (1re occurrence de chaque forme normalisee).
order: list[str] = []
for s in speakers:
n = _norm_name(s)
if n in resolved and n not in order:
order.append(n)
name_a, name_b = order[0], order[1]
canon_of = {}
for s in speakers:
n = _norm_name(s)
if n in resolved:
canon_of.setdefault(n, s.strip())
# Ancres sures : locuteur explicite d'une incise nominale.
anchors: dict[int, str] = {}
for k, idx in enumerate(run):
seg = segments[idx]
for inc in seg.incises:
spk = incise_speaker(seg.text, inc, names)
if spk:
anchors[k] = _norm_name(spk)
break
# Une ancre nommant un tiers (hors paire) -> run suspect, on s'abstient.
if any(a not in (name_a, name_b) for a in anchors.values()):
continue
def pattern(start: str) -> list[str]:
other = name_b if start == name_a else name_a
return [start if k % 2 == 0 else other for k in range(len(run))]
candidates = [pattern(name_a), pattern(name_b)]
admissible = [p for p in candidates
if all(p[k] == a for k, a in anchors.items())]
if not admissible:
continue
def cost(p: list[str]) -> int: # corrections sur les repliques resolues
return sum(1 for k, idx in enumerate(run)
if _is_resolved(segments[idx].speaker)
and _norm_name(segments[idx].speaker) != p[k])
admissible.sort(key=cost)
if len(admissible) == 2 and cost(admissible[0]) == cost(admissible[1]):
continue # ex aequo sans ancre discriminante -> trop ambigu
chosen = admissible[0]
for k, idx in enumerate(run):
segments[idx].speaker = canon_of[chosen[k]]
# --- Extraction du casting (Gemma) ------------------------------------------
# Le prompt systeme est editable dans les reglages (settings.prompt_characters).
def extract_characters(text: str, gemma: Gemma) -> list[Character]:
def extract_characters(text: str, gemma: LLM) -> list[Character]:
"""Extrait les personnages et leurs attributs (genre, age) d'un texte."""
prompt = (
"A partir de l'extrait suivant, liste les personnages qui parlent ou "
@@ -374,17 +600,52 @@ _SPEECH_VERBS = {
"tempeta", "rétorque", "lâche", "informa", "renseigna", "indiqua",
"rappela", "avertit", "prévint", "prevint", "intima", "rétorquait",
"lançait", "questionnait", "reconnut", "constata", "répéta", "repeta",
"intervint", "intervient", "renchérissait",
}
# Noms de role pouvant etre sujet d'une incise ("informa le soldat").
# Noms de role (FONCTION) pouvant etre sujet d'une incise ("informa le soldat").
# On EXCLUT volontairement les rangs/titres (amiral, capitaine, lieutenant...) :
# ils precedent presque toujours un nom propre ("dit l'amiral Sagale") -> ce
# n'est pas un figurant anonyme mais une personne nommee ; les laisser ici ferait
# capter le titre au lieu du nom. Le nom propre est alors capte normalement.
_ROLE_NOUNS = {
"garde", "soldat", "sentinelle", "gardien", "prêtre", "pretre", "homme",
"femme", "fille", "garçon", "garcon", "vieille", "vieillard", "capitaine",
"lieutenant", "sergent", "général", "general", "amiral", "officier", "voix",
"femme", "fille", "garçon", "garcon", "vieille", "vieillard", "voix",
"inconnu", "inconnue", "étranger", "etranger", "enfant", "serviteur",
"servante", "messager", "domestique", "médecin", "medecin",
"servante", "messager", "domestique", "médecin", "medecin", "marine", "marin",
}
# Genre/age probables d'un personnage-fonction, pour l'attribuer a un locuteur
# anonyme regroupe (voix par genre/age). On ne mappe QUE les cas ou le genre de
# la PERSONNE est fortement implique (roles militaires/masculins, feminins
# explicites) ; les cas ambigus (medecin, officier, voix, sentinelle...) restent
# inconnus -> bucket "anonyme" generique. Mieux vaut un genre inconnu qu'errone.
_ROLE_GENDER = {
"soldat": "male", "garde": "male", "gardien": "male", "marine": "male",
"marin": "male", "homme": "male", "garçon": "male", "garcon": "male",
"vieillard": "male", "serviteur": "male", "messager": "male",
"prêtre": "male", "pretre": "male",
"femme": "female", "fille": "female", "servante": "female",
"vieille": "female", "inconnue": "female",
}
# Age probable (rare : seul "enfant" le donne nettement).
_ROLE_AGE = {
"enfant": "child", "garçon": "child", "garcon": "child",
"fille": "child", "vieillard": "old", "vieille": "old",
}
# Genre du pronom d'une incise d'inversion ("-t-elle"/"-il"). "-" => inversion.
_INV_GENDER_RE = re.compile(r"-(?:t-)?(?P<p>ils?|elles?)\b", re.IGNORECASE)
# Figurant genre decrit dans la narration : article (genre) + nom de role proche.
# Ex: "La femme", "La jeune marine", "Le soldat". Sert a resoudre une replique
# indeterminee en anonyme (cf. `_resolve_anonymous_figurants`).
_ANON_NARR_RE = re.compile(
r"\b(?P<art>la|le|une|un)\s+(?:[\wÀ-ÿ’'-]+\s+){0,2}?"
r"(?P<role>" + "|".join(re.escape(r) for r in sorted(_ROLE_NOUNS, key=len, reverse=True)) + r")\b",
re.IGNORECASE,
)
# Mots vides ignores quand on indexe les tokens d'un nom de personnage.
_NAME_STOP = {
"le", "la", "les", "un", "une", "de", "du", "des", "monsieur", "madame",
@@ -446,12 +707,14 @@ _REJECT = object() # le sujet n'en est pas un -> pas une incise
def _classify_subject(subj: str, idx: dict[str, str]):
"""Locuteur porte par le sujet d'une incise nominale.
"""Locuteur NOMME porte par le sujet d'une incise nominale.
- personnage connu -> nom canonique ;
- nom propre (capitalise) inconnu -> nom de surface (seed quand meme : le
texte le nomme, independamment de la fiabilite de l'extraction) ;
- nom de role generique ("le soldat") -> None (incise reelle, pas de seed) ;
- nom de role ("le soldat") -> None : pas un locuteur NOMME. L'incise reste
detectee (narration), et le rattachement a un anonyme (par genre/age) se
fait en post-traitement (cf. `_apply_anonymous_speakers` / `incise_role`) ;
- mot quelconque -> _REJECT (pas une incise).
"""
low = subj.lower()
@@ -464,11 +727,14 @@ def _classify_subject(subj: str, idx: dict[str, str]):
return _REJECT
def _nominal_matches(text: str, names) -> list[tuple[int, int, Optional[str]]]:
"""Passe 2 : (start, end, locuteur) pour chaque incise nominale.
def _nominal_matches(text: str, names
) -> list[tuple[int, int, Optional[str], str]]:
"""Passe 2 : (start, end, locuteur, sujet) pour chaque incise nominale.
Une incise nominale = verbe de parole + sujet (nom du casting, nom propre,
ou nom de role). Le sujet nom propre est seede meme absent du casting.
Le 4e champ est le sujet (minuscule) : sert a reconnaitre un nom de role
(`incise_role`) pour rattacher un locuteur anonyme par genre/age.
"""
idx = _name_token_index(names)
literals = sorted(set(idx) | _ROLE_NOUNS, key=len, reverse=True)
@@ -486,13 +752,15 @@ def _nominal_matches(text: str, names) -> list[tuple[int, int, Optional[str]]]:
r"[^.!?…»\",;]*?)"
r"(?P<close>[.!?…,])",
)
out: list[tuple[int, int, Optional[str]]] = []
out: list[tuple[int, int, Optional[str], str]] = []
for m in pat.finditer(text):
spk = _classify_subject(m.group("subj"), idx)
subj = m.group("subj")
spk = _classify_subject(subj, idx)
if spk is _REJECT:
continue
out.append((m.start("inc"),
_incise_end(text, m.end("close"), m.group("lead")), spk))
_incise_end(text, m.end("close"), m.group("lead")),
spk, subj.lower()))
return out
@@ -511,18 +779,33 @@ def _merge_spans(spans: list[tuple[int, int]]) -> list[Incise]:
def detect_incises(text: str, *, names=None) -> list[Incise]:
"""Bornes des incises dans une replique (inversion + nominale cast-aware)."""
spans = _inversion_spans(text)
spans += [(s, e) for s, e, _ in _nominal_matches(text, names or set())]
spans += [(s, e) for s, e, _, _ in _nominal_matches(text, names or set())]
return _merge_spans(spans)
def incise_speaker(text: str, incise: Incise, names) -> Optional[str]:
"""Locuteur explicite porte par une incise nominale ("compatit Holden")."""
for s, e, spk in _nominal_matches(text, names):
"""Locuteur NOMME explicite porte par une incise nominale ("compatit Holden").
None pour une incise de role ("informa le soldat") : un role n'est pas un
locuteur nomme (cf. `incise_role` pour le rattachement anonyme).
"""
for s, e, spk, _ in _nominal_matches(text, names):
if s == incise.start and e == incise.end:
return spk
return None
def incise_role(text: str, incise: Incise, names) -> Optional[str]:
"""Nom de role (minuscule) sujet d'une incise ("informa le soldat" -> "soldat").
Renvoie None si l'incise n'est pas une incise de role. Sert a rattacher la
replique a un locuteur anonyme regroupe par genre/age (cf. `_anon_identity`)."""
for s, e, _spk, subj in _nominal_matches(text, names):
if s == incise.start and e == incise.end and subj in _ROLE_NOUNS:
return subj
return None
def iter_incise_pieces(
text: str, incises: list[Incise]
) -> list[tuple[bool, str]]:
@@ -552,10 +835,10 @@ def iter_incise_pieces(
def analyze_chapter(
chapter: Chapter,
ct: ChapterText,
gemma: Gemma,
gemma: LLM,
*,
book_chars: Optional[list[Character]] = None,
dedup_gemma: Optional[Gemma] = None,
dedup_gemma: Optional[LLM] = None,
) -> tuple[ChapterAnalysis, list[Character]]:
"""Analyse complete d'un chapitre.
@@ -594,12 +877,18 @@ def analyze_chapter(
# Annotation deterministe des incises (bornes, non destructif) + seeding :
# une incise nominale qui nomme un personnage fixe le locuteur avec certitude
# AVANT l'appel LLM (corrige les cas que le petit modele rate).
# NB: ne PAS inclure les alias ici -> mesure : ca change trop le prompt et
# provoque de gros effets papillon (ch06 12B: 96% -> 80%). Les epithetes sont
# rattaches en post-traitement par la canonicalisation (sur le cast complet).
names = {c.name for c in chars}
for seg in segments:
if seg.type is not SegmentType.DIALOGUE:
continue
seg.incises = detect_incises(seg.text, names=names)
for inc in seg.incises:
# PRE-LLM : seuls les noms propres seedent (les incises de role
# renvoient None -> pas de seed, donc prompt inchange ; les roles
# sont rattaches en anonymes en post-traitement, sans effet papillon).
spk = incise_speaker(seg.text, inc, names)
if spk:
seg.speaker = spk
@@ -611,6 +900,22 @@ def analyze_chapter(
_refine_unknown_speakers(segments, gemma, characters=chapter_chars,
confidence=conf)
# Post-traitement deterministe (sans LLM). Ordre important :
# 1. rattache les incises de role a un locuteur anonyme par genre/age ;
# 2. repare l'alternance des tours dans les echanges a deux ;
# 3. recolle les variantes de noms au canonique du cast (rendu + score) ;
# 4. resout les figurants restes indetermines via la narration adjacente.
anon = _apply_anonymous_speakers(segments, names=names)
_repair_alternation(segments, names=names)
_canonicalize_speakers(segments, chars)
anon.update(_resolve_anonymous_figurants(segments))
# Cree les Character generiques des buckets anonymes (assignation de voix).
known = {c.name for c in chars}
for ident, (gender, age) in anon.items():
if ident not in known:
chars.append(Character(name=ident, gender=gender, age=age))
known.add(ident)
# Absorbe les locuteurs residuels (hors liste) en aliases (heuristique seule).
chars, _ = reconcile_characters(
chars, [], None, speaker_names=[s.speaker for s in segments])

View File

@@ -20,7 +20,7 @@ from pydantic import BaseModel
from ..config import DATA_DIR, book_data_dir, book_output_dir, ensure_dirs
from ..epub.parser import load_book, load_chapter_text, parse_epub
from ..models import Cast, ChapterAnalysis, Pronunciation
from ..models import Cast, ChapterAnalysis, Character, Pronunciation
from ..pipeline.orchestrator import load_state, orchestrator
from ..settings import Settings, get_settings, save_settings
from ..store import artifacts
@@ -196,6 +196,43 @@ def put_cast(slug: str, cast: Cast) -> dict:
return {"saved": True}
@app.get("/api/books/{slug}/cast/unresolved")
def get_unresolved_speakers(slug: str) -> dict:
"""Locuteurs apparaissant dans l'analyse mais rattaches a aucun personnage.
Surface les surfaces que la canonicalisation deterministe a refuse de
trancher, pour que l'utilisateur les aliase/fusionne a la main. Predicat =
rattachement a un Character (par nom/alias exact ou heuristique), independant
de l'assignation de voix."""
from ..casting.dedup import heuristic_match
from ..epub.parser import load_book
_require(slug)
cast = artifacts.load_cast(slug)
def resolves(spk: str) -> bool:
low = spk.lower()
for ch in cast.characters:
if ch.name.lower() == low or low in (a.lower() for a in ch.aliases):
return True
return isinstance(heuristic_match(spk, cast.characters), Character)
agg: dict[str, dict] = {}
for ch in load_book(slug).chapters:
if not artifacts.analysis_path(slug, ch.index).exists():
continue
for seg in artifacts.load_analysis(slug, ch.index).segments:
spk = (seg.speaker or "").strip()
if not spk or spk.lower() in {"narrateur", "inconnu", "?"}:
continue
if resolves(spk):
continue
row = agg.setdefault(spk, {"speaker": spk, "count": 0, "chapters": []})
row["count"] += 1
if ch.index not in row["chapters"]:
row["chapters"].append(ch.index)
return {"unresolved": sorted(agg.values(), key=lambda r: -r["count"])}
@app.get("/api/books/{slug}/pronunciation")
def get_pron(slug: str) -> dict:
_require(slug)
@@ -222,6 +259,16 @@ def write_settings(settings: Settings) -> dict:
return {"saved": True}
@app.get("/api/lmstudio/models")
def list_lmstudio_models() -> dict:
"""Modeles telecharges dans LM Studio (pour peupler le selecteur de l'UI)."""
from ..analysis.llm.lmstudio_backend import list_models
try:
return {"models": list_models(get_settings().lmstudio_base_url)}
except Exception as exc: # noqa: BLE001 — serveur down / injoignable
raise HTTPException(503, f"LM Studio injoignable: {exc}")
# --- Voicebank + preview -----------------------------------------------------
@app.get("/api/voicebank")

View File

@@ -1,10 +1,12 @@
"""Auto-casting : attribue une voix distincte a chaque personnage.
Strategie deterministe :
- Narrateur : voix FR native par defaut (ff_siwis), sinon premiere voix.
- Personnages : voix du meme genre, distinctes tant qu'il en reste ; au-dela on
recycle en repartissant le plus equitablement possible. Genre inconnu -> pool
mixte. L'ordre (tri par nom) garantit la reproductibilite.
- Narrateur : voix dediee de la voicebank (PREFERRED_NARRATOR), sinon 1re voix.
- Personnages nommes : voix du meme genre dans le pool *nomme* (anonymous=False),
distinctes tant qu'il en reste ; au-dela recyclage equitable.
- Figurants anonymes ("anonyme (...)") : voix du meme genre dans le pool *reserve*
(anonymous=True), pour ne pas consommer les voix des personnages nommes.
Genre inconnu -> pool mixte. L'ordre (tri par nom) garantit la reproductibilite.
L'utilisateur pourra surcharger ces choix dans l'UI.
"""
from __future__ import annotations
@@ -14,18 +16,29 @@ from typing import Optional
from ..models import Cast, Character, Voicebank
# Voix narrateur preferee (FR native).
PREFERRED_NARRATOR = "fr_f_siwis"
# Voix narrateur preferee (voix dediee de la voicebank CML).
PREFERRED_NARRATOR = "fr_narrator"
def _pick_pool(vb: Voicebank, gender: Optional[str], narrator_id: str) -> list[str]:
"""Voix candidates : on privilegie STRICTEMENT le genre (quitte a reutiliser).
def _is_anonymous(name: str) -> bool:
"""Un figurant anonyme ("anonyme (homme)", "anonyme (femme, vieux)", ...)."""
return name.strip().lower().startswith("anonyme")
On ne croise le genre que si aucune voix du bon genre n'existe. Le narrateur
est exclu tant qu'il reste d'autres options, pour le distinguer.
def _pick_pool(vb: Voicebank, gender: Optional[str], narrator_id: str,
*, anonymous: bool) -> list[str]:
"""Voix candidates : genre STRICT et pool reserve selon `anonymous`.
Les figurants anonymes tirent dans le sous-ensemble `anonymous=True`, les
personnages nommes dans le sous-ensemble `anonymous=False` — les deux ne se
melangent pas. On ne croise (tag puis genre) qu'en dernier recours si le pool
cible est vide. Le narrateur est exclu tant qu'il reste d'autres options.
"""
same = [e.id for e in vb.by_gender(gender)] if gender in ("male", "female") else []
pool = same if same else [e.id for e in vb.entries]
genders = (gender,) if gender in ("male", "female") else ("male", "female")
# 1) genre + tag exacts ; 2) genre seul ; 3) tout.
same_tag = [e.id for g in genders for e in vb.by_gender(g, anonymous=anonymous)]
same_gender = [e.id for g in genders for e in vb.by_gender(g)]
pool = same_tag or same_gender or [e.id for e in vb.entries]
non_narrator = [vid for vid in pool if vid != narrator_id]
return non_narrator or pool # garde le narrateur seulement s'il est seul
@@ -55,7 +68,7 @@ def assign_voices(
if respect_existing and ch.voice_id and vb.by_id(ch.voice_id):
usage[ch.voice_id] += 1
continue # respecte une attribution existante (override utilisateur)
pool = _pick_pool(vb, ch.gender, narrator_id)
pool = _pick_pool(vb, ch.gender, narrator_id, anonymous=_is_anonymous(ch.name))
# Choisit la voix la moins utilisee du pool (donc une voix neuve d'abord).
best = min(pool, key=lambda vid: (usage[vid], pool.index(vid)))
ch.voice_id = best

View File

@@ -132,12 +132,15 @@ def _absorb(
age: Optional[str] = None,
description: Optional[str] = None,
voice_id: Optional[str] = None,
keep_canonical: bool = False,
) -> None:
"""Fusionne la variante `name` dans `target` (mutation en place).
Enrichit les attributs manquants, recalcule le nom canonique et range les
autres formes en aliases.
"""
autres formes en aliases. `keep_canonical=True` GARDE le nom actuel de
`target` comme canonique (les autres formes deviennent aliases) : sert a
rendre stable un nom deja etabli dans le cast (un chapitre ne doit pas
renommer "Sagale" en "Amiral Mehmet Sagale")."""
target.gender = target.gender or gender
target.age = target.age or age
target.description = target.description or description
@@ -148,17 +151,36 @@ def _absorb(
f = (f or "").strip()
if f:
forms.setdefault(_norm(f), f)
canon = max(forms, key=lambda n: _completeness(forms[n]))
canon = (_norm(target.name) if keep_canonical
else max(forms, key=lambda n: _completeness(forms[n])))
target.name = forms[canon]
target.aliases = sorted(v for k, v in forms.items() if k != canon)
# Genre/age d'un locuteur anonyme "anonyme (homme, adulte)" (inverse de
# segmenter._anon_identity) -> pour qu'il herite d'une voix du bon genre.
_ANON_GENDER = {"homme": "male", "femme": "female"}
_ANON_AGE = {"enfant": "child", "jeune": "young", "adulte": "adult", "vieux": "old"}
def _anon_attrs(name: str) -> tuple[Optional[str], Optional[str]]:
low = name.strip().lower()
if not low.startswith("anonyme"):
return None, None
inside = low[low.find("(") + 1: low.find(")")] if "(" in low else ""
toks = [t.strip() for t in inside.split(",")]
gender = next((_ANON_GENDER[t] for t in toks if t in _ANON_GENDER), None)
age = next((_ANON_AGE[t] for t in toks if t in _ANON_AGE), None)
return gender, age
def _item(c) -> dict:
"""Normalise un personnage ou un nom brut en entree de reconciliation."""
if isinstance(c, Character):
return {"name": c.name, "gender": c.gender, "age": c.age,
"description": c.description, "voice_id": c.voice_id}
return {"name": str(c), "gender": None, "age": None,
gender, age = _anon_attrs(str(c)) # figurant anonyme -> genre/age depuis le nom
return {"name": str(c), "gender": gender, "age": age,
"description": None, "voice_id": None}
@@ -194,6 +216,9 @@ def reconcile_characters(
"""
chars = [c.model_copy(deep=True) for c in book_chars]
name_map: dict[str, str] = {}
# Noms deja etablis dans le cast : on les garde canoniques (un chapitre ne
# doit pas renommer un personnage existant en une forme plus longue/titree).
established = {_norm(c.name) for c in book_chars}
items = [_item(c) for c in new_chars]
seen = {_norm(it["name"]) for it in items}
@@ -214,7 +239,8 @@ def reconcile_characters(
pending.append(it)
elif m is not None:
_absorb(m, it["name"], gender=it["gender"], age=it["age"],
description=it["description"], voice_id=it["voice_id"])
description=it["description"], voice_id=it["voice_id"],
keep_canonical=_norm(m.name) in established)
name_map[_norm(it["name"])] = m.name
elif gemma is not None:
pending.append(it) # peut etre une variante non evidente ("Jim")
@@ -231,7 +257,8 @@ def reconcile_characters(
target = hm if isinstance(hm, Character) else None
if target is not None:
_absorb(target, it["name"], gender=it["gender"], age=it["age"],
description=it["description"], voice_id=it["voice_id"])
description=it["description"], voice_id=it["voice_id"],
keep_canonical=_norm(target.name) in established)
name_map[_norm(it["name"])] = target.name
else:
_create(chars, it, name_map)

View File

@@ -1,9 +1,14 @@
"""Banque de voix : un jeu de voix variees (genre/age) auto-suffisant.
"""Banque de voix : un jeu de voix francaises variees (genre, pool anonyme).
Chaque voix s'appuie sur une voix Kokoro (identite + clip de reference). Le clip
de reference est genere une fois en lisant un passage francais standard ; il sert
de reference de timbre pour le clonage Qwen3 (rendu final). Aucune ressource
externe a sourcer.
La banque de reference est peuplee par `scripts/import_voices.py` a partir de
**vrais clips de locuteurs francais** (CML-TTS, livres audio) : chaque voix a son
`ref_audio` + `ref_text`, qui servent de reference de timbre au clonage Qwen3
(rendu final). C'est la source de verite (metadata.json versionne).
`build_voicebank()` ci-dessous est un fallback **legacy** : il regenere des clips
*avec Kokoro* (presets a timbre anglais lisant du francais -> accent). Il ne se
declenche que si metadata.json est absent ou sans `ref_audio`. Re-peupler la
banque = relancer le script d'import, pas ce fallback.
Resolution moteur :
- Kokoro -> VoiceSpec(preset=kokoro_voice) (rapide, preview / draft)

View File

@@ -53,14 +53,16 @@ def analyze(
chapter: Optional[int] = typer.Option(None, help="Index de chapitre unique (def: tous)."),
limit: Optional[int] = typer.Option(None, help="Limiter au N premiers chapitres rendus."),
force: bool = typer.Option(False, help="Re-analyser meme si un artefact existe."),
backend: Optional[str] = typer.Option(None, help="Moteur LLM: mlx ou lmstudio (def: reglages)."),
model: Optional[str] = typer.Option(None, help="Identifiant de modele (def: reglages)."),
):
"""Analyse Gemma : segments narration/dialogue + locuteurs + casting."""
from .analysis.gemma import Gemma
from .analysis.llm.client import LLM
from .analysis.segmenter import analyze_chapter
from .settings import get_settings
book = load_book(slug)
gemma = Gemma()
gemma = LLM(model_id=model, backend=backend)
dedup_gemma = gemma if get_settings().dedup_use_gemma else None
cast = artifacts.load_cast(slug)
chars = list(cast.characters)
@@ -100,6 +102,8 @@ def benchmark(
slug: str,
models: Optional[str] = typer.Option(
None, help="Modeles a comparer, separes par des virgules (def: modele courant)."),
backend: Optional[str] = typer.Option(
None, help="Moteur LLM: mlx ou lmstudio (def: reglages)."),
chapter: Optional[int] = typer.Option(
None, help="Restreindre a un chapitre (def: tous ceux avec reference)."),
temperature: Optional[float] = typer.Option(
@@ -115,12 +119,16 @@ def benchmark(
import sys
from datetime import datetime
from .analysis import gemma as _gemma
from .analysis.llm import client as _llm
from .analysis.benchmark import run_benchmark
from .settings import get_settings
settings = get_settings()
backend_name = backend or settings.gemma_backend
default_model = (settings.lmstudio_model if backend_name == "lmstudio"
else settings.gemma_model)
model_ids = ([m.strip() for m in models.split(",") if m.strip()]
if models else [get_settings().gemma_model])
if models else [default_model])
chapters = [chapter] if chapter is not None else None
label = "artefacts en cache" if use_cached else f"{len(model_ids)} modele(s)"
@@ -137,17 +145,17 @@ def benchmark(
def _sink(piece: str) -> None:
sys.stdout.write(piece)
sys.stdout.flush()
_gemma.set_token_sink(_sink)
_llm.set_token_sink(_sink)
try:
report = run_benchmark(
slug, model_ids, chapters=chapters,
slug, model_ids, backend=backend_name, chapters=chapters,
temperature=temperature,
reasoning=reasoning if reasoning else None,
use_cached=use_cached,
progress=_progress)
finally:
if stream:
_gemma.set_token_sink(None)
_llm.set_token_sink(None)
report.generated_at = datetime.now().isoformat(timespec="seconds")
# Table comparative : une ligne par modele (agregat micro-moyenne).
@@ -197,9 +205,11 @@ def benchmark(
def pronounce(
slug: str,
chapter: Optional[int] = typer.Option(None, help="Index de chapitre (def: 1er rendu)."),
backend: Optional[str] = typer.Option(None, help="Moteur LLM: mlx ou lmstudio (def: reglages)."),
model: Optional[str] = typer.Option(None, help="Identifiant de modele (def: reglages)."),
):
"""Propose des candidats de prononciation (Gemma) -> pronunciation.json."""
from .analysis.gemma import Gemma
from .analysis.llm.client import LLM
from .analysis.pronunciation import merge_pronunciations, propose_pronunciations
book = load_book(slug)
@@ -209,7 +219,7 @@ def pronounce(
console.print("[red]Chapitre introuvable.[/]"); raise typer.Exit(1)
ct = load_chapter_text(slug, ch)
gemma = Gemma()
gemma = LLM(model_id=model, backend=backend)
with console.status("Recherche des mots a risque…"):
new = propose_pronunciations("\n".join(ct.paragraphs), gemma)
pron = merge_pronunciations(artifacts.load_pronunciation(slug), new)
@@ -228,6 +238,8 @@ def cast(
rebuild_voicebank: bool = typer.Option(False, help="Regenere les clips de la voicebank."),
dedup: bool = typer.Option(False, help="Deduplique d'abord les variantes de noms (heuristique)."),
llm: bool = typer.Option(False, "--llm", help="Ajoute la passe Gemma a la dedup (moins sur)."),
backend: Optional[str] = typer.Option(None, help="Moteur LLM pour --llm: mlx ou lmstudio (def: reglages)."),
model: Optional[str] = typer.Option(None, help="Identifiant de modele pour --llm (def: reglages)."),
):
"""Construit la voicebank (si besoin) et auto-assigne les voix au casting."""
from .casting.assign import assign_voices
@@ -243,8 +255,8 @@ def cast(
from .models import Cast
gemma = None
if llm:
from .analysis.gemma import Gemma
gemma = Gemma()
from .analysis.llm.client import LLM
gemma = LLM(model_id=model, backend=backend)
before = len(cast.characters)
with console.status("Deduplication du casting…"):
chars = dedup_cast(cast.characters, gemma)

View File

@@ -29,8 +29,17 @@ VOICEBANK_DIR = _env_path("INKFLOW_VOICEBANK_DIR", PROJECT_ROOT / "voicebank")
# Echantillons fournis
SAMPLES_DIR = PROJECT_ROOT / "samples"
# --- Moteur LLM d'analyse ----------------------------------------------------
# Backend par defaut : "mlx" (mlx-lm, Apple Silicon) ou "lmstudio" (API OpenAI
# locale de LM Studio, sert GGUF *et* MLX charges via sa GUI).
GEMMA_BACKEND = os.environ.get("INKFLOW_GEMMA_BACKEND", "mlx")
# Endpoint OpenAI-compatible de LM Studio (onglet Developer > Start Server).
LMSTUDIO_BASE_URL = os.environ.get(
"INKFLOW_LMSTUDIO_BASE_URL", "http://127.0.0.1:1234/v1"
)
# --- Modeles MLX (HuggingFace mlx-community) ---------------------------------
# Analyse de texte : Gemma via mlx-lm.
# Analyse de texte : Gemma via mlx-lm (backend "mlx").
GEMMA_MODEL = os.environ.get(
"INKFLOW_GEMMA_MODEL", "mlx-community/gemma-3-4b-it-4bit"
)

View File

@@ -119,6 +119,7 @@ class VoiceEntry(BaseModel):
label: Optional[str] = None # libelle lisible
ref_audio: Optional[str] = None # chemin du clip (relatif a voicebank/)
ref_text: Optional[str] = None # transcription du clip
anonymous: bool = False # voix reservee aux figurants "anonyme (...)"
class Voicebank(BaseModel):
@@ -127,8 +128,11 @@ class Voicebank(BaseModel):
def by_id(self, voice_id: str) -> Optional[VoiceEntry]:
return next((e for e in self.entries if e.id == voice_id), None)
def by_gender(self, gender: str) -> list[VoiceEntry]:
return [e for e in self.entries if e.gender == gender]
def by_gender(self, gender: str, *, anonymous: Optional[bool] = None) -> list[VoiceEntry]:
"""Voix d'un genre. `anonymous=False`/`True` filtre le pool reserve aux
figurants ; None ne filtre pas."""
return [e for e in self.entries
if e.gender == gender and (anonymous is None or e.anonymous == anonymous)]
class PronunciationEntry(BaseModel):

View File

@@ -124,7 +124,7 @@ class Orchestrator:
# --- etapes --------------------------------------------------------------
def run_analyze(self, slug: str, chapter_indexes: Optional[list[int]] = None) -> None:
def job() -> None:
from ..analysis.gemma import Gemma
from ..analysis.llm.client import LLM
from ..analysis.segmenter import analyze_chapter
from ..models import Cast
from ..settings import get_settings
@@ -137,7 +137,7 @@ class Orchestrator:
state.active_stage = "analyze"
self._save_and_emit(state)
gemma = Gemma()
gemma = LLM()
dedup_gemma = gemma if get_settings().dedup_use_gemma else None
cast = artifacts.load_cast(slug)
chars = list(cast.characters)
@@ -196,7 +196,7 @@ class Orchestrator:
tout en maintenant la coherence du livre (deduplication).
"""
def job() -> None:
from ..analysis.gemma import Gemma
from ..analysis.llm.client import LLM
from ..analysis.segmenter import extract_characters
from ..casting.dedup import reconcile_characters
from ..models import Cast
@@ -209,7 +209,7 @@ class Orchestrator:
state.active_stage = "cast"
self._save_and_emit(state)
gemma = Gemma()
gemma = LLM()
dedup_gemma = gemma if get_settings().dedup_use_gemma else None
cast = artifacts.load_cast(slug)
chars = list(cast.characters)
@@ -239,7 +239,7 @@ class Orchestrator:
def run_dedup_cast(self, slug: str) -> None:
"""Replie les doublons d'un casting deja constitue (Holden/James Holden...)."""
def job() -> None:
from ..analysis.gemma import Gemma
from ..analysis.llm.client import LLM
from ..casting.dedup import dedup_cast
from ..models import Cast
from ..settings import get_settings
@@ -250,7 +250,7 @@ class Orchestrator:
self._save_and_emit(state)
cast = artifacts.load_cast(slug)
gemma = Gemma() if get_settings().dedup_use_gemma else None
gemma = LLM() if get_settings().dedup_use_gemma else None
chars = dedup_cast(cast.characters, gemma)
artifacts.save_cast(slug, Cast(
narrator_voice_id=cast.narrator_voice_id, characters=chars))
@@ -259,7 +259,7 @@ class Orchestrator:
def run_pronounce(self, slug: str) -> None:
def job() -> None:
from ..analysis.gemma import Gemma
from ..analysis.llm.client import LLM
from ..analysis.pronunciation import (
merge_pronunciations,
propose_pronunciations,
@@ -271,7 +271,7 @@ class Orchestrator:
state.active_stage = "pronounce"
self._save_and_emit(state)
gemma = Gemma()
gemma = LLM()
pron = artifacts.load_pronunciation(slug)
targets = book.render_chapters[:3] # echantillon de chapitres
for i, ch in enumerate(targets):

View File

@@ -45,12 +45,22 @@ def make_voice_resolver(cast, voicebank, engine: str) -> VoiceResolver:
Replie sur la voix du narrateur si le locuteur n'a pas de voix attribuee.
"""
import logging
from ..casting.assign import resolve_speaker_voice
from ..casting.voicebank import voice_spec_for
logger = logging.getLogger(__name__)
warned: set[str] = set() # 1 warning par locuteur et par chapitre (resolver local)
def resolve(speaker: str):
vid = resolve_speaker_voice(speaker, cast, voicebank)
if vid is None:
if speaker != "narrateur" and speaker not in warned:
warned.add(speaker)
logger.warning(
"Locuteur sans voix attribuee, repli sur le narrateur: %r",
speaker)
vid = cast.narrator_voice_id
entry = voicebank.by_id(vid) if vid else None
if entry is None:

View File

@@ -67,6 +67,21 @@ DEFAULT_PROMPT_DEDUP = (
class Settings(BaseModel):
"""Reglages techniques globaux, persistes dans data/settings.json."""
# --- Moteur LLM d'analyse ---
# "mlx" : mlx-lm (Apple Silicon), utilise `gemma_model`.
# "lmstudio" : API OpenAI locale de LM Studio (sert GGUF *et* MLX), utilise
# `lmstudio_base_url` + `lmstudio_model`.
gemma_backend: str = config.GEMMA_BACKEND
lmstudio_base_url: str = config.LMSTUDIO_BASE_URL
lmstudio_model: str = "" # vide -> 1er modele charge dans LM Studio
# Par defaut, le backend LM Studio DELEGUE la config de generation
# (temperature, plafond de tokens) au modele charge dans LM Studio : on
# n'impose ni `temperature` ni `max_tokens` dans la requete. Les reglages
# "Generation Gemma" ci-dessous pilotent alors uniquement le backend MLX.
# Mettre a False pour reimposer ces reglages a LM Studio (utile pour des
# benchmarks reproductibles a temperature fixe).
lmstudio_defer_config: bool = True
# --- Modeles MLX (identifiants HuggingFace) ---
gemma_model: str = config.GEMMA_MODEL
qwen3_model: str = config.QWEN3_TTS_MODEL
@@ -179,7 +194,7 @@ def _invalidate_model_caches() -> None:
except Exception: # noqa: BLE001
pass
try:
from .analysis.gemma import _load
_load.cache_clear()
from .analysis.llm.factory import reset_llm_cache
reset_llm_cache()
except Exception: # noqa: BLE001
pass

View File

@@ -7,15 +7,33 @@ Deux modes :
"""
from __future__ import annotations
import logging
import numpy as np
from ..settings import get_settings
from .base import TTSBackend, VoiceSpec, to_mono_float32
from .chunk import chunk_text
logger = logging.getLogger(__name__)
# Qwen3 tolere des sequences plus longues que Kokoro, mais on borne quand meme.
_QWEN_MAX_CHARS = 500
# Garde-fou anti-derive : Qwen3 part parfois en boucle (audio 50x trop long) ou
# s'arrete net (sortie ~0 s). On estime la duree plausible d'un chunk depuis sa
# longueur (~15 caracteres/s en francais) et on rejette/reessaie les sorties hors
# bornes. Stochastique (temperature) -> un retry change le tirage.
_CHARS_PER_SEC = 15.0
_QWEN_RETRIES = 3
_MIN_FLOOR_SEC = 0.3 # en deca = generation echouee (silence)
def _bounds(n_chars: int) -> tuple[float, float, float]:
"""(attendu, min, max) en secondes pour un chunk de `n_chars` caracteres."""
expected = max(1.0, n_chars / _CHARS_PER_SEC)
return expected, max(_MIN_FLOOR_SEC, 0.4 * expected), 2.5 * expected + 2.0
class Qwen3Backend(TTSBackend):
name = "qwen3"
@@ -45,14 +63,46 @@ class Qwen3Backend(TTSBackend):
kwargs["voice"] = voice.preset or get_settings().qwen3_default_voice
return kwargs
def _gen_chunk_once(self, chunk: str, kwargs: dict) -> np.ndarray:
"""Genere l'audio (concatene) d'un chunk en un tirage."""
out: list[np.ndarray] = []
for result in self._model.generate(text=chunk, **kwargs):
self._sample_rate = getattr(result, "sample_rate", self._sample_rate)
out.append(to_mono_float32(result.audio))
return np.concatenate(out) if out else np.zeros(0, dtype=np.float32)
def _gen_chunk_guarded(self, chunk: str, kwargs: dict) -> np.ndarray:
"""Genere un chunk en rejetant les sorties aberrantes (boucle / coupure).
Retourne le 1er tirage dans les bornes ; sinon la tentative la plus proche
de la duree attendue (en excluant les silences et les derives extremes).
"""
sr = self._sample_rate
expected, lo, hi = _bounds(len(chunk))
attempts: list[np.ndarray] = []
for i in range(_QWEN_RETRIES):
audio = self._gen_chunk_once(chunk, kwargs)
dur = len(audio) / sr
if lo <= dur <= hi:
if i:
logger.info("Qwen3: chunk OK au retry %d (%.1fs)", i, dur)
return audio
logger.warning("Qwen3: sortie aberrante %.1fs (attendu ~%.1fs) — retry", dur, expected)
attempts.append(audio)
# Aucune tentative dans les bornes : on garde la moins mauvaise (ni
# silence ni derive), la plus proche de l'attendu.
valid = [a for a in attempts if _MIN_FLOOR_SEC <= len(a) / sr <= hi] or attempts
best = min(valid, key=lambda a: abs(len(a) / sr - expected))
logger.warning("Qwen3: chunk non stabilise apres %d essais, garde %.1fs: %r",
_QWEN_RETRIES, len(best) / sr, chunk[:60])
return best
def synthesize(self, text: str, voice: VoiceSpec) -> tuple[np.ndarray, int]:
self._ensure_loaded()
kwargs = self._gen_kwargs(voice)
pieces: list[np.ndarray] = []
for chunk in chunk_text(text, max_chars=_QWEN_MAX_CHARS):
for result in self._model.generate(text=chunk, **kwargs):
self._sample_rate = getattr(result, "sample_rate", self._sample_rate)
pieces.append(to_mono_float32(result.audio))
pieces = [self._gen_chunk_guarded(chunk, kwargs)
for chunk in chunk_text(text, max_chars=_QWEN_MAX_CHARS)]
pieces = [p for p in pieces if len(p)]
if not pieces:
return np.zeros(0, dtype=np.float32), self._sample_rate
return np.concatenate(pieces), self._sample_rate

View File

@@ -9,6 +9,8 @@ dependencies = [
"mlx-lm",
"mlx-audio",
"misaki", # phonemizer pour Kokoro (français inclus)
# Backend LLM alternatif : LM Studio via son API OpenAI locale (GGUF + MLX)
"openai",
# Parsing EPUB
"ebooklib",
"beautifulsoup4",

View File

@@ -0,0 +1,61 @@
"""Mesure l'effet de la passe d'alternance sur l'attribution (avant/apres).
Pour chaque modele : charge une fois, analyse le chapitre, intercepte les
locuteurs JUSTE avant `_repair_alternation` (etat "avant") puis lit l'etat
"apres", et score les deux contre la reference. Isole le gain de la passe
deterministe, independamment du cout du modele.
"""
from __future__ import annotations
import copy
import sys
from inkflow.analysis import segmenter
from inkflow.analysis.benchmark import _load_reference, _score_counts, _counts_to_score
from inkflow.analysis.llm.client import LLM
from inkflow.analysis.llm.factory import reset_llm_cache
from inkflow.epub.parser import load_book, load_chapter_text
from inkflow.store import artifacts
SLUG = "la-colere-de-tiamat"
CH = int(__import__("os").environ.get("DELTA_CH", "5"))
def main(model_ids: list[str]) -> None:
book = load_book(SLUG)
chapter = next(c for c in book.chapters if c.index == CH)
ct = load_chapter_text(SLUG, chapter)
cast = artifacts.load_cast(SLUG)
ref = _load_reference(SLUG, CH)
orig_repair = segmenter._repair_alternation
print(f"{'modele':<40} {'avant':>7} {'apres':>7} {'delta':>7}")
for model_id in model_ids:
captured: dict[str, list] = {}
def spy(segments, **kw): # capture l'etat avant reparation
captured["before"] = copy.deepcopy(segments)
orig_repair(segments, **kw)
segmenter._repair_alternation = spy
try:
gemma = LLM(model_id=model_id)
analysis, _ = segmenter.analyze_chapter(
chapter, ct, gemma, book_chars=list(cast.characters),
dedup_gemma=None)
finally:
segmenter._repair_alternation = orig_repair
reset_llm_cache()
from inkflow.models import ChapterAnalysis
before = ChapterAnalysis(index=CH, title=ct.title,
segments=captured["before"])
s_before = _counts_to_score(CH, _score_counts(ref, before, cast))
s_after = _counts_to_score(CH, _score_counts(ref, analysis, cast))
b = s_before.speaker_acc_dialogue
a = s_after.speaker_acc_dialogue
print(f"{model_id:<40} {b:>6.1%} {a:>6.1%} {a - b:>+6.1%}")
if __name__ == "__main__":
main(sys.argv[1:] or ["mlx-community/gemma-3-4b-it-4bit"])

View File

@@ -0,0 +1,299 @@
"""Importe de vraies voix francaises dans la voicebank (clips + ref_text).
Probleme resolu : `build_voicebank()` generait les clips de reference *avec
Kokoro lui-meme* — et la plupart des voix empruntaient un timbre Kokoro
**anglais** lisant du francais phonemise. Resultat : un fort accent anglais que
Qwen3 clonait fidelement. Ce script **remplace toute la banque** par de vrais
enregistrements de locuteurs francais, ce qui donne a Qwen3 une reference de
timbre reellement francophone.
Source : **CML-TTS French** (`ylacombe/cml-tts`, config `french`), CC-BY,
non-gated. Corpus de **livres audio** taille pour le TTS : voix studio, registre
narrateur, prose reelle. On telecharge des shards parquet (audio WAV 24 kHz
embarque) via `huggingface_hub`, shard apres shard, jusqu'a remplir les quotas.
Allocation des roles (chaque voix = un locuteur distinct, `speaker_id`) :
- 1 **narrateur** dedie (`fr_narrator`).
- N **voix nommees** par genre (`fr_f_*`, `fr_m_*`) pour les personnages.
- M **voix anonymes** par genre (`fr_anon_f_*`, `fr_anon_m_*`, `anonymous=True`),
reservees aux figurants "anonyme (...)" par `assign_voices` (jamais melangees
avec les voix nommees).
Qualite : un clip par locuteur, le plus propre (`levenshtein` mini), duree bornee.
Genre absent du corpus -> estime par **F0 (YIN, anti-octave)**.
Usage (depuis backend/, venv actif) :
python scripts/import_voices.py # quotas par defaut, REMPLACE la banque
python scripts/import_voices.py --named-f 18 --named-m 14 --anon 4
python scripts/import_voices.py --shards french/dev/0002.parquet french/dev/0000.parquet
Le clip est ecrit a son sr natif (24 kHz) ; Qwen3 reechantillonne la reference.
La banque resultante a un `ref_audio` partout, donc `build_voicebank()` (legacy)
ne la regenerera pas. Le `kokoro_voice` reste renseigne (preset de meme genre)
pour le preview/draft Kokoro ; le timbre final vient du ref_audio via Qwen3.
"""
from __future__ import annotations
import argparse
import io
import sys
from pathlib import Path
import numpy as np
import soundfile as sf
# Permet de lancer le script sans `pip install -e` : on ajoute backend/ au path.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from inkflow.casting.voicebank import save_voicebank # noqa: E402
from inkflow.config import VOICEBANK_DIR # noqa: E402
from inkflow.models import VoiceEntry, Voicebank # noqa: E402
# Presets Kokoro de secours par genre (preview/draft uniquement ; le timbre final
# vient du ref_audio clone par Qwen3). On cycle dessus pour varier les previews.
_KOKORO_BY_GENDER = {
"female": ["af_bella", "af_heart", "af_nicole", "bf_emma"],
"male": ["am_fenrir", "am_michael", "bm_george", "am_eric"],
}
# Shards CML-TTS French par defaut (branche refs/convert/parquet). dev/test
# partagent un petit pool fixe de locuteurs (~17F/17M au total) ; la variete est
# dans train (chaque shard = quelques lecteurs distincts). On lit test (le plus
# fourni) puis des shards train jusqu'a remplir les quotas.
_DEFAULT_SHARDS = (
["french/test/0000.parquet", "french/dev/0002.parquet"]
+ [f"french/train/{i:04d}.parquet" for i in range(12)]
)
def _to_mono(arr: np.ndarray) -> np.ndarray:
arr = np.asarray(arr, dtype=np.float32)
if arr.ndim > 1:
arr = arr.mean(axis=1)
return arr
def _yin_f0(frame: np.ndarray, sr: int, lo: int, hi: int, thresh: float = 0.15) -> float:
"""F0 d'une trame par YIN (anti-octave). 0.0 si non voisee.
1) fonction de difference d(tau) ; 2) moyenne cumulee normalisee d'(tau) ;
3) premier tau sous le seuil absolu (evite de prendre l'octave superieure).
C'est l'etape (2)-(3) qui rend YIN robuste aux erreurs d'octave de
l'autocorrelation simple (qui faisait passer un homme pour une femme).
"""
n = len(frame)
diff = np.zeros(hi + 1)
for tau in range(1, hi + 1):
d = frame[: n - tau] - frame[tau:n]
diff[tau] = np.dot(d, d)
cum = np.cumsum(diff[1:])
cmnd = np.ones(hi + 1)
taus = np.arange(1, hi + 1)
cmnd[1:] = diff[1:] * taus / np.maximum(cum, 1e-9)
tau = -1
t = lo
while t < hi:
if cmnd[t] < thresh:
while t + 1 < hi and cmnd[t + 1] < cmnd[t]:
t += 1 # descend jusqu'au minimum local
tau = t
break
t += 1
if tau == -1: # aucun creux net -> min global de la bande
tau = lo + int(np.argmin(cmnd[lo:hi]))
if cmnd[tau] > 0.6: # vraiment pas de periodicite -> non voisee
return 0.0
return sr / tau
def estimate_gender(arr: np.ndarray, sr: int) -> tuple[str, float]:
"""Estime le genre par F0 mediane (YIN par trame, numpy pur).
Voix parlee : H ~85-180 Hz (med ~120), F ~165-255 Hz (med ~210). Renvoie
("unknown", med) si la mediane tombe dans la zone ambigue 150-180 Hz -> on
prefere ecarter le locuteur que de mal le classer (assez de candidats).
"""
win = int(0.04 * sr)
hop = win // 2
lo = max(1, int(sr / 350)) # 350 Hz
hi = int(sr / 70) # 70 Hz
energy_thresh = 0.10 * np.sqrt(np.mean(arr ** 2) + 1e-9)
f0s: list[float] = []
for start in range(0, max(0, len(arr) - win), hop):
frame = arr[start:start + win].astype(np.float64)
if np.sqrt(np.mean(frame ** 2)) < energy_thresh:
continue
f0 = _yin_f0(frame - frame.mean(), sr, lo, hi)
if f0 > 0:
f0s.append(f0)
if len(f0s) < 10:
return "unknown", 0.0
med = float(np.median(f0s))
if 150 <= med <= 180:
return "unknown", med
return ("male" if med < 165 else "female"), med
def _iter_parquet_rows(dataset: str, shard: str):
"""Telecharge le shard parquet (audio embarque) et itere ses lignes en dict."""
from huggingface_hub import hf_hub_download
import pyarrow.parquet as pq
print(f" · telechargement {shard}", flush=True)
path = hf_hub_download(dataset, shard, repo_type="dataset",
revision="refs/convert/parquet")
pf = pq.ParquetFile(path)
for batch in pf.iter_batches(batch_size=128):
cols = {name: batch.column(name) for name in batch.schema.names}
for i in range(batch.num_rows):
yield {name: col[i].as_py() for name, col in cols.items()}
def _gather_voices(dataset, shards, min_dur, max_dur, max_lev, need_f, need_m):
"""Collecte des locuteurs distincts classes par genre (YIN), shard par shard.
S'arrete des que chaque genre a assez de candidats. Renvoie
{"female": [(spk, lev, bytes, text), ...trie par qualite], "male": [...]}.
"""
best: dict[object, dict] = {} # speaker_id -> meilleur clip vu
classified: dict[object, str] = {} # speaker_id -> gender (cache)
buckets = {"female": [], "male": []}
for shard in shards:
for row in _iter_parquet_rows(dataset, shard):
dur = row.get("duration") or 0.0
if not (min_dur <= dur <= max_dur):
continue
nwords = row.get("num_words") or 0
# Debit de parole : un ref_text qui ne couvre pas l'audio (fragment
# tronque, ou audio plein de silence) casse le clonage Qwen3 (sortie
# vide). On exige un debit plausible 1.5-4.5 mots/s.
wps = nwords / dur if dur else 0
if nwords < 8 or not (1.5 <= wps <= 4.5):
continue
lev = (row.get("levenshtein") or 0) / max(nwords, 1)
if lev > max_lev:
continue
spk = row.get("speaker_id")
text = (row.get("text") or "").strip()
if spk is None or len(text) < 15:
continue
cur = best.get(spk)
if cur is None or lev < cur["lev"]:
best[spk] = {"lev": lev, "bytes": row["audio"]["bytes"], "text": text}
# Classe les nouveaux locuteurs de ce shard.
for spk, c in best.items():
if spk in classified:
continue
arr, sr = sf.read(io.BytesIO(c["bytes"]), dtype="float32")
g, _ = estimate_gender(_to_mono(arr), sr)
classified[spk] = g
if g in buckets:
buckets[g].append((spk, c["lev"], c["bytes"], c["text"]))
nf, nm = len(buckets["female"]), len(buckets["male"])
print(f" -> {nf} femmes / {nm} hommes candidats", flush=True)
if nf >= need_f and nm >= need_m:
break
for g in buckets:
buckets[g].sort(key=lambda t: t[1]) # plus propre d'abord
return buckets
def _write_clip(vid: str, raw: bytes) -> tuple[str, int]:
arr, sr = sf.read(io.BytesIO(raw), dtype="float32")
arr = _to_mono(arr)
rel = f"clips/{vid}.wav"
sf.write(str(VOICEBANK_DIR / rel), arr, sr)
return rel, sr
def _entry(vid, gender, idx, spk, text, *, anonymous, label) -> VoiceEntry:
kokoro = _KOKORO_BY_GENDER[gender][(idx - 1) % len(_KOKORO_BY_GENDER[gender])]
rel, _ = _write_clip(vid, spk[2])
return VoiceEntry(id=vid, kokoro_voice=kokoro, gender=gender, age="adult",
lang="fr", label=label, ref_audio=rel, ref_text=text,
anonymous=anonymous)
def import_voices(*, dataset, shards, named_f, named_m, anon, min_dur, max_dur,
max_lev) -> Voicebank:
need_f = named_f + anon + 1 # +1 narrateur (feminin)
need_m = named_m + anon
print(f"Objectif : {need_f} femmes / {need_m} hommes (distincts).", flush=True)
buckets = _gather_voices(dataset, shards, min_dur, max_dur, max_lev, need_f, need_m)
fem, mal = buckets["female"], buckets["male"]
if len(fem) < need_f or len(mal) < need_m:
print(f"⚠ Pas assez de locuteurs (F {len(fem)}/{need_f}, H {len(mal)}/{need_m}) — "
"quotas reduits. Ajoute des shards via --shards.", flush=True)
named_f = min(named_f, max(0, len(fem) - anon - 1))
named_m = min(named_m, max(0, len(mal) - anon))
# Remplacement complet : on vide les clips existants.
clips = VOICEBANK_DIR / "clips"
clips.mkdir(parents=True, exist_ok=True)
for old in clips.glob("*.wav"):
old.unlink()
entries: list[VoiceEntry] = []
fi = mi = 0 # curseurs dans les buckets tries par qualite
# 1) Narrateur (1re voix feminine, la plus propre).
spk = fem[fi]; fi += 1
entries.append(_entry("fr_narrator", "female", 1, spk, spk[3],
anonymous=False, label="Narrateur (FR)"))
# 2) Voix nommees.
for i in range(1, named_f + 1):
spk = fem[fi]; fi += 1
entries.append(_entry(f"fr_f_{i}", "female", i, spk, spk[3],
anonymous=False, label=f"Voix F {i} (FR)"))
for i in range(1, named_m + 1):
spk = mal[mi]; mi += 1
entries.append(_entry(f"fr_m_{i}", "male", i, spk, spk[3],
anonymous=False, label=f"Voix H {i} (FR)"))
# 3) Voix anonymes (reservees aux figurants).
for i in range(1, anon + 1):
if fi >= len(fem):
break
spk = fem[fi]; fi += 1
entries.append(_entry(f"fr_anon_f_{i}", "female", i, spk, spk[3],
anonymous=True, label=f"Anonyme F {i} (FR)"))
for i in range(1, anon + 1):
if mi >= len(mal):
break
spk = mal[mi]; mi += 1
entries.append(_entry(f"fr_anon_m_{i}", "male", i, spk, spk[3],
anonymous=True, label=f"Anonyme H {i} (FR)"))
vb = Voicebank(entries=entries)
save_voicebank(vb)
na = sum(1 for e in entries if e.anonymous)
print(f"\n{len(entries)} voix → {VOICEBANK_DIR / 'metadata.json'}")
print(f" narrateur 1 · nommees {len(entries) - na - 1} · anonymes {na}")
for e in entries:
tag = " [anon]" if e.anonymous else ""
print(f" {e.id:14} {e.gender:6} kokoro={e.kokoro_voice}{tag}")
return vb
def main() -> None:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--dataset", default="ylacombe/cml-tts")
p.add_argument("--shards", nargs="+", default=_DEFAULT_SHARDS,
help="Shards parquet a consommer dans l'ordre jusqu'aux quotas.")
p.add_argument("--named-f", type=int, default=18, help="Voix feminines nommees.")
p.add_argument("--named-m", type=int, default=14, help="Voix masculines nommees.")
p.add_argument("--anon", type=int, default=4, help="Voix anonymes par genre.")
p.add_argument("--min-dur", type=float, default=6.0)
p.add_argument("--max-dur", type=float, default=15.0)
p.add_argument("--max-lev", type=float, default=0.5,
help="Distance Levenshtein max par mot (qualite ; plus bas = plus propre).")
args = p.parse_args()
import_voices(dataset=args.dataset, shards=args.shards, named_f=args.named_f,
named_m=args.named_m, anon=args.anon, min_dur=args.min_dur,
max_dur=args.max_dur, max_lev=args.max_lev)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,201 @@
"""Tests purs : canonicalisation des noms variants + anonymes par genre/age.
`_canonicalize_speakers`, `_apply_anonymous_speakers` et `_anon_identity` sont
deterministes et testables sans Gemma ni disque (cf. test_incises.py).
"""
from __future__ import annotations
from inkflow.analysis.segmenter import (
_anon_identity,
_apply_anonymous_speakers,
_canonicalize_speakers,
_inversion_gender,
_resolve_anonymous_figurants,
)
from inkflow.models import Character, Incise, Segment, SegmentType
def _C(name, gender=None, age=None, aliases=None):
return Character(name=name, gender=gender, age=age, aliases=aliases or [])
def _D(text, speaker, incises=None):
return Segment(type=SegmentType.DIALOGUE, text=text, speaker=speaker,
incises=incises or [])
def _N(text="narration"):
return Segment(type=SegmentType.NARRATION, text=text, speaker="narrateur")
# --- Canonicalisation des variantes de noms ----------------------------------
def test_canon_variante_vers_canonique():
chars = [_C("Sagale"), _C("Elvi"), _C("Holden")]
segs = [_D("a", "Amiral Mehmet Sagale"), _D("b", "Elvi Okoye"),
_D("c", "Holden")]
_canonicalize_speakers(segs, chars)
assert [s.speaker for s in segs] == ["Sagale", "Elvi", "Holden"]
def test_canon_reciproque_forme_courte_vers_complete():
# Le cast porte le nom complet ; une surface courte distinctive s'y recolle.
chars = [_C("Elvi Okoye")]
segs = [_D("a", "Okoye")]
_canonicalize_speakers(segs, chars)
assert segs[0].speaker == "Elvi Okoye"
def test_canon_marine_unique_distinctif():
chars = [_C("Marine"), _C("Holden")]
segs = [_D("a", "Marine de gauche")]
_canonicalize_speakers(segs, chars)
assert segs[0].speaker == "Marine"
def test_canon_ambiguite_sabstient():
# Deux personnages partagent le token "marine" -> non distinctif -> abstention.
chars = [_C("Marine Lopez"), _C("Marine Cho")]
segs = [_D("a", "Marine de gauche")]
_canonicalize_speakers(segs, chars)
assert segs[0].speaker == "Marine de gauche" # inchange
def test_canon_inconnu_total_inchange():
chars = [_C("Holden"), _C("Kajri")]
segs = [_D("a", "Bob")]
_canonicalize_speakers(segs, chars)
assert segs[0].speaker == "Bob"
def test_canon_narrateur_et_inconnu_jamais_touches():
chars = [_C("Sagale")]
segs = [_N(), _D("a", "inconnu"), _D("b", "?")]
_canonicalize_speakers(segs, chars)
assert [s.speaker for s in segs] == ["narrateur", "inconnu", "?"]
def test_canon_idempotent():
chars = [_C("Sagale")]
segs = [_D("a", "Amiral Mehmet Sagale")]
_canonicalize_speakers(segs, chars)
once = segs[0].speaker
_canonicalize_speakers(segs, chars)
assert segs[0].speaker == once == "Sagale"
# --- Identite anonyme par (genre, age) ---------------------------------------
def test_anon_identity_format():
assert _anon_identity("male", "adult") == "anonyme (homme, adulte)"
assert _anon_identity("male", None) == "anonyme (homme)"
assert _anon_identity("female", None) == "anonyme (femme)"
assert _anon_identity(None, None) == "anonyme"
assert _anon_identity(None, "child") == "anonyme (enfant)"
def test_apply_anonymous_role_par_genre():
# "informa le soldat" -> anonyme (homme) ; renvoie le bucket avec genre/age.
t = "La réception commence, madame, informa le soldat."
inc = Incise(start=t.index("informa"), end=len(t))
segs = [_D(t, "inconnu", [inc])]
used = _apply_anonymous_speakers(segs, names={"Kajri"})
assert segs[0].speaker == "anonyme (homme)"
assert used == {"anonyme (homme)": ("male", None)}
def test_apply_anonymous_role_inconnu_genre():
# "une voix" : role sans genre fiable -> bucket generique "anonyme".
t = "Par ici, indiqua une voix."
inc = Incise(start=t.index("indiqua"), end=len(t))
segs = [_D(t, "inconnu", [inc])]
used = _apply_anonymous_speakers(segs, names=set())
assert segs[0].speaker == "anonyme"
assert used == {"anonyme": (None, None)}
def test_apply_anonymous_ignore_nom_propre():
# Incise a nom propre -> pas un anonyme, speaker inchange.
t = "Bonjour, lança Drummer."
inc = Incise(start=t.index("lança"), end=len(t))
segs = [_D(t, "Drummer", [inc])]
used = _apply_anonymous_speakers(segs, names={"Drummer"})
assert segs[0].speaker == "Drummer"
assert used == {}
# --- Rang/titre devant un nom propre -----------------------------------------
def test_rang_titre_capte_le_nom_propre():
# "dit l'amiral Sagale" : le rang n'est pas un anonyme, on capte "Sagale".
from inkflow.analysis.segmenter import detect_incises, incise_role, incise_speaker
t = "Dr Okoye, dit l'amiral Sagale."
inc = detect_incises(t, names={"Sagale"})[0]
assert incise_speaker(t, inc, {"Sagale"}) == "Sagale"
assert incise_role(t, inc, {"Sagale"}) is None
# --- Stabilite du nom canonique etabli ---------------------------------------
def test_reconcile_garde_nom_etabli_stable():
# Un nom deja dans le cast ("Sagale") n'est pas renomme par une forme plus
# longue trouvee dans un chapitre ("Amiral Mehmet Sagale") -> alias.
from inkflow.casting.dedup import reconcile_characters
book = [_C("Sagale", gender="male")]
found = [_C("Amiral Mehmet Sagale", gender="male")]
chars, _ = reconcile_characters(book, found, None)
sagale = next(c for c in chars if c.name == "Sagale")
assert "Amiral Mehmet Sagale" in sagale.aliases
def test_reconcile_nouveau_perso_garde_forme_complete():
# Sans nom etabli, le comportement reste "la forme la plus complete gagne".
from inkflow.casting.dedup import reconcile_characters
chars, _ = reconcile_characters([], [_C("Jim"), _C("Jim Holden")], None)
assert any(c.name == "Jim Holden" and "Jim" in c.aliases for c in chars)
# --- Figurants anonymes resolus via la narration adjacente -------------------
def test_inversion_gender():
assert _inversion_gender("Souhaitez-vous une escorte ? demanda-t-elle.") == "female"
assert _inversion_gender("Stop, dit-il.") == "male"
assert _inversion_gender("Je pars maintenant.") is None
def test_figurant_femme_via_narration_avant():
# Replique indeterminee + narration decrivant "La jeune marine" -> anonyme femme.
segs = [
_N("La jeune marine toucha quelque chose au poignet de son armure."),
_D("Prévenez-nous quand vous serez prête à ressortir.", "inconnu"),
]
used = _resolve_anonymous_figurants(segs)
assert segs[1].speaker == "anonyme (femme)"
assert "anonyme (femme)" in used
def test_figurant_genre_par_pronom_inversion_prioritaire():
# "demanda-t-elle" (féminin) prime, narration "Le soldat" -> on garde femme.
segs = [
_N("Le soldat s'avança vers eux."),
_D("Souhaitez-vous une escorte ? demanda-t-elle.", "?"),
]
_resolve_anonymous_figurants(segs)
assert segs[0].speaker == "narrateur"
assert segs[1].speaker == "anonyme (femme)"
def test_figurant_ne_touche_pas_les_resolus():
# Une replique deja attribuee n'est jamais ecrasee, meme avec narration de role.
segs = [
_N("Le soldat montait la garde."),
_D("J'arrive.", "Holden"),
]
_resolve_anonymous_figurants(segs)
assert segs[1].speaker == "Holden"
def test_figurant_sans_narration_de_role_inchange():
segs = [_N("La pièce était sombre."), _D("Qui est là ?", "inconnu")]
_resolve_anonymous_figurants(segs)
assert segs[1].speaker == "inconnu"

View File

@@ -6,7 +6,7 @@ parasite present dans la pensee).
"""
from __future__ import annotations
from inkflow.analysis.gemma import (
from inkflow.analysis.llm._text import (
_extract_json,
_has_complete_json,
_strip_reasoning,

View File

@@ -8,6 +8,7 @@ from __future__ import annotations
from inkflow.analysis.segmenter import (
detect_incises,
incise_role,
incise_speaker,
iter_incise_pieces,
)
@@ -202,3 +203,125 @@ def test_bornes_non_chevauchantes_et_triees():
assert all(incs[i].end <= incs[i + 1].start for i in range(len(incs) - 1))
for inc in incs:
assert 0 <= inc.start < inc.end <= len(text)
# --- Passe deterministe : reparation de l'alternance des tours ---------------
from inkflow.analysis.segmenter import _repair_alternation # noqa: E402
from inkflow.models import Incise, Segment, SegmentType # noqa: E402
def _D(text: str, speaker: str, incises=None) -> Segment:
return Segment(type=SegmentType.DIALOGUE, text=text, speaker=speaker,
incises=incises or [])
def _N(text: str = "narration") -> Segment:
return Segment(type=SegmentType.NARRATION, text=text, speaker="narrateur")
def _speakers(segments, sl):
return [segments[i].speaker for i in sl]
def test_alternance_corrige_doublons_de_tour():
# Echange a deux, le modele a double des tours (D,H,H) -> doit redevenir D,H,D.
segs = [
_N(),
_D("Je suis ravie.", "Drummer"),
_D("C'est moche.", "Holden"),
_D("Je ne devrais pas la ramener.", "Holden"), # erreur
_N(),
]
_repair_alternation(segs, names={"Drummer", "Holden"})
assert _speakers(segs, [1, 2, 3]) == ["Drummer", "Holden", "Drummer"]
def test_alternance_ancre_par_incise_nominale():
# Seed nominal en tete (compatit Holden) -> fixe la parite du motif.
t0 = "Toutes mes condoléances, compatit Holden."
seed = [Incise(start=t0.index("compatit"), end=len(t0))]
segs = [
_N(),
_D(t0, "Holden", seed),
_D("Merci.", "Kajri"),
_D("Nous n'avons pas été présentés.", "Kajri"), # erreur
_D("James Holden.", "Holden"), # erreur
_D("Ah, croustillant.", "Kajri"), # erreur
_N(),
]
_repair_alternation(segs, names={"Holden", "Kajri"})
assert _speakers(segs, [1, 2, 3, 4, 5]) == [
"Holden", "Kajri", "Holden", "Kajri", "Holden"]
def test_alternance_trois_locuteurs_ancres_sabstient():
# Un 3e locuteur (meme via incise) dans le run -> pas d'alternance binaire forcee.
ta = "Ça satisfait, disait Bobbie."
tb = "Oui, convint Naomi."
tc = "Avec des jeunes, précisa Alex."
segs = [
_N(),
_D(ta, "Bobbie", [Incise(start=ta.index("disait"), end=len(ta))]),
_D(tb, "Naomi", [Incise(start=tb.index("convint"), end=len(tb))]),
_D(tc, "Alex", [Incise(start=tc.index("précisa"), end=len(tc))]),
_N(),
]
_repair_alternation(segs, names={"Bobbie", "Naomi", "Alex"})
assert _speakers(segs, [1, 2, 3]) == ["Bobbie", "Naomi", "Alex"]
def test_alternance_run_deja_correct_inchange():
segs = [_N(), _D("a", "Holden"), _D("b", "Kajri"),
_D("c", "Holden"), _D("d", "Kajri"), _N()]
before = _speakers(segs, [1, 2, 3, 4])
_repair_alternation(segs, names={"Holden", "Kajri"})
assert _speakers(segs, [1, 2, 3, 4]) == before
def test_alternance_trois_locuteurs_sabstient():
# 3 locuteurs distincts dans le run -> pas d'alternance binaire, on ne touche pas.
segs = [_N(), _D("a", "Holden"), _D("b", "Kajri"),
_D("c", "Drummer"), _N()]
_repair_alternation(segs, names={"Holden", "Kajri", "Drummer"})
assert _speakers(segs, [1, 2, 3]) == ["Holden", "Kajri", "Drummer"]
def test_alternance_narration_intercalee_rompt_le_run():
# STRICT (GAP=0) : toute narration entre deux repliques coupe le run, car
# elle peut porter une continuation du meme locuteur (cf. ch06). On ne force
# donc PAS l'alternance a travers une narration.
segs = [_N(), _D("a", "Drummer"), _N("il marqua une pause"),
_D("b", "Holden"), _D("c", "Holden"), _N()]
_repair_alternation(segs, names={"Holden", "Drummer"})
# Le run effectif est [b, c] (consecutifs) : 1 seul locuteur resolu -> abstention.
assert _speakers(segs, [1, 3, 4]) == ["Drummer", "Holden", "Holden"]
def test_incise_role_renvoie_le_nom_de_role():
# "informa le soldat" : pas un locuteur NOMME, mais un role identifiable.
text = "La réception commence, madame, informa le soldat."
inc = detect_incises(text, names=NAMES)[0]
assert incise_speaker(text, inc, NAMES) is None # pas de nom propre
assert incise_role(text, inc, NAMES) == "soldat" # role detecte
# Un nom propre n'est pas un role.
text2 = "Bonjour, lança Drummer."
inc2 = detect_incises(text2, names=set())[0]
assert incise_role(text2, inc2, set()) is None
def test_alternance_seed_contradictoire_sabstient():
# Deux seeds nominaux contradictoires avec toute alternance -> abstention.
ta = "Bonjour, dit Holden."
tb = "Salut, répondit Holden."
segs = [
_N(),
_D(ta, "Holden", [Incise(start=ta.index("dit"), end=len(ta))]),
_D("Entre les deux.", "Kajri"),
_D(tb, "Holden", [Incise(start=tb.index("répondit"), end=len(tb))]),
_N(),
]
# Motif alterne impossible (Holden en 0 et 2 exige Kajri en 1, OK en fait) :
# ici l'alternance H,K,H EST coherente avec les deux ancres -> applique.
_repair_alternation(segs, names={"Holden", "Kajri"})
assert _speakers(segs, [1, 2, 3]) == ["Holden", "Kajri", "Holden"]

View File

@@ -0,0 +1,147 @@
"""Tests du backend LM Studio (sans reseau ni paquet openai installe).
On injecte un faux module `openai` dans sys.modules : le backend l'importe
paresseusement, on peut donc valider la construction des messages, le parsing de
la reponse (content + reasoning_content), le streaming et l'erreur de connexion
sans dependance ni serveur.
"""
from __future__ import annotations
import sys
import types
from types import SimpleNamespace
import pytest
import inkflow.analysis.llm.lmstudio_backend as lm
from inkflow.analysis.llm._text import _extract_json, _strip_reasoning
from inkflow.analysis.llm.lmstudio_backend import LMStudioBackend
class _FakeAPIConnectionError(Exception):
pass
@pytest.fixture(autouse=True)
def fake_openai(monkeypatch):
"""Faux module openai (APIConnectionError + OpenAI) injecte dans sys.modules."""
mod = types.ModuleType("openai")
mod.APIConnectionError = _FakeAPIConnectionError
mod.OpenAI = lambda **kw: None # jamais utilise (on injecte _client a la main)
monkeypatch.setitem(sys.modules, "openai", mod)
return mod
@pytest.fixture(autouse=True)
def settings(monkeypatch):
"""Reglages controles (defaut : delegation a LM Studio) sans lire le disque."""
state = SimpleNamespace(lmstudio_defer_config=True,
lmstudio_base_url="http://127.0.0.1:1234/v1")
monkeypatch.setattr(lm, "get_settings", lambda: state)
return state
def _message(content, reasoning=None):
msg = SimpleNamespace(content=content, reasoning_content=reasoning)
return SimpleNamespace(choices=[SimpleNamespace(message=msg)])
class _FakeCompletions:
"""Capture les kwargs et renvoie une reponse (ou leve) preprogrammee."""
def __init__(self, *, response=None, stream=None, raises=None):
self.response, self.stream, self.raises = response, stream, raises
self.kwargs = None
def create(self, **kwargs):
self.kwargs = kwargs
if self.raises is not None:
raise self.raises
return self.stream if kwargs.get("stream") else self.response
def _client(completions):
return SimpleNamespace(chat=SimpleNamespace(completions=completions))
def _backend(completions, *, model="m"):
b = LMStudioBackend(model)
b._client = _client(completions) # court-circuite _ensure_client (pas d'openai reel)
return b
def test_non_stream_content_delegue_la_config(settings):
# Par defaut on DELEGUE a LM Studio : ni temperature ni max_tokens imposes
# (sinon on tronquait la reponse / on ecrasait la config du modele).
comp = _FakeCompletions(response=_message('{"speaker": "Marie"}'))
b = _backend(comp)
out = b.complete(
[{"role": "system", "content": "sys"}, {"role": "user", "content": "u"}],
max_tokens=128, temperature=0.1, reasoning=False)
assert _extract_json(out) == {"speaker": "Marie"}
assert comp.kwargs["model"] == "m"
assert comp.kwargs["messages"][0]["role"] == "system"
assert "temperature" not in comp.kwargs # delegue a LM Studio
assert "max_tokens" not in comp.kwargs
def test_non_stream_params_imposes_si_delegation_off(settings):
# lmstudio_defer_config=False -> on reimpose les reglages InkFlow.
settings.lmstudio_defer_config = False
comp = _FakeCompletions(response=_message('{"speaker": "Marie"}'))
b = _backend(comp)
b.complete([{"role": "user", "content": "u"}],
max_tokens=128, temperature=0.1, reasoning=False)
assert comp.kwargs["temperature"] == 0.1
assert comp.kwargs["max_tokens"] == 128
def test_reasoning_content_exclu_du_retour():
# LM Studio separe la pensee (reasoning_content) de la reponse (content,
# propre). Le retour ne doit contenir QUE content : un JSON d'exemple present
# dans la pensee ne doit pas etre capte a la place de la vraie reponse.
comp = _FakeCompletions(
response=_message('{"capitale": "Paris"}',
reasoning='exemple parasite: {"capitale": "Londres"}'))
b = _backend(comp)
out = b.complete([{"role": "user", "content": "u"}],
max_tokens=128, temperature=0.0, reasoning=False)
assert _extract_json(out) == {"capitale": "Paris"}
assert "parasite" not in out
def test_streaming_token_sink():
def _delta(content=None, reasoning=None):
return SimpleNamespace(choices=[SimpleNamespace(
delta=SimpleNamespace(content=content, reasoning_content=reasoning))])
chunks = [_delta(reasoning="je pense "), _delta(content='{"a"'), _delta(content=": 1}")]
comp = _FakeCompletions(stream=iter(chunks))
b = _backend(comp)
seen = []
out = b.complete([{"role": "user", "content": "u"}], max_tokens=64,
temperature=0.1, reasoning=False, token_sink=seen.append)
assert comp.kwargs["stream"] is True
assert _extract_json(out) == {"a": 1}
assert "je pense" not in out # la pensee est exclue du retour
assert "je pense" in "".join(seen) # mais diffusee au sink (affichage)
def test_erreur_connexion_message_clair():
comp = _FakeCompletions(raises=_FakeAPIConnectionError("refused"))
b = _backend(comp)
with pytest.raises(RuntimeError) as exc:
b.complete([{"role": "user", "content": "u"}], max_tokens=64,
temperature=0.1, reasoning=False)
assert "LM Studio injoignable" in str(exc.value)
def test_resolve_modele_actif_si_ref_vide():
comp = _FakeCompletions(response=_message("{}"))
client = _client(comp)
client.models = SimpleNamespace(
list=lambda: SimpleNamespace(data=[SimpleNamespace(id="gemma-4")]))
b = LMStudioBackend("") # ref vide -> doit prendre le 1er modele charge
b._client = client
b.complete([{"role": "user", "content": "u"}], max_tokens=64,
temperature=0.1, reasoning=False)
assert comp.kwargs["model"] == "gemma-4"