Initial commit: InkFlow — EPUB vers livre audio local (MLX/Kokoro)
This commit is contained in:
0
backend/inkflow/audio/__init__.py
Normal file
0
backend/inkflow/audio/__init__.py
Normal file
125
backend/inkflow/audio/postprocess.py
Normal file
125
backend/inkflow/audio/postprocess.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""Assemblage audio final : concat -> normalisation -> WAV -> MP3 taggue.
|
||||
|
||||
Pas de pydub (casse en Python 3.13) : concat/normalisation en numpy, encodage
|
||||
mp3 + cover via ffmpeg CLI, tags via les metadonnees ffmpeg.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
import soundfile as sf
|
||||
|
||||
from ..settings import get_settings
|
||||
|
||||
|
||||
def _resample(audio: np.ndarray, src_sr: int, dst_sr: int) -> np.ndarray:
    """Resample ``audio`` from ``src_sr`` to ``dst_sr`` by linear interpolation.

    The input array is returned as-is (same object, same dtype) when both
    rates are equal or when it holds no samples. Otherwise the samples are
    interpolated with ``np.interp`` on a time grid covering the same duration
    and the result is cast to float32. No low-pass filtering is applied.
    """
    needs_resampling = src_sr != dst_sr and audio.size != 0
    if not needs_resampling:
        return audio

    n_src = audio.size
    total_seconds = n_src / src_sr
    n_out = int(round(total_seconds * dst_sr))

    # Both grids span [0, total_seconds) so sample k sits at time k / rate.
    src_times = np.linspace(0.0, total_seconds, num=n_src, endpoint=False)
    out_times = np.linspace(0.0, total_seconds, num=n_out, endpoint=False)

    resampled = np.interp(out_times, src_times, audio)
    return resampled.astype(np.float32)
|
||||
|
||||
|
||||
def silence(seconds: float, sr: int) -> np.ndarray:
    """Return a float32 buffer of zeros lasting ``seconds`` at sample rate ``sr``.

    The sample count is rounded to the nearest integer (as ``_resample`` does)
    instead of truncated, so floating-point error cannot drop a sample
    (0.29 s at 100 Hz is 29 samples, not 28). A zero or negative duration
    yields an empty buffer rather than an error from ``np.zeros``.
    """
    n_samples = max(0, int(round(seconds * sr)))
    return np.zeros(n_samples, dtype=np.float32)
|
||||
|
||||
|
||||
def concat_segments(
    parts: list[tuple[np.ndarray, int]],
    *,
    target_sr: Optional[int] = None,
    gap_seconds: float = 0.35,
    intra_gap_seconds: float = 0.12,
    glued: Optional[list[bool]] = None,
) -> tuple[np.ndarray, int]:
    """Concatenate ``(audio, sr)`` segments with a silence between each pair.

    Every non-empty segment is converted to float32, resampled to
    ``target_sr`` and appended; ``None`` or empty segments are skipped and do
    not produce a gap.

    Args:
        parts: Segments as ``(samples, sample_rate)`` pairs. ``samples`` may
            be an ndarray or any sequence ``np.asarray`` accepts.
        target_sr: Output sample rate; defaults to
            ``get_settings().target_sample_rate``.
        gap_seconds: Silence inserted before a segment by default.
        intra_gap_seconds: Shorter silence used when the segment is glued.
        glued: Optional flags indexed like ``parts``. ``glued[i]`` true means
            segment ``i`` is preceded by ``intra_gap_seconds`` instead of
            ``gap_seconds`` (e.g. a dialogue tag and its line from the same
            paragraph). Missing indices are treated as not glued.

    Returns:
        ``(audio, target_sr)``; ``audio`` is an empty float32 array when no
        segment contributed samples.
    """
    if target_sr is None:
        target_sr = get_settings().target_sample_rate
    gap = silence(gap_seconds, target_sr)
    intra_gap = silence(intra_gap_seconds, target_sr)

    buf: list[np.ndarray] = []
    for i, (audio, sr) in enumerate(parts):
        if audio is None:
            continue
        # Convert before inspecting ``.size`` so plain sequences are accepted.
        samples = np.asarray(audio, dtype=np.float32)
        if samples.size == 0:
            continue
        if buf:  # a gap only goes between two emitted segments
            use_intra = glued is not None and i < len(glued) and glued[i]
            buf.append(intra_gap if use_intra else gap)
        buf.append(_resample(samples, sr, target_sr))

    if not buf:
        return np.zeros(0, dtype=np.float32), target_sr
    return np.concatenate(buf), target_sr
