Initial project structure

Scaffold all modules, route stubs, data models, and config.
No logic implemented yet — all core methods raise NotImplementedError.
Establishes the full directory layout matching the architecture in CLAUDE.md.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Jaroslav Benes
2026-04-08 14:48:48 +02:00
commit 083cbb1fa7
32 changed files with 1507 additions and 0 deletions

0
fellowship/__init__.py Normal file
View File

View File

83
fellowship/api/events.py Normal file
View File

@@ -0,0 +1,83 @@
"""
All WebSocket/SSE event types sent from Fellowship to connected members.
Every event is a Pydantic model serialized to JSON.
"""
from typing import Any, Literal
from pydantic import BaseModel
class HistoryEvent(BaseModel):
type: Literal["history"] = "history"
messages: list[dict[str, Any]]
class TurnStartEvent(BaseModel):
type: Literal["turn_start"] = "turn_start"
bot: str
turn: int
class BotMessageEvent(BaseModel):
type: Literal["bot_message"] = "bot_message"
bot: str
content: str
turn: int
class TokenEvent(BaseModel):
"""Only emitted when stream_tokens is enabled."""
type: Literal["token"] = "token"
bot: str
token: str
turn: int
class TurnEndEvent(BaseModel):
type: Literal["turn_end"] = "turn_end"
bot: str
turn: int
tokens: int
class TalkerMessageEvent(BaseModel):
type: Literal["talker_message"] = "talker_message"
talker_id: str
talker_name: str
content: str
turn: int
class MemberJoinedEvent(BaseModel):
type: Literal["member_joined"] = "member_joined"
role: Literal["talker", "observer"]
class MemberLeftEvent(BaseModel):
type: Literal["member_left"] = "member_left"
role: Literal["talker", "observer"]
class SessionPausedEvent(BaseModel):
type: Literal["session_paused"] = "session_paused"
class SessionResumedEvent(BaseModel):
type: Literal["session_resumed"] = "session_resumed"
class SessionEndEvent(BaseModel):
type: Literal["session_end"] = "session_end"
reason: Literal["max_turns", "max_time", "max_context", "orchestrator", "client_request"]
class ErrorEvent(BaseModel):
type: Literal["error"] = "error"
message: str
class DebugEvent(BaseModel):
"""Only emitted when debug mode is active."""
type: Literal["debug"] = "debug"
category: str
data: dict[str, Any]

View File

View File

@@ -0,0 +1,69 @@
"""
Pydantic request and response models for the session API endpoints.
"""
from typing import Optional
from pydantic import BaseModel, Field
from fellowship.core.session import (
BotConfig,
ParticipationMode,
TurnOrder,
SessionState,
)
class CreateSessionRequest(BaseModel):
bots: list[BotConfig] = Field(description="List of bots in this session")
global_system_prompt: Optional[str] = Field(
default=None, description="System prompt injected for all bots"
)
goal: Optional[str] = Field(
default=None,
description="Natural language goal. Required for orchestrator end_session tool to be available.",
)
participation_mode: ParticipationMode = Field(
default=ParticipationMode.AUTONOMOUS,
description="How human talkers interact with the session",
)
turn_order: TurnOrder = Field(
default=TurnOrder.ROUND_ROBIN,
description="How the next bot speaker is selected",
)
max_talkers: int = Field(default=1, description="Maximum simultaneous talker connections")
max_turns: Optional[int] = Field(default=None, description="End session after N bot turns")
max_time: Optional[int] = Field(default=None, description="End session after N seconds")
max_context_tokens: Optional[int] = Field(
default=None, description="End or summarize when total context reaches N tokens"
)
rectify_history: bool = Field(default=True, description="Enable history rectification")
summarize_context: bool = Field(
default=False, description="Summarize old context instead of ending when limit is reached"
)
stream_tokens: bool = Field(default=False, description="Stream bot responses token-by-token")
llm_base_url: Optional[str] = Field(
default=None, description="Override LLM backend URL for this session"
)
llm_api_key: Optional[str] = Field(
default=None, description="Override LLM API key for this session"
)
debug: Optional[bool] = Field(
default=None, description="Override debug mode for this session"
)
class CreateSessionResponse(BaseModel):
token: str = Field(description="Session token used for all subsequent interactions")
state: SessionState
bot_count: int
class SessionStatusResponse(BaseModel):
token: str
state: SessionState
bot_count: int
turn_count: int
talker_count: int
observer_count: int
participation_mode: ParticipationMode
turn_order: TurnOrder

