Code Patterns and Conventions¶
This document covers key patterns used throughout the codebase. Understanding these patterns will help you write consistent, maintainable code.
Backend Patterns¶
Settings Management¶
The application uses a cached singleton pattern for settings (backend/core/config.py):
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings.

    ``database_url`` has no default and must therefore be supplied;
    the other fields fall back to the defaults declared here.
    """

    database_url: str
    redis_url: str = "redis://localhost:6379/0"
    debug: bool = False

    # SettingsConfigDict must be imported alongside BaseSettings.
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )


@lru_cache
def get_settings() -> Settings:
    """Return cached settings singleton."""
    return Settings()
Usage:
from backend.core import get_settings
# NOTE(review): assumes backend.core re-exports the lru_cache-wrapped
# get_settings from backend.core.config - confirm.
settings = get_settings() # Cached, same instance every time
print(settings.database_url)
Testing pattern - clear cache:
from backend.core.config import get_settings


def test_something():
    # Drop the cached instance so the next get_settings() call builds a
    # fresh Settings object.
    get_settings.cache_clear()  # Force reload
    # ... test code ...
Database Sessions¶
Dependency Injection (API Routes)¶
Use FastAPI's dependency injection for API routes (backend/core/database.py):
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from backend.core.database import get_db


@router.get("/cameras")
async def list_cameras(db: AsyncSession = Depends(get_db)):
    """Return all Camera rows, using the session injected through get_db."""
    rows = await db.execute(select(Camera))
    cameras = rows.scalars().all()
    return cameras
Context Manager (Services)¶
Use context managers in services for proper cleanup:
from backend.core.database import get_session


async def process_detections():
    """Load every Detection row inside a session managed by get_session()."""
    async with get_session() as db:
        rows = await db.execute(select(Detection))
        detections = rows.scalars().all()
        # Auto-commit on success, rollback on exception
Async/Await Patterns¶
All I/O operations must be async:
# Database operations
async with get_session() as session:
result = await session.execute(query)
await session.commit()
# Redis operations
await redis.set("key", value)
result = await redis.get("key")
# HTTP operations
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
# File watching (asyncio for debouncing)
async def debounced_handler():
await asyncio.sleep(0.1)
await process_file()
Concurrent operations:
import asyncio
# Run multiple independent operations concurrently
results = await asyncio.gather(
fetch_cameras(session),
fetch_events(session),
fetch_detections(session),
)
Error Handling¶
Service Layer¶
from backend.core.logging import get_logger

logger = get_logger(__name__)


async def detect_objects(image_path: str) -> list[Detection]:
    """POST ``image_path`` to the detection endpoint and parse the reply.

    Returns:
        The result of ``parse_detections`` on the JSON response, or an
        empty list when the endpoint answers with an HTTP error status.

    Raises:
        Any exception other than ``httpx.HTTPStatusError`` is logged with
        its traceback and re-raised unchanged.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json={"image": image_path})
            response.raise_for_status()
            return parse_detections(response.json())
    except httpx.HTTPStatusError as e:
        logger.error("Detection failed", extra={
            "status_code": e.response.status_code,
            "image_path": image_path,
        })
        return []  # Graceful degradation
    except Exception:
        # logger.exception records the active traceback, so no binding is needed.
        logger.exception("Unexpected error during detection")
        raise
API Routes¶
from fastapi import HTTPException, status


@router.get("/cameras/{camera_id}")
async def get_camera(camera_id: str, db: AsyncSession = Depends(get_db)):
    """Return the camera whose id matches, or raise a 404 HTTPException."""
    query = select(Camera).where(Camera.id == camera_id)
    camera = (await db.execute(query)).scalar_one_or_none()
    if camera is not None:
        return camera
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Camera {camera_id} not found"
    )
Type Hints¶
All public functions must have type hints:
from datetime import datetime
from typing import Optional


