48 lines
1.5 KiB
Python
48 lines
1.5 KiB
Python
"""Gestionnaire de connexions WebSocket avec diffusion thread-safe.
|
|
|
|
L'orchestrateur tourne dans un thread worker ; il appelle `broadcast_threadsafe`
|
|
qui replanifie l'envoi sur la boucle asyncio de l'API.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections import defaultdict
|
|
from typing import Optional
|
|
|
|
from fastapi import WebSocket
|
|
|
|
|
|
class ConnectionManager:
    """Track WebSocket connections per slug and broadcast state to them.

    Connections are grouped by ``slug``. ``broadcast_threadsafe`` may be
    called from any thread: it hands the actual send over to the event loop
    registered with ``bind_loop``. Every other method is expected to run on
    that event loop.
    """

    def __init__(self) -> None:
        # slug -> sockets currently registered for that slug.
        self.active: dict[str, set[WebSocket]] = defaultdict(set)
        # Loop on which sends are scheduled; None until bind_loop() is called.
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        # Strong references to in-flight send tasks. The event loop only
        # keeps weak references, so an unreferenced task could be
        # garbage-collected before its send completes.
        self._tasks: set[asyncio.Task] = set()

    def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        """Register the event loop on which broadcasts are dispatched."""
        self._loop = loop

    async def connect(self, slug: str, ws: WebSocket) -> None:
        """Accept ``ws`` and register it under ``slug``."""
        await ws.accept()
        self.active[slug].add(ws)

    def disconnect(self, slug: str, ws: WebSocket) -> None:
        """Unregister ``ws`` from ``slug``; unknown slugs/sockets are ignored."""
        # Use .get() so an unknown slug does not create an empty entry in
        # the defaultdict.
        sockets = self.active.get(slug)
        if sockets is None:
            return
        sockets.discard(ws)
        if not sockets:
            # Drop emptied slugs so the mapping does not grow without bound.
            del self.active[slug]

    def broadcast_threadsafe(self, slug: str, data: dict) -> None:
        """Schedule a broadcast of ``data`` to every socket under ``slug``.

        Callable from any thread (e.g. the orchestrator worker). The call is
        best-effort: it is a silent no-op when no loop has been bound or the
        bound loop is already closed.
        """
        if self._loop is None:
            return
        try:
            self._loop.call_soon_threadsafe(self._dispatch, slug, data)
        except RuntimeError:
            # The loop was closed (application shutdown); nobody is left to
            # receive the message, so drop it instead of crashing the caller.
            return

    def _dispatch(self, slug: str, data: dict) -> None:
        """Start one send task per socket registered under ``slug``.

        Runs on the event loop thread, as a callback scheduled by
        ``broadcast_threadsafe``.
        """
        # .get() avoids creating an entry for a slug nobody is connected to.
        for ws in tuple(self.active.get(slug, ())):
            task = asyncio.create_task(self._safe_send(slug, ws, data))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def _safe_send(self, slug: str, ws: WebSocket, data: dict) -> None:
        """Send ``data`` to ``ws``; unregister the socket if the send fails."""
        try:
            await ws.send_json({"type": "state", "state": data})
        except Exception:  # noqa: BLE001 - connection closed
            self.disconnect(slug, ws)
|
|
|
|
|
|
# Module-level ConnectionManager instance, created at import time.
manager = ConnectionManager()
|