View File

View File

@@ -0,0 +1,94 @@
"""
Session API routes. Route handlers are thin — they validate input and delegate to core/.
All routes are mounted under /v1 in main.py.
"""
import logging
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from fellowship.api.models.session import (
CreateSessionRequest,
CreateSessionResponse,
SessionStatusResponse,
)
logger = logging.getLogger(__name__)
router = APIRouter(tags=["sessions"])
@router.post(
"/session/create",
response_model=CreateSessionResponse,
summary="Initialize a new session",
description="Create a new Fellowship session with the given bots and options. Returns a session token.",
)
async def create_session(body: CreateSessionRequest) -> CreateSessionResponse:
raise NotImplementedError
@router.get(
"/session/{token}",
response_model=SessionStatusResponse,
summary="Get session status",
description="Returns current state, turn count, and connected member counts for a session.",
)
async def get_session(token: str) -> SessionStatusResponse:
raise NotImplementedError
@router.delete(
"/session/{token}",
summary="End a session",
description="Terminates the session loop and disconnects all members.",
)
async def end_session(token: str) -> dict[str, str]:
raise NotImplementedError
@router.post(
"/session/{token}/pause",
summary="Pause a session",
description="Halts the session loop. Members remain connected and receive a session_paused event.",
)
async def pause_session(token: str) -> dict[str, str]:
raise NotImplementedError
@router.post(
"/session/{token}/resume",
summary="Resume a paused session",
description="Restarts the session loop. Members receive a session_resumed event.",
)
async def resume_session(token: str) -> dict[str, str]:
raise NotImplementedError
@router.get(
"/session/{token}/history",
summary="Get full conversation history",
description="Returns the complete ordered message log for the session.",
)
async def get_history(token: str) -> dict:
raise NotImplementedError
@router.websocket("/session/{token}/connect")
async def websocket_connect(websocket: WebSocket, token: str, role: str = "observer") -> None:
"""
WebSocket connection for talkers (role=talker) and observers (role=observer).
On connect: sends a history event with the full conversation, then streams live events.
Talkers may send user_message and ping frames.
"""
raise NotImplementedError
@router.get(
"/session/{token}/stream",
summary="SSE observe-only stream",
description="Server-Sent Events stream for observers. Sends history replay then live events.",
)
async def sse_stream(token: str) -> StreamingResponse:
raise NotImplementedError

29
fellowship/config.py Normal file
View File

@@ -0,0 +1,29 @@
"""
Server-wide configuration loaded from .env via Pydantic Settings.
All modules import `settings` from here — never read env vars directly.
"""
from pydantic import Field
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# LLM backend
llm_base_url: str = Field(description="OpenAI-compatible LLM backend base URL")
llm_api_key: str = Field(default="not-needed", description="API key for the LLM backend")
# Default models (can be overridden per-session or per-bot)
default_bot_model: str = Field(description="Default model used for bot turns")
default_orchestrator_model: str = Field(description="Model used for orchestrator calls")
# Server limits
max_bots_per_session: int = Field(default=10, description="Hard cap on bots per session")
session_ttl_default: int = Field(default=3600, description="Default idle TTL in seconds")
# Debug
debug: bool = Field(default=False, description="Enable debug mode server-wide")
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
settings = Settings() # type: ignore[call-arg]

View File

View File

