""" Session store — in-memory registry of all active sessions. Keyed by session token. Also holds the associated SessionLoop and MessageQueue per session. """ import logging import secrets from dataclasses import dataclass, field from typing import Optional from fellowship.core.session import Session from fellowship.core.loop import SessionLoop from fellowship.core.queue import MessageQueue logger = logging.getLogger(__name__) @dataclass class SessionEntry: session: Session loop: SessionLoop queue: MessageQueue class SessionStore: def __init__(self) -> None: self._sessions: dict[str, SessionEntry] = {} def create(self, session: Session, loop: SessionLoop, queue: MessageQueue) -> str: """Register a new session. Returns the session token.""" self._sessions[session.token] = SessionEntry(session, loop, queue) return session.token def get(self, token: str) -> Optional[SessionEntry]: """Return the SessionEntry for the given token, or None if not found.""" return self._sessions.get(token) def remove(self, token: str) -> None: """Remove a session from the store.""" self._sessions.pop(token, None) def generate_token(self) -> str: """Generate a cryptographically random session token.""" return secrets.token_urlsafe(32) def all_tokens(self) -> list[str]: return list(self._sessions.keys()) # Global singleton — imported by routes and other modules session_store = SessionStore()