|
||||
|
||||
|
||||
def normalize_loudness(audio: np.ndarray, target_dbfs: Optional[float] = None) -> np.ndarray:
    """Scale ``audio`` so its RMS level reaches ``target_dbfs``, without clipping.

    Args:
        audio: Samples to normalise.
        target_dbfs: Desired RMS level in dBFS; defaults to
            ``get_settings().target_dbfs``.

    Returns:
        A new float32 array with the gain applied. If the scaled peak would
        exceed 0.99, the whole signal is attenuated so the peak is 0.99. The
        input is returned unchanged (same object) when it is empty, when its
        RMS is below 1e-6, or when its RMS is not finite (NaN/Inf samples),
        since no meaningful gain can be computed in those cases.
    """
    if audio.size == 0:
        return audio
    if target_dbfs is None:
        target_dbfs = get_settings().target_dbfs

    # Accumulate in float64 to avoid precision loss on long buffers.
    rms = float(np.sqrt(np.mean(audio.astype(np.float64) ** 2)))
    if not np.isfinite(rms) or rms < 1e-6:
        # Near-silent or corrupt input: applying a gain would amplify noise
        # or spread NaN over every sample.
        return audio

    current_dbfs = 20.0 * np.log10(rms)
    gain = 10.0 ** ((target_dbfs - current_dbfs) / 20.0)
    out = audio * gain

    peak = float(np.max(np.abs(out)))
    if peak > 0.99:  # simple limiter to avoid clipping
        out *= 0.99 / peak
    return out.astype(np.float32)
|
||||
|
||||
|
||||
def write_wav(path: str | Path, audio: np.ndarray, sr: int) -> Path:
    """Write ``audio`` at sample rate ``sr`` to ``path`` via ``soundfile``.

    Missing parent directories are created first. Returns the destination
    as a ``Path``.
    """
    destination = Path(path)
    output_dir = destination.parent
    output_dir.mkdir(parents=True, exist_ok=True)
    sf.write(str(destination), audio, sr)
    return destination
|
||||
|
||||
|
||||
def encode_mp3(
    wav_path: str | Path,
    mp3_path: str | Path,
    *,
    bitrate: Optional[str] = None,
    title: Optional[str] = None,
    album: Optional[str] = None,
    artist: Optional[str] = None,
    track: Optional[int] = None,
    cover_path: Optional[str | Path] = None,
) -> Path:
    """Encode a WAV file to MP3 with the ffmpeg CLI, ID3 tags and optional cover.

    Args:
        wav_path: Source WAV file.
        mp3_path: Destination MP3 file; parent directories are created and an
            existing file is overwritten (``-y``).
        bitrate: Audio bitrate passed to ``-b:a``; defaults to
            ``get_settings().mp3_bitrate``.
        title, album, artist: Written as metadata when non-empty.
        track: Track number, written as metadata when not ``None``.
        cover_path: Image embedded as attached picture. Ignored when unset or
            when it does not point to a regular file.

    Returns:
        ``mp3_path`` as a ``Path``.

    Raises:
        RuntimeError: ``ffmpeg`` is not found on ``PATH``.
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status;
            its captured output is available on the exception.
    """
    if bitrate is None:
        bitrate = get_settings().mp3_bitrate
    if not shutil.which("ffmpeg"):
        raise RuntimeError("ffmpeg introuvable — brew install ffmpeg")
    wav_path, mp3_path = Path(wav_path), Path(mp3_path)
    mp3_path.parent.mkdir(parents=True, exist_ok=True)

    cmd = ["ffmpeg", "-y", "-i", str(wav_path)]
    # is_file() rather than exists(): a directory is not a usable cover.
    has_cover = bool(cover_path) and Path(cover_path).is_file()
    if has_cover:
        cmd += [
            "-i", str(cover_path), "-map", "0:a", "-map", "1:v",
            "-c:v", "mjpeg", "-disposition:v", "attached_pic",
            # Picture-type tags so players treat the image as the front cover.
            "-metadata:s:v", "title=Album cover",
            "-metadata:s:v", "comment=Cover (front)",
        ]
    cmd += ["-c:a", "libmp3lame", "-b:a", bitrate]

    meta: dict[str, Optional[str]] = {"title": title, "album": album, "artist": artist}
    if track is not None:
        meta["track"] = str(track)
    for key, val in meta.items():
        if val:  # skip unset and empty tags
            cmd += ["-metadata", f"{key}={val}"]
    cmd += ["-id3v2_version", "3", str(mp3_path)]

    # stdin is detached so ffmpeg cannot consume the parent process's input.
    subprocess.run(cmd, check=True, capture_output=True, stdin=subprocess.DEVNULL)
    return mp3_path
|
||||
Reference in New Issue
Block a user