async def get_events(
    camera_id: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[Event]:
    """Fetch events with optional filtering."""
    # Body elided in this example; every parameter is optional and typed.
    ...
SQLAlchemy 2.0 Mapped types:
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship


class Camera(Base):
    """ORM model for the ``cameras`` table, declared with Mapped[...] types."""

    __tablename__ = "cameras"

    # Columns
    id: Mapped[str] = mapped_column(String(50), primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    status: Mapped[str] = mapped_column(String(20), default="online")

    # Relationships
    detections: Mapped[list["Detection"]] = relationship(back_populates="camera")
Redis Patterns¶
Queue Operations with Backpressure¶
from backend.core.redis import OverflowPolicy, RedisClient


async def add_detection_to_queue(redis: RedisClient, detection: dict):
    """Enqueue ``detection`` and warn when the enqueue reports failure."""
    enqueued = await redis.add_to_queue_safe(
        queue_name="detection_queue",
        data=detection,
        max_size=1000,
        overflow_policy=OverflowPolicy.DLQ,  # Send to dead-letter queue on overflow
    )
    if enqueued:
        return
    logger.warning("Queue full, detection sent to DLQ")
Pub/Sub Pattern¶
# Publisher
await redis.publish("events", {"type": "new_event", "event_id": event.id})
# Subscriber
async for message in redis.listen("events"):
await handle_event(message)
Frontend Patterns¶
Functional Components¶
All components are written as functional components that use hooks:
import { useState, useEffect } from 'react';
import { Camera } from '../types/api';
import { api } from '../services/api';

interface CameraCardProps {
  camera: Camera;
  onSelect: (camera: Camera) => void;
}

/** Shows a camera's name and status; clicking the card passes the camera to onSelect. */
export function CameraCard({ camera, onSelect }: CameraCardProps) {
  const [status, setStatus] = useState(camera.status);

  useEffect(() => {
    // Cleanup on unmount
    return () => {
      // Cancel subscriptions
    };
  }, [camera.id]);

  const handleClick = () => onSelect(camera);

  return (
    <div onClick={handleClick}>
      <h3>{camera.name}</h3>
      <span>{status}</span>
    </div>
  );
}
Custom Hooks¶
Extract reusable logic into custom hooks:
// useWebSocket.ts
/** Opens a WebSocket for `channel`, collecting parsed messages and connection state. */
export function useWebSocket(channel: string) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    const socket = new WebSocket(`ws://localhost:8000/ws/${channel}`);

    socket.onopen = () => setConnected(true);
    socket.onclose = () => setConnected(false);
    socket.onmessage = (event) => {
      const parsed = JSON.parse(event.data);
      setMessages((previous) => [...previous, parsed]);
    };

    // Close the socket when the channel changes or the component unmounts.
    return () => socket.close();
  }, [channel]);

  return { messages, connected };
}

// Usage
function EventFeed() {
  const { messages, connected } = useWebSocket('events');
  // ...
}
Tailwind CSS Patterns¶
Use design system classes consistently:
// Use semantic color classes
<div className="bg-background-primary text-text-primary">
  <button className="bg-nvidia-green hover:bg-nvidia-green-dark">
    Action
  </button>
</div>

// Responsive design
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
  {/* Grid items */}
</div>

// Component composition
<Card>
  <CardHeader>Title</CardHeader>
  <CardContent>Content</CardContent>
</Card>
Testing Patterns¶
Mocking at Boundaries¶
Mock at the boundary with external dependencies (for example the HTTP client), not at internal functions:
# GOOD: Mock at boundary
with patch("httpx.AsyncClient") as mock_http:
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_client = AsyncMock()
    mock_client.post = AsyncMock(return_value=mock_response)
    # `async with httpx.AsyncClient() as client` now yields mock_client.
    mock_http.return_value.__aenter__.return_value = mock_client
    # Test code...

# BAD: Mock internal function (tests implementation, not behavior)
with patch("backend.services.detector_client._parse_response"):
    ...  # Too granular
Redis Mocking Pattern¶
@pytest.fixture
def mock_redis_client():
    """Provide an AsyncMock specced against RedisClient with preset return values."""
    client = AsyncMock(spec=RedisClient)
    # Methods with a canned result
    client.get = AsyncMock(return_value=None)
    client.set = AsyncMock(return_value=True)
    # Methods whose result is left as the AsyncMock default
    client.add_to_queue = AsyncMock()
    client.publish = AsyncMock()
    client.health_check = AsyncMock(return_value={"status": "healthy"})
    return client
HTTP Client Mocking Pattern¶
@pytest.fixture
def mock_http_client():
    """Patch httpx.AsyncClient and yield the client its context manager returns."""
    with patch("httpx.AsyncClient") as patched_cls:
        ok_response = MagicMock()
        ok_response.status_code = 200
        ok_response.json.return_value = {"result": "success"}

        client = AsyncMock()
        client.post = AsyncMock(return_value=ok_response)
        client.get = AsyncMock(return_value=ok_response)

        # `async with httpx.AsyncClient() as c` yields the mocked client.
        patched_cls.return_value.__aenter__.return_value = client
        yield client
Database Session Mocking¶
@pytest.fixture
def mock_session():
    """Provide a mocked session: ``add`` is a sync mock, the rest are async."""
    db = AsyncMock()
    db.add = MagicMock()
    for awaited_method in ("commit", "flush", "refresh", "execute"):
        setattr(db, awaited_method, AsyncMock())
    return db
