""" Connection hub — manages all WebSocket and SSE connections for a session. Broadcasts events to every connected member. Multiple talkers and unlimited observers can be connected simultaneously. """ import logging from dataclasses import dataclass, field from typing import Literal from fastapi import WebSocket from pydantic import BaseModel logger = logging.getLogger(__name__) @dataclass class ConnectedMember: websocket: WebSocket role: Literal["talker", "observer"] talker_id: str | None = None # Only set for talkers talker_name: str | None = None # Only set for talkers class ConnectionHub: def __init__(self, session_token: str) -> None: self.session_token = session_token self._members: list[ConnectedMember] = [] async def connect(self, member: ConnectedMember) -> None: """Accept and register a new WebSocket connection.""" await member.websocket.accept() self._members.append(member) logger.info("[%s] member connected role=%s", self.session_token[:8], member.role) def disconnect(self, websocket: WebSocket) -> None: """Remove a disconnected WebSocket from the member list.""" self._members = [m for m in self._members if m.websocket is not websocket] async def broadcast(self, event: BaseModel) -> None: """Send an event to all connected members.""" raise NotImplementedError async def send_to(self, websocket: WebSocket, event: BaseModel) -> None: """Send an event to a single connection.""" raise NotImplementedError def talker_count(self) -> int: return sum(1 for m in self._members if m.role == "talker") def observer_count(self) -> int: return sum(1 for m in self._members if m.role == "observer") def member_count(self) -> int: return len(self._members) # Global registry of hubs — one per active session, keyed by session token _hubs: dict[str, ConnectionHub] = {} def get_hub(session_token: str) -> ConnectionHub: """Get or create the ConnectionHub for a session.""" if session_token not in _hubs: _hubs[session_token] = ConnectionHub(session_token) return _hubs[session_token] def remove_hub(session_token: str) -> None: """Remove the hub when a session ends.""" _hubs.pop(session_token, None)