Files

48 lines
1.5 KiB
Python

"""Gestionnaire de connexions WebSocket avec diffusion thread-safe.
L'orchestrateur tourne dans un thread worker ; il appelle `broadcast_threadsafe`
qui replanifie l'envoi sur la boucle asyncio de l'API.
"""
from __future__ import annotations
import asyncio
from collections import defaultdict
from typing import Optional
from fastapi import WebSocket
class ConnectionManager:
def __init__(self) -> None:
self.active: dict[str, set[WebSocket]] = defaultdict(set)
self._loop: Optional[asyncio.AbstractEventLoop] = None
def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
self._loop = loop
async def connect(self, slug: str, ws: WebSocket) -> None:
await ws.accept()
self.active[slug].add(ws)
def disconnect(self, slug: str, ws: WebSocket) -> None:
self.active[slug].discard(ws)
def broadcast_threadsafe(self, slug: str, data: dict) -> None:
"""Appelable depuis n'importe quel thread (worker orchestrateur)."""
if self._loop is None:
return
self._loop.call_soon_threadsafe(self._dispatch, slug, data)
def _dispatch(self, slug: str, data: dict) -> None:
for ws in list(self.active.get(slug, ())):
asyncio.create_task(self._safe_send(slug, ws, data))
async def _safe_send(self, slug: str, ws: WebSocket, data: dict) -> None:
try:
await ws.send_json({"type": "state", "state": data})
except Exception: # noqa: BLE001 — connexion fermee
self.disconnect(slug, ws)
manager = ConnectionManager()