Integration Test Patterns¶
@pytest.mark.asyncio
async def test_full_workflow(client, integration_db):
    """Test complete camera -> detection -> event workflow."""
    camera_payload = {
        "id": "test_cam",
        "name": "Test Camera"
    }
    detection_payload = {
        "camera_id": "test_cam",
        "image_path": "/path/to/image.jpg",
        "objects": [{"class": "person", "confidence": 0.95}]
    }

    # Create camera
    created_camera = await client.post("/api/cameras", json=camera_payload)
    assert created_camera.status_code == 201

    # Trigger detection
    created_detection = await client.post("/api/detections", json=detection_payload)
    assert created_detection.status_code == 201

    # Verify event created
    events_response = await client.get("/api/events?camera_id=test_cam")
    assert events_response.status_code == 200
    assert len(events_response.json()["events"]) > 0
Parallel Test Isolation¶
Use unique_id() to generate identifiers for test data so that tests running in parallel do not conflict:
from backend.tests.conftest import unique_id


@pytest.mark.asyncio
async def test_camera_creation(isolated_db):
    """Insert a camera under a unique id and read it back by that id."""
    camera_id = unique_id("camera")  # e.g., "camera_abc12345"
    async with get_session() as session:
        session.add(Camera(id=camera_id, name="Test Camera"))
        await session.commit()

        # Query uses unique ID, no conflicts with parallel tests
        lookup = select(Camera).where(Camera.id == camera_id)
        result = await session.execute(lookup)
        assert result.scalar_one().name == "Test Camera"
Logging Patterns¶
Structured Logging¶
from backend.core.logging import get_logger
logger = get_logger(__name__)
# Include structured context
logger.info("Processing detection", extra={
"camera_id": camera.id,
"detection_id": detection.id,
"confidence": detection.confidence,
"duration_ms": elapsed_ms,
})
# Error with exception
try:
await process()
except Exception:
logger.exception("Processing failed", extra={
"camera_id": camera.id,
})
Request ID Propagation¶
Request IDs are automatically propagated via middleware:
# In API routes, request_id is in request state
@router.get("/cameras")
async def list_cameras(request: Request):
    """Log the listing together with the id stored on ``request.state``."""
    logger.info("Listing cameras", extra={"request_id": request.state.request_id})
Service Architecture Patterns¶
Worker Pattern¶
Background workers follow this pattern:
class DetectionQueueWorker:
    """Background worker that polls ``detection_queue`` from an asyncio task."""

    def __init__(self, redis: RedisClient):
        self.redis = redis
        self._running = False
        self._task: Optional[asyncio.Task] = None

    async def start(self):
        """Start the polling task; does nothing if one is still running."""
        # A second start() must not orphan the task created by the first.
        if self._task is not None and not self._task.done():
            return
        self._running = True
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        """Cancel the polling task and wait until it has finished."""
        self._running = False
        # Clear the reference first so the worker can be started again later.
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _run(self):
        """Pop items and process them until stopped or cancelled."""
        while self._running:
            try:
                item = await self.redis.pop_from_queue("detection_queue", timeout=1)
                if item:
                    await self._process(item)
            except asyncio.CancelledError:
                # Propagate cancellation so the task ends up cancelled
                # rather than looking like a normal completion.
                raise
            except Exception:
                logger.exception("Worker error")
                await asyncio.sleep(1)  # Back off on error
Circuit Breaker Pattern¶
from backend.services.circuit_breaker import CircuitBreaker

breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=30,
    half_open_max_calls=3,
)


async def call_external_service():
    """Call the external API inside the module-level circuit breaker."""
    async with breaker:
        outcome = await external_api.call()
        return outcome
Retry Pattern¶
from backend.services.retry_handler import RetryHandler

retry = RetryHandler(
    max_retries=3,
    base_delay=1.0,
    max_delay=30.0,
    exponential_base=2,
)


async def unreliable_operation():
    """Run the external call through the shared RetryHandler, logging each retry."""

    def _log_retry(attempt, error):
        # Returns logger.warning's result, exactly as the inline lambda did.
        return logger.warning(f"Retry {attempt}: {error}")

    return await retry.execute(
        operation=lambda: external_api.call(),
        on_retry=_log_retry,
    )
Related Documentation¶
- Setup Guide - Development environment setup
- Testing Guide - Test strategy and running tests
- Contributing Guide - PR process and code standards
- Backend AGENTS.md - Backend architecture
- Services AGENTS.md - Service layer patterns