@@ -0,0 +1,42 @@
"""
Context manager — assembles the conversation history for a bot's prompt.
Ensures bots only see messages, never foreign system prompts.
Handles context summarization when enabled.
"""
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fellowship.core.session import Session, BotConfig
logger = logging.getLogger(__name__)
class ContextManager:
def __init__(self, session: "Session") -> None:
self.session = session
def build_context(self, bot: "BotConfig") -> list[dict]:
"""
Return the conversation history formatted as LLM messages for the given bot.
Uses shared_context or scoped_context based on session options.
Skips any slots with content=None (reserved but not yet filled).
"""
raise NotImplementedError
def estimate_tokens(self, messages: list[dict]) -> int:
"""
Estimate total token count for a list of messages.
Used to check against max_context_tokens.
"""
raise NotImplementedError
async def summarize(self) -> None:
"""
Summarize the older portion of history to reduce context size.
Retains a recent tail of messages intact.
Stores the summary in session state; future context builds use summary + tail.
Full history list is preserved unchanged.
"""
raise NotImplementedError

66
fellowship/core/loop.py Normal file
View File

@@ -0,0 +1,66 @@
"""
Session loop — the core driver for each active session.
One SessionLoop instance runs per session as an asyncio Task.
Never dispatches two LLM calls simultaneously.
Starts immediately for autonomous mode; waits for first talker message otherwise.
"""
import asyncio
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fellowship.core.session import Session
logger = logging.getLogger(__name__)
class SessionLoop:
def __init__(self, session: "Session") -> None:
self.session = session
self._task: asyncio.Task | None = None
self._pause_event = asyncio.Event()
self._pause_event.set() # Not paused by default
def start(self) -> None:
"""Start the loop as a background asyncio Task."""
self._task = asyncio.create_task(self._run(), name=f"loop-{self.session.token[:8]}")
def stop(self) -> None:
"""Cancel the loop task."""
if self._task:
self._task.cancel()
def pause(self) -> None:
"""Pause the loop. Clears the internal event so the loop blocks."""
self._pause_event.clear()
def resume(self) -> None:
"""Resume a paused loop."""
self._pause_event.set()
async def _run(self) -> None:
"""
Main loop body. Runs until the session ends or the task is cancelled.
Each iteration:
1. Wait if paused.
2. Check for pending talker messages (all modes).
3. Determine next bot speaker (round_robin or orchestrator).
4. Execute bot turn.
5. Check end conditions.
"""
raise NotImplementedError
async def _handle_talker_message(self) -> bool:
"""
Drain one message from the talker queue into history.
Returns True if a message was processed.
"""
raise NotImplementedError
async def _check_end_conditions(self) -> bool:
"""
Check all configured end conditions (max_turns, max_time, max_context_tokens).
Returns True if the session should end.
"""
raise NotImplementedError

View File

@@ -0,0 +1,92 @@
"""
Orchestrator — stateless LLM call that selects the next speaker or ends the session.
Called fresh each turn when turn_order is ORCHESTRATED.
Output is a tool call only; any text is discarded.
"""
import logging
from dataclasses import dataclass
from typing import Literal, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from fellowship.core.session import Session
logger = logging.getLogger(__name__)
@dataclass
class OrchestratorDecision:
action: Literal["select_speaker", "hold", "end_session"]
bot_name: Optional[str] = None # set when action == "select_speaker"
reason: Optional[str] = None # set when action == "end_session"
# Tool definitions sent to the LLM with every orchestrator call
ORCHESTRATOR_TOOLS = [
{
"type": "function",
"function": {
"name": "select_speaker",
"description": "Choose which bot should speak next.",
"parameters": {
"type": "object",
"properties": {
"bot_name": {"type": "string", "description": "Name of the bot to speak next"},
},
"required": ["bot_name"],
},
},
},
{
"type": "function",
"function": {
"name": "hold",
"description": (
"Do not prompt any bot this turn. "
"Use when the conversation implies bots should stay silent."
),
"parameters": {"type": "object", "properties": {}},
},
},
{
"type": "function",
"function": {
"name": "end_session",
"description": "End the session. Only use when the session goal has been reached.",
"parameters": {
"type": "object",
"properties": {
"reason": {"type": "string", "description": "Why the session is ending"},
},
"required": ["reason"],
},
},
},
]
class Orchestrator:
def __init__(self, session: "Session") -> None:
self.session = session
async def decide(self) -> OrchestratorDecision:
"""
Build the orchestrator prompt, call the LLM, parse the tool call response.
Any text output from the LLM is ignored — only the tool call matters.
"""
raise NotImplementedError
def _build_system_prompt(self) -> str:
"""
Build the orchestrator system prompt including:
- Its role and instructions
- Overview of how Fellowship works
- Full bot roster (names, roles, system prompts)
- Session goal (if set)
- Instruction to always respond with a tool call
"""
raise NotImplementedError
def _parse_tool_call(self, response: dict) -> OrchestratorDecision:
"""Parse the LLM tool call response into an OrchestratorDecision."""
raise NotImplementedError

