WebSocket Server¶
Documentation for WebSocket endpoints, connection lifecycle, and message handling.
Source: backend/api/routes/websocket.py
Overview¶
The WebSocket server provides real-time bidirectional communication between the backend and frontend clients. It handles connection lifecycle management, message validation, heartbeat mechanisms, and graceful disconnection.
Endpoints¶
Events Endpoint (/ws/events)¶
Streams security event notifications in real time.
# backend/api/routes/websocket.py:313
@router.websocket("/ws/events")
async def websocket_events_endpoint(
    websocket: WebSocket,
    redis: RedisClient = Depends(get_redis),
    _token_valid: bool = Depends(validate_websocket_token),
) -> None:
Purpose: Delivers security events as they are created by the AI pipeline.
Message Types Received:
- event - Security event notification
- alert.* - Alert lifecycle events
- detection.* - Detection events
- camera_status - Camera online/offline
- scene_change - View tampering detected
System Endpoint (/ws/system)¶
Streams system health and GPU statistics.
# backend/api/routes/websocket.py:533
@router.websocket("/ws/system")
async def websocket_system_status(
    websocket: WebSocket,
    redis: RedisClient = Depends(get_redis),
    _token_valid: bool = Depends(validate_websocket_token),
) -> None:
Purpose: Provides periodic system status updates including GPU metrics, service health, and worker status.
Update Frequency: Every 5 seconds
Job Logs Endpoint (/ws/jobs/{job_id}/logs)¶
Streams real-time logs for active jobs.
# backend/api/routes/websocket.py:755
@router.websocket("/ws/jobs/{job_id}/logs")
async def websocket_job_logs(
    websocket: WebSocket,
    job_id: str,
    redis: RedisClient = Depends(get_redis),
    _token_valid: bool = Depends(validate_websocket_token),
) -> None:
Purpose: Streams log entries for active processing jobs.
Connection Lifecycle¶
stateDiagram-v2
[*] --> Connecting: Client connects
Connecting --> Authenticating: Upgrade accepted
Authenticating --> Active: Auth passed
Authenticating --> Rejected: Auth failed
Rejected --> [*]: Close 1008
Active --> Idle: No messages
Idle --> Active: Message received
Idle --> Timeout: Idle timeout (300s)
Active --> Disconnecting: Client disconnect
Timeout --> Disconnecting: Close 1000
Disconnecting --> [*]: Cleanup complete
Connection States¶
- Connecting: WebSocket upgrade request received
- Authenticating: Validating API key or token
- Rejected: Authentication failed; connection closed with code 1008
- Active: Connected and receiving messages
- Idle: No recent client messages
- Timeout: Idle timeout exceeded
- Disconnecting: Graceful cleanup in progress
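The lifecycle can also be read as a small transition table. The following is a minimal sketch built only from the state diagram in this section; `ConnState`, `TRANSITIONS`, and `advance` are illustrative names and are not taken from `websocket.py`.

```python
from enum import Enum


class ConnState(Enum):
    CONNECTING = "Connecting"
    AUTHENTICATING = "Authenticating"
    ACTIVE = "Active"
    IDLE = "Idle"
    TIMEOUT = "Timeout"
    REJECTED = "Rejected"
    DISCONNECTING = "Disconnecting"
    CLOSED = "Closed"  # stands in for the diagram's terminal [*] node


# Allowed transitions, copied from the state diagram.
TRANSITIONS: dict[ConnState, set[ConnState]] = {
    ConnState.CONNECTING: {ConnState.AUTHENTICATING},
    ConnState.AUTHENTICATING: {ConnState.ACTIVE, ConnState.REJECTED},
    ConnState.REJECTED: {ConnState.CLOSED},
    ConnState.ACTIVE: {ConnState.IDLE, ConnState.DISCONNECTING},
    ConnState.IDLE: {ConnState.ACTIVE, ConnState.TIMEOUT},
    ConnState.TIMEOUT: {ConnState.DISCONNECTING},
    ConnState.DISCONNECTING: {ConnState.CLOSED},
    ConnState.CLOSED: set(),
}


def advance(current: ConnState, target: ConnState) -> ConnState:
    """Return target if the diagram allows the move, else raise ValueError."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target
```

A table like this makes it easy to assert in tests that, for example, a connection can never go from Connecting straight to Active without passing through Authenticating.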
Authentication¶
Two authentication methods are supported. Both are optional and can be used together:
API Key Authentication (when api_key_enabled=true):
Or via Sec-WebSocket-Protocol header:
Token Authentication (when WEBSOCKET_TOKEN is configured):
When auth is enabled, connections without valid credentials are closed with code 1008 (Policy Violation).
Heartbeat Mechanism¶
The server sends periodic ping messages to keep connections alive and detect disconnected clients.
# backend/api/routes/websocket.py:267
async def send_heartbeat(
    websocket: WebSocket,
    interval: int,
    stop_event: asyncio.Event,
    connection_id: str = "",
) -> None:
Heartbeat Message Format¶
Server sends:
The lastSeq field contains the current sequence number, enabling clients to detect message gaps during idle periods (NEM-3142).
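A minimal sketch of how such a heartbeat loop could be built, assuming the heartbeat is a JSON object with `type` set to `ping` plus the `lastSeq` field. The names `heartbeat_loop`, `send_text`, and `current_seq` are illustrative and do not reproduce the body or signature of `send_heartbeat`.

```python
import asyncio
import json
from typing import Awaitable, Callable


async def heartbeat_loop(
    send_text: Callable[[str], Awaitable[None]],
    interval: float,
    stop_event: asyncio.Event,
    current_seq: Callable[[], int],
) -> None:
    """Send a ping every `interval` seconds until stop_event is set."""
    while not stop_event.is_set():
        try:
            # Sleep for one interval, but wake immediately if asked to stop.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # Interval elapsed without a stop request: emit one heartbeat.
            # The "type": "ping" shape is an assumption; lastSeq is from the text.
            payload = {"type": "ping", "lastSeq": current_seq()}
            await send_text(json.dumps(payload))
```

Waiting on the stop event with a timeout, rather than calling `asyncio.sleep`, lets the loop exit promptly during cleanup instead of lingering for up to a full interval.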
Expected Client Response¶
Configuration¶
| Setting | Default | Description |
|---|---|---|
| websocket_ping_interval_seconds | 30 | Heartbeat interval |
| websocket_idle_timeout_seconds | 300 | Connection idle timeout |
Message Loop¶
The main message loop handles incoming client messages with validation and routing.
# backend/api/routes/websocket.py:430-490
while True:
    try:
        data = await asyncio.wait_for(
            websocket.receive_text(),
            timeout=idle_timeout,
        )
        # Support legacy plain "ping" string
        if data == "ping":
            await websocket.send_text('{"type":"pong"}')
            continue
        # Validate and handle JSON messages
        message = await validate_websocket_message(websocket, data)
        if message is not None:
            await handle_validated_message(websocket, message, connection_id)
    except TimeoutError:
        await websocket.close(code=1000, reason="Idle timeout")
        break
Message Validation¶
All incoming messages are validated against the WebSocketMessage schema:
# backend/api/routes/websocket.py:79
async def validate_websocket_message(
    websocket: WebSocket, raw_data: str
) -> WebSocketMessage | None:
Invalid messages receive an error response:
{
  "type": "error",
  "error": "invalid_json",
  "message": "Message must be valid JSON",
  "details": { "raw_data_preview": "..." }
}
Supported Message Types¶
# backend/api/schemas/websocket.py:40
class WebSocketMessageType(StrEnum):
    PING = auto()  # Client heartbeat
    PONG = auto()  # Response to server ping
    SUBSCRIBE = auto()  # Subscribe to event patterns
    UNSUBSCRIBE = auto()  # Unsubscribe from patterns
    RESYNC = auto()  # Request sequence resync
Message Handling¶
# backend/api/routes/websocket.py:123
async def handle_validated_message(
    websocket: WebSocket, message: WebSocketMessage, connection_id: str
) -> None:
Ping/Pong¶
Subscribe¶
// Client sends:
{
  "type": "subscribe",
  "data": {"events": ["alert.*", "camera.status_changed"]}
}
// Server responds:
{
  "action": "subscribed",
  "events": ["alert.*", "camera.status_changed"]
}
Unsubscribe¶
// Client sends:
{
  "type": "unsubscribe",
  "data": {"events": ["alert.*"]}
}
// Server responds:
{
  "action": "unsubscribed",
  "events": ["alert.*"]
}
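To make the pattern semantics concrete, here is a minimal per-connection sketch that treats patterns such as `alert.*` as shell-style globs via the standard-library `fnmatch` module. The real `SubscriptionManager` may match differently; the `Subscriptions` class, its `wants` method, and the choice to match nothing when no patterns are registered are all assumptions.

```python
from fnmatch import fnmatchcase


class Subscriptions:
    """Tracks glob-style event patterns for one connection."""

    def __init__(self) -> None:
        self._patterns: set[str] = set()

    def subscribe(self, events: list[str]) -> dict:
        """Add patterns and return the acknowledgement shown above."""
        self._patterns.update(events)
        return {"action": "subscribed", "events": events}

    def unsubscribe(self, events: list[str]) -> dict:
        """Remove patterns and return the acknowledgement shown above."""
        self._patterns.difference_update(events)
        return {"action": "unsubscribed", "events": events}

    def wants(self, event_type: str) -> bool:
        """Return True if any registered pattern matches event_type."""
        return any(fnmatchcase(event_type, p) for p in self._patterns)
```

Under these assumptions, a connection subscribed to `alert.*` would receive `alert.created` but not `detection.new`, and an exact name such as `camera.status_changed` matches only itself.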
Resync¶
Request resync after detecting a sequence gap:
// Client sends:
{
  "type": "resync",
  "data": {"channel": "events", "last_sequence": 42}
}
// Server responds:
{
  "type": "resync_ack",
  "channel": "events",
  "last_sequence": 42
}
Graceful Disconnect¶
Connection cleanup ensures all resources are properly released:
# backend/api/routes/websocket.py:507-530
finally:
    # Stop heartbeat task
    heartbeat_stop.set()
    if heartbeat_task is not None:
        heartbeat_task.cancel()
    # Ensure the connection is properly cleaned up
    await broadcaster.disconnect(websocket)
    # Clean up subscriptions (NEM-2383)
    subscription_manager.remove_connection(connection_id)
    # Clean up sequence tracker (NEM-3142)
    sequence_tracker.remove_connection(connection_id, websocket)
    # Clear connection_id context (NEM-1640)
    set_connection_id(None)
Cleanup Steps¶
- Signal heartbeat task to stop
- Cancel and await heartbeat task
- Remove from EventBroadcaster connection set
- Remove from SubscriptionManager
- Remove from SequenceTracker
- Clear async context connection ID
Sequence Tracking¶
Messages include sequence numbers for gap detection:
# backend/api/routes/websocket.py:233
async def send_sequenced_message(
    websocket: WebSocket,
    connection_id: str,
    message: dict[str, Any],
) -> bool:
Example message with sequence:
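The name of the sequence field is not shown in this section, so the following client-side sketch takes the sequence number as a plain integer rather than reading it from a message. It illustrates how a client might detect a gap and build the resync request described under Resync; `GapDetector` and its methods are hypothetical.

```python
class GapDetector:
    """Tracks the last in-order sequence number seen on one channel."""

    def __init__(self) -> None:
        self.last_seq: int | None = None

    def observe(self, seq: int) -> bool:
        """Return True if seq reveals a gap; advance only on in-order messages."""
        if self.last_seq is not None and seq > self.last_seq + 1:
            # Gap: keep last_seq unchanged so the resync starts from it.
            return True
        if self.last_seq is None or seq > self.last_seq:
            self.last_seq = seq
        return False

    def resync_request(self, channel: str) -> dict:
        """Build a resync message in the format shown under Resync."""
        return {
            "type": "resync",
            "data": {"channel": channel, "last_sequence": self.last_seq},
        }
```

For example, after observing 41 and 42, an incoming 45 is reported as a gap, and the resulting request carries `last_sequence` 42.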
Rate Limiting¶
Connections are rate-limited before acceptance:
# backend/api/routes/websocket.py:383
if not await check_websocket_rate_limit(websocket, redis):
    await websocket.close(code=1008)  # Policy Violation
    return
Error Codes¶
| Code | Name | When Used |
|---|---|---|
| 1000 | Normal Closure | Idle timeout, graceful close |
| 1008 | Policy Violation | Auth failed, rate limited |
| 1011 | Internal Error | Unexpected exception |
Connection ID Generation¶
Each connection gets a unique identifier for tracking and logging:
# backend/api/routes/websocket.py:398
connection_id = f"ws-events-{uuid.uuid4().hex[:8]}"
set_connection_id(connection_id)
Connection ID format:
- /ws/events: ws-events-{8 hex chars}
- /ws/system: ws-system-{8 hex chars}
- /ws/jobs/{id}/logs: ws-job-logs-{job_id[:8]}-{8 hex chars}
Related Documentation¶
- EventBroadcaster - Redis pub/sub integration
- SubscriptionManager - Event filtering
- Message Formats - Detailed schema documentation