43
fellowship/core/queue.py Normal file
View File

@@ -0,0 +1,43 @@
"""
Talker message queue — holds incoming talker messages in arrival order.
The session loop drains this queue one message at a time.
"""
import asyncio
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class QueuedMessage:
talker_id: str
talker_name: str
content: str
class MessageQueue:
def __init__(self) -> None:
self._queue: asyncio.Queue[QueuedMessage] = asyncio.Queue()
def enqueue(self, message: QueuedMessage) -> None:
"""Add a talker message to the queue. Non-blocking."""
self._queue.put_nowait(message)
async def dequeue(self) -> QueuedMessage:
"""Wait for and return the next message. Blocks until one is available."""
return await self._queue.get()
def dequeue_nowait(self) -> QueuedMessage | None:
"""Return the next message without waiting, or None if the queue is empty."""
try:
return self._queue.get_nowait()
except asyncio.QueueEmpty:
return None
def is_empty(self) -> bool:
return self._queue.empty()
def size(self) -> int:
return self._queue.qsize()

View File

@@ -0,0 +1,41 @@
"""
History rectifier — manages slot reservation and message insertion ordering.
Ensures a bot's response appears at the correct logical position in history
even when talker messages arrive during LLM generation.
"""
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fellowship.core.session import Session, Message
logger = logging.getLogger(__name__)
class HistoryRectifier:
def __init__(self, session: "Session") -> None:
self.session = session
def reserve_slot(self, sender: str, turn: int) -> int:
"""
Append a placeholder Message (content=None) to history.
Returns the index of the reserved slot.
Called immediately before an LLM call is dispatched.
"""
raise NotImplementedError
def fill_slot(self, index: int, content: str, tokens: int) -> None:
"""
Fill the reserved slot at the given index with the completed response.
Called when the LLM call returns.
"""
raise NotImplementedError
def insert_after_slot(self, slot_index: int, message: "Message") -> None:
"""
Insert a talker message after the given slot index.
Called when a talker message arrives while a slot is reserved.
Subsequent messages increment their positions accordingly.
"""
raise NotImplementedError

View File

@@ -0,0 +1,76 @@
"""
Session data model — the single source of truth for a session's state.
All other core modules read from and write to a Session instance.
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
class ParticipationMode(str, Enum):
AUTONOMOUS = "autonomous"
REACTIVE = "reactive"
COLLABORATIVE = "collaborative"
class TurnOrder(str, Enum):
ROUND_ROBIN = "round_robin"
ORCHESTRATED = "orchestrated"
class SessionState(str, Enum):
WAITING = "waiting" # Waiting for first talker message (reactive/collaborative)
RUNNING = "running" # Loop is active
PAUSED = "paused" # Paused via API
ENDED = "ended" # Session is over
@dataclass
class BotConfig:
name: str
system_prompt: str
model: Optional[str] = None
temperature: Optional[float] = None
role: Optional[str] = None
@dataclass
class Message:
role: str # "bot", "talker", "system"
sender: str # bot name or talker display name
content: Optional[str] # None while a rectification slot is reserved
turn: int
tokens: Optional[int] = None
@dataclass
class SessionOptions:
participation_mode: ParticipationMode = ParticipationMode.AUTONOMOUS
turn_order: TurnOrder = TurnOrder.ROUND_ROBIN
max_talkers: int = 1
max_turns: Optional[int] = None
max_time: Optional[int] = None
max_context_tokens: Optional[int] = None
rectify_history: bool = True
summarize_context: bool = False
stream_tokens: bool = False
goal: Optional[str] = None
debug: bool = False
llm_base_url: Optional[str] = None
llm_api_key: Optional[str] = None
@dataclass
class Session:
token: str
bots: list[BotConfig]
options: SessionOptions
global_system_prompt: Optional[str] = None
state: SessionState = SessionState.WAITING
history: list[Message] = field(default_factory=list)
turn_count: int = 0
robin_index: int = 0 # Current position in round_robin order
created_at: float = 0.0 # Unix timestamp
talker_count: int = 0
observer_count: int = 0

View File

@@ -0,0 +1,48 @@
"""
Turn engine — constructs a bot's prompt and executes its LLM call.
Handles rectification slot reservation and filling.
"""
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fellowship.core.session import Session, BotConfig, Message
logger = logging.getLogger(__name__)
class TurnEngine:
def __init__(self, session: "Session") -> None:
self.session = session
async def execute_turn(self, bot: "BotConfig") -> "Message":
"""
Full turn pipeline for one bot:
1. Reserve a rectification slot in history.
2. Assemble the bot's prompt (global system + bot system + context).
3. Call the LLM.
4. Fill the reserved slot with the response.
5. Return the completed Message.
"""
raise NotImplementedError
def _reserve_slot(self, bot: "BotConfig") -> int:
"""
Append a placeholder Message (content=None) to history and return its index.
This is the rectification slot — talker messages arriving during generation
are inserted after this index.
"""
raise NotImplementedError
def _fill_slot(self, index: int, content: str, tokens: int) -> None:
"""Fill a previously reserved slot with the completed bot response."""
raise NotImplementedError
def _build_prompt(self, bot: "BotConfig") -> list[dict]:
"""
Assemble the messages list for the LLM call:
- System message: global_system_prompt + bot system_prompt
- User/assistant messages from context (messages only, no foreign system prompts)
"""
raise NotImplementedError

View File

View File

@@ -0,0 +1,71 @@
"""
Connection hub — manages all WebSocket and SSE connections for a session.
Broadcasts events to every connected member.
Multiple talkers and unlimited observers can be connected simultaneously.
"""
import logging
from dataclasses import dataclass, field
from typing import Literal
from fastapi import WebSocket
from pydantic import BaseModel
logger = logging.getLogger(__name__)
@dataclass
class ConnectedMember:
websocket: WebSocket
role: Literal["talker", "observer"]
talker_id: str | None = None # Only set for talkers
talker_name: str | None = None # Only set for talkers
class ConnectionHub:
def __init__(self, session_token: str) -> None:
self.session_token = session_token
self._members: list[ConnectedMember] = []
async def connect(self, member: ConnectedMember) -> None:
"""Accept and register a new WebSocket connection."""
await member.websocket.accept()
self._members.append(member)
logger.info("[%s] member connected role=%s", self.session_token[:8], member.role)
def disconnect(self, websocket: WebSocket) -> None:
"""Remove a disconnected WebSocket from the member list."""
self._members = [m for m in self._members if m.websocket is not websocket]
async def broadcast(self, event: BaseModel) -> None:
"""Send an event to all connected members."""
raise NotImplementedError
async def send_to(self, websocket: WebSocket, event: BaseModel) -> None:
"""Send an event to a single connection."""
raise NotImplementedError
def talker_count(self) -> int:
return sum(1 for m in self._members if m.role == "talker")
def observer_count(self) -> int:
return sum(1 for m in self._members if m.role == "observer")
def member_count(self) -> int:
return len(self._members)
# Global registry of hubs — one per active session, keyed by session token
_hubs: dict[str, ConnectionHub] = {}
def get_hub(session_token: str) -> ConnectionHub:
"""Get or create the ConnectionHub for a session."""
if session_token not in _hubs:
_hubs[session_token] = ConnectionHub(session_token)
return _hubs[session_token]
def remove_hub(session_token: str) -> None:
"""Remove the hub when a session ends."""
_hubs.pop(session_token, None)

View File

62
fellowship/llm/client.py Normal file
View File

@@ -0,0 +1,62 @@
"""
LLM client — the only module that communicates with the LLM backend.
Uses the OpenAI-compatible chat completions API via httpx.
All other modules call this; nothing else touches the LLM directly.
"""
import logging
from typing import AsyncIterator, Optional
import httpx
from fellowship.config import settings
logger = logging.getLogger(__name__)
# Retry config
MAX_RETRIES = 1
RETRY_DELAY = 2.0 # seconds
class LLMClient:
def __init__(
self,
base_url: Optional[str] = None,
api_key: Optional[str] = None,
) -> None:
self.base_url = (base_url or settings.llm_base_url).rstrip("/")
self.api_key = api_key or settings.llm_api_key
async def chat(
self,
model: str,
messages: list[dict],
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
tools: Optional[list[dict]] = None,
) -> dict:
"""
Send a chat completion request. Returns the full response dict.
Retries once on failure before raising.
"""
raise NotImplementedError
async def chat_stream(
self,
model: str,
messages: list[dict],
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
) -> AsyncIterator[str]:
"""
Send a streaming chat completion request.
Yields content tokens as they arrive.
Only used when stream_tokens is enabled.
"""
raise NotImplementedError
def _headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}

35
fellowship/logger.py Normal file
View File

@@ -0,0 +1,35 @@
"""
Logging setup for Fellowship.
Call setup_logging() once at startup. All modules use standard logging.getLogger(__name__).
Logs are written to logs/{YYYY-MM-DD}.log and to stdout in debug mode.
"""
import logging
import logging.handlers
import os
from datetime import date
def setup_logging() -> None:
os.makedirs("logs", exist_ok=True)
log_file = f"logs/{date.today().isoformat()}.log"
formatter = logging.Formatter(
fmt="%(asctime)s [%(levelname)s] %(name)s%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
file_handler = logging.handlers.TimedRotatingFileHandler(
log_file, when="midnight", backupCount=30, encoding="utf-8"
)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.DEBUG)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(file_handler)
root.addHandler(console_handler)

View File

View File

@@ -0,0 +1,34 @@
"""
Memory store — SQLite-backed persistence for cross-session memory.
Only active when a session is created with memory: new or memory: inherit:<token>.
"""
import logging
from typing import Optional
import aiosqlite
logger = logging.getLogger(__name__)
DB_PATH = "fellowship_memory.db"
class MemoryStore:
def __init__(self, db_path: str = DB_PATH) -> None:
self.db_path = db_path
async def init(self) -> None:
"""Create tables if they don't exist. Call once at startup."""
raise NotImplementedError
async def save(self, session_token: str, memory: str) -> None:
"""Persist a memory string for the given session token."""
raise NotImplementedError
async def load(self, session_token: str) -> Optional[str]:
"""Load the stored memory for the given session token, or None if absent."""
raise NotImplementedError
async def delete(self, session_token: str) -> None:
"""Delete memory for a session."""
raise NotImplementedError

View File

@@ -0,0 +1,51 @@
"""
Session store — in-memory registry of all active sessions.
Keyed by session token. Also holds the associated SessionLoop and MessageQueue per session.
"""
import logging
import secrets
from dataclasses import dataclass, field
from typing import Optional
from fellowship.core.session import Session
from fellowship.core.loop import SessionLoop
from fellowship.core.queue import MessageQueue
logger = logging.getLogger(__name__)
@dataclass
class SessionEntry:
session: Session
loop: SessionLoop
queue: MessageQueue
class SessionStore:
def __init__(self) -> None:
self._sessions: dict[str, SessionEntry] = {}
def create(self, session: Session, loop: SessionLoop, queue: MessageQueue) -> str:
"""Register a new session. Returns the session token."""
self._sessions[session.token] = SessionEntry(session, loop, queue)
return session.token
def get(self, token: str) -> Optional[SessionEntry]:
"""Return the SessionEntry for the given token, or None if not found."""
return self._sessions.get(token)
def remove(self, token: str) -> None:
"""Remove a session from the store."""
self._sessions.pop(token, None)
def generate_token(self) -> str:
"""Generate a cryptographically random session token."""
return secrets.token_urlsafe(32)
def all_tokens(self) -> list[str]:
return list(self._sessions.keys())
# Global singleton — imported by routes and other modules
session_store = SessionStore()