Circuit Breaker Pattern¶
The circuit breaker pattern guards against cascading failures by monitoring the failure rate of calls to an external service and temporarily blocking calls while that service is unhealthy. Failing fast keeps one failing service from tying up its callers and gives it room to recover.
Source: backend/services/circuit_breaker.py
Overview¶
The CircuitBreaker class (backend/services/circuit_breaker.py:270-1016) implements a thread-safe async circuit breaker with three states:
- CLOSED: Normal operation, calls pass through
- OPEN: Service failing, calls rejected immediately with CircuitBreakerError
- HALF_OPEN: Testing recovery, limited calls allowed
Key Features¶
- Configurable failure thresholds and recovery timeouts
- Half-open state for gradual recovery testing
- Excluded exceptions that don't count as failures
- Thread-safe async implementation using asyncio.Lock
- Global registry for managing multiple circuit breakers
- Prometheus metrics integration for monitoring
- OpenTelemetry tracing support
State Diagram¶


%%{init: {
'theme': 'dark',
'themeVariables': {
'primaryColor': '#3B82F6',
'primaryTextColor': '#FFFFFF',
'primaryBorderColor': '#60A5FA',
'secondaryColor': '#A855F7',
'tertiaryColor': '#009688',
'background': '#121212',
'mainBkg': '#1a1a2e',
'lineColor': '#666666'
}
}}%%
stateDiagram-v2
[*] --> CLOSED: Initial State
CLOSED --> OPEN: failures >= threshold
OPEN --> HALF_OPEN: recovery_timeout elapsed
HALF_OPEN --> CLOSED: success_threshold met
HALF_OPEN --> OPEN: any failure occurs
CLOSED: Normal Operation
CLOSED: Calls pass through
CLOSED: Track failures
OPEN: Circuit Tripped
OPEN: Calls rejected immediately
OPEN: CircuitBreakerError raised
HALF_OPEN: Testing Recovery
HALF_OPEN: Limited calls allowed
HALF_OPEN: Track successes
Configuration¶
The CircuitBreakerConfig dataclass (backend/services/circuit_breaker.py:126-142) defines circuit breaker behavior:
# backend/services/circuit_breaker.py:126-142
@dataclass(slots=True)
class CircuitBreakerConfig:
    """Configuration for circuit breaker behavior."""

    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3
    success_threshold: int = 2
    excluded_exceptions: tuple[type[Exception], ...] = ()
Configuration Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| failure_threshold | int | 5 | Number of consecutive failures before opening circuit |
| recovery_timeout | float | 30.0 | Seconds to wait before transitioning from OPEN to HALF_OPEN |
| half_open_max_calls | int | 3 | Maximum concurrent calls allowed in HALF_OPEN state |
| success_threshold | int | 2 | Consecutive successes needed in HALF_OPEN to close circuit |
| excluded_exceptions | tuple | () | Exception types that do not count as failures |
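To make the interplay of these parameters concrete, here is a minimal, synchronous sketch of the state machine. It is a toy model, not the CircuitBreaker class itself: the names ToyState, ToyConfig and ToyBreaker and the injectable clock argument are assumptions made for illustration, and half_open_max_calls and excluded_exceptions are deliberately left out.

```python
import time
from dataclasses import dataclass
from enum import Enum


class ToyState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class ToyConfig:
    # Hypothetical mirror of three CircuitBreakerConfig fields.
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    success_threshold: int = 2


class ToyBreaker:
    """Illustrative model of the documented transitions only."""

    def __init__(self, config: ToyConfig, clock=time.monotonic) -> None:
        self.config = config
        self.clock = clock  # injectable so the example can fake time
        self.state = ToyState.CLOSED
        self.failures = 0
        self.successes = 0
        self.opened_at = None

    def allow(self) -> bool:
        """Return True if a call may proceed; OPEN -> HALF_OPEN after the timeout."""
        if self.state is ToyState.OPEN:
            if self.clock() - self.opened_at < self.config.recovery_timeout:
                return False
            self.state = ToyState.HALF_OPEN
            self.successes = 0
        return True

    def record_failure(self) -> None:
        self.failures += 1
        # Any failure in HALF_OPEN reopens; in CLOSED the threshold decides.
        if (
            self.state is ToyState.HALF_OPEN
            or self.failures >= self.config.failure_threshold
        ):
            self.state = ToyState.OPEN
            self.opened_at = self.clock()

    def record_success(self) -> None:
        if self.state is ToyState.HALF_OPEN:
            self.successes += 1
            if self.successes >= self.config.success_threshold:
                self.state = ToyState.CLOSED
                self.failures = 0
        else:
            # Assumption: "consecutive" failures means a success resets the count.
            self.failures = 0
```

Passing a fake clock (for example `clock=lambda: now[0]`) lets the recovery timeout be exercised without sleeping.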
Pre-Registered Circuit Breakers¶
The application pre-registers circuit breakers at startup for all known external services (backend/main.py:266-315):
| Service | Type | Configuration |
|---|---|---|
| yolo26 | AI Service | failure_threshold=5, recovery_timeout=30s |
| nemotron | AI Service | failure_threshold=5, recovery_timeout=30s |
| postgresql | Infrastructure | failure_threshold=10, recovery_timeout=60s |
| redis | Infrastructure | failure_threshold=10, recovery_timeout=60s |
AI Service Configuration (quick failure detection):
ai_service_config = CircuitBreakerConfig(
    failure_threshold=5,  # Open after 5 consecutive failures
    recovery_timeout=30.0,  # Try recovery after 30 seconds
    half_open_max_calls=3,  # Allow 3 test calls in half-open
)
Infrastructure Configuration (more tolerant for transient issues):
infrastructure_config = CircuitBreakerConfig(
    failure_threshold=10,  # Open after 10 consecutive failures
    recovery_timeout=60.0,  # Try recovery after 60 seconds
    half_open_max_calls=5,  # Allow 5 test calls in half-open
)
CircuitState Enum¶
The CircuitState enum (backend/services/circuit_breaker.py:118-123) defines the three states:
```python
# backend/services/circuit_breaker.py:118-123
class CircuitState(StrEnum):
    """Circuit breaker states."""

    CLOSED = auto()
    OPEN = auto()
    HALF_OPEN = auto()
```
Usage Patterns¶
Method 1: Call Wrapper¶
from backend.services.circuit_breaker import (
    CircuitBreaker,
    CircuitBreakerConfig,
    CircuitBreakerError,
)

config = CircuitBreakerConfig(
    failure_threshold=5,
    recovery_timeout=30.0,
)
breaker = CircuitBreaker(name="ai_service", config=config)

try:
    result = await breaker.call(async_operation, arg1, arg2)
except CircuitBreakerError:
    # Handle service unavailable
    result = fallback_value
Method 2: Async Context Manager¶
The context manager (backend/services/circuit_breaker.py:939-970) handles:
- State checking on entry
- Call counting in HALF_OPEN state
- Success/failure recording on exit
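The entry and exit responsibilities listed above can be sketched with a toy async context manager. This is an assumption-laden illustration, not the code at the referenced lines: ToyAsyncBreaker and CircuitOpen are hypothetical names, and HALF_OPEN call counting is omitted to keep the example short.

```python
import asyncio


class CircuitOpen(Exception):
    """Hypothetical stand-in for CircuitBreakerError."""


class ToyAsyncBreaker:
    """Checks state on entry and records the outcome on exit."""

    def __init__(self, failure_threshold: int = 2) -> None:
        self.failure_threshold = failure_threshold
        self.failures = 0
        self.is_open = False
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> "ToyAsyncBreaker":
        async with self._lock:
            if self.is_open:
                # Reject before the protected body runs at all.
                raise CircuitOpen("circuit is open")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        async with self._lock:
            if exc_type is None:
                self.failures = 0  # success resets the consecutive count
            else:
                self.failures += 1
                if self.failures >= self.failure_threshold:
                    self.is_open = True
        return False  # never swallow the caller's exception


async def demo() -> list[str]:
    breaker = ToyAsyncBreaker(failure_threshold=2)
    outcomes = []
    for _ in range(3):
        try:
            async with breaker:
                raise ConnectionError("backend down")
        except ConnectionError:
            outcomes.append("failed")
        except CircuitOpen:
            outcomes.append("rejected")
    return outcomes
```

Returning False from `__aexit__` matters: the breaker observes the failure but the original exception still reaches the caller.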
Method 3: Protected Call¶
# backend/services/circuit_breaker.py:823-862
result = await breaker.protected_call(
    lambda: client.fetch_data(),
    record_on=(ConnectionError, TimeoutError),
)
Method 4: Protect Context Manager¶
# backend/services/circuit_breaker.py:864-917
try:
    async with breaker.protect():
        result = await risky_operation()
except CircuitOpenError as e:
    # Provides recovery_time_remaining for Retry-After headers
    raise HTTPException(
        status_code=503,
        headers={"Retry-After": str(int(e.recovery_time_remaining))},
    )
Global Registry¶
The CircuitBreakerRegistry class (backend/services/circuit_breaker.py:985-1056) manages multiple circuit breakers:
# backend/services/circuit_breaker.py:1071-1084
from backend.services.circuit_breaker import (
    CircuitBreakerConfig,
    get_circuit_breaker,
)

# Get or create from global registry
breaker = get_circuit_breaker("yolo26", CircuitBreakerConfig(
    failure_threshold=5,
    recovery_timeout=30.0,
))

# The registry ensures singleton instances per name
another_ref = get_circuit_breaker("yolo26")
assert breaker is another_ref
Registry Methods¶
| Method | Description |
|---|---|
| get_or_create(name, config) | Get existing or create new circuit breaker |
| get(name) | Get existing circuit breaker (returns None if not found) |
| get_all_status() | Get status of all circuit breakers |
| reset_all() | Reset all circuit breakers to CLOSED |
| list_names() | List all registered circuit breaker names |
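The get-or-create semantics in the table can be illustrated with a small dictionary-backed registry. This is a sketch under assumptions, not the CircuitBreakerRegistry implementation: ToyRegistry and RegisteredBreaker are hypothetical names, and the choice to ignore a config passed for an already registered name is an assumption the source does not confirm.

```python
from dataclasses import dataclass, field


@dataclass
class RegisteredBreaker:
    # Hypothetical, stripped-down breaker: just a name, a threshold and a state.
    name: str
    failure_threshold: int = 5
    state: str = "closed"


@dataclass
class ToyRegistry:
    _breakers: dict[str, RegisteredBreaker] = field(default_factory=dict)

    def get_or_create(self, name: str, failure_threshold: int = 5) -> RegisteredBreaker:
        # Create only on first request; later calls return the same object.
        if name not in self._breakers:
            self._breakers[name] = RegisteredBreaker(name, failure_threshold)
        return self._breakers[name]

    def get(self, name: str):
        # Returns None when the name was never registered.
        return self._breakers.get(name)

    def list_names(self) -> list[str]:
        return sorted(self._breakers)

    def reset_all(self) -> None:
        for breaker in self._breakers.values():
            breaker.state = "closed"
```

Keying on the name is what gives every caller that asks for "yolo26" the same shared failure counters.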
Prometheus Metrics¶
The circuit breaker emits metrics for monitoring (backend/services/circuit_breaker.py:64-115):
Legacy Metrics¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| circuit_breaker_state | Gauge | service | Current state (0=closed, 1=open, 2=half_open) |
| circuit_breaker_failures_total | Counter | service | Total failures recorded |
| circuit_breaker_state_changes_total | Counter | service, from_state, to_state | State transitions |
| circuit_breaker_calls_total | Counter | service, result | Total calls (success/failure) |
| circuit_breaker_rejected_total | Counter | service | Calls rejected by open circuit |
HSI-Prefixed Metrics (Grafana Dashboard)¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| hsi_circuit_breaker_state | Gauge | service | Current state for Grafana |
| hsi_circuit_breaker_trips_total | Counter | service | Times circuit has tripped open |
State Transitions¶
CLOSED to OPEN¶
Triggered when failure_count >= failure_threshold (backend/services/circuit_breaker.py:535-568):
# backend/services/circuit_breaker.py:535-568
def _transition_to_open(self) -> None:
    """Transition circuit to OPEN state."""
    prev_state = self._state
    self._state = CircuitState.OPEN
    self._opened_at = time.monotonic()
    self._success_count = 0
    self._half_open_calls = 0
    self._last_state_change = datetime.now(UTC)

    # Update Prometheus metrics
    CIRCUIT_BREAKER_STATE.labels(service=self._name).set(1)
    HSI_CIRCUIT_BREAKER_STATE.labels(service=self._name).set(1)
    HSI_CIRCUIT_BREAKER_TRIPS_TOTAL.labels(service=self._name).inc()
OPEN to HALF_OPEN¶
Triggered when recovery_timeout elapses (backend/services/circuit_breaker.py:528-533, 570-598):
# backend/services/circuit_breaker.py:528-533
def _should_attempt_recovery(self) -> bool:
    """Check if recovery timeout has elapsed."""
    if self._opened_at is None:
        return False
    elapsed = time.monotonic() - self._opened_at
    return elapsed >= self._config.recovery_timeout
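The same elapsed-time arithmetic yields the time left before a recovery attempt, which is what a Retry-After header needs. The helpers below are a hypothetical sketch (the function names and the explicit `now` parameter are assumptions, not part of circuit_breaker.py); they round up with math.ceil so a client is never told to retry slightly too early.

```python
import math
from typing import Optional


def recovery_time_remaining(
    opened_at: Optional[float], recovery_timeout: float, now: float
) -> float:
    """Seconds until the OPEN circuit may move to HALF_OPEN (never negative)."""
    if opened_at is None:
        return 0.0  # circuit was never opened, nothing to wait for
    elapsed = now - opened_at
    return max(0.0, recovery_timeout - elapsed)


def retry_after_header(remaining: float) -> str:
    """Format the remaining time as whole seconds, rounded up."""
    return str(math.ceil(remaining))
```

In real use `now` would come from time.monotonic(), the same clock used to stamp the moment the circuit opened, so wall-clock adjustments cannot shorten or lengthen the wait.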
HALF_OPEN to CLOSED¶
Triggered when success_count >= success_threshold (backend/services/circuit_breaker.py:600-628):
# backend/services/circuit_breaker.py:600-628
def _transition_to_closed(self) -> None:
    """Transition circuit to CLOSED state."""
    self._state = CircuitState.CLOSED
    self._failure_count = 0
    self._success_count = 0
    self._opened_at = None
    self._half_open_calls = 0
    self._last_state_change = datetime.now(UTC)

    CIRCUIT_BREAKER_STATE.labels(service=self._name).set(0)
    HSI_CIRCUIT_BREAKER_STATE.labels(service=self._name).set(0)
HALF_OPEN to OPEN¶
Triggered on any failure during HALF_OPEN (backend/services/circuit_breaker.py:519-521):
# backend/services/circuit_breaker.py:519-521
if self._state == CircuitState.HALF_OPEN:
    # Any failure in half-open reopens the circuit
    self._transition_to_open()
Exception Classes¶
CircuitBreakerError¶
Primary exception raised when circuit is open (backend/services/circuit_breaker.py:187-228):
# backend/services/circuit_breaker.py:187-228
class CircuitBreakerError(Exception):
    """Exception raised when circuit breaker is open."""

    def __init__(
        self,
        service_name: str,
        state: str | CircuitState | None = None,
        *,
        recovery_timeout: float | None = None,
    ) -> None:
        self.service_name = service_name
        self.name = service_name  # Alias for compatibility
        self.recovery_timeout = recovery_timeout
CircuitOpenError¶
Exception with recovery timing information (backend/services/circuit_breaker.py:234-255):
# backend/services/circuit_breaker.py:234-255
class CircuitOpenError(Exception):
    """Raised when circuit breaker prevents call proactively."""

    def __init__(self, service_name: str, recovery_time_remaining: float) -> None:
        self.service_name = service_name
        self.recovery_time_remaining = recovery_time_remaining
Status and Metrics Methods¶
get_status()¶
Returns dictionary with current state (backend/services/circuit_breaker.py:661-682):
status = breaker.get_status()
# Returns:
# {
#     "name": "yolo26",
#     "state": "closed",
#     "failure_count": 0,
#     "success_count": 0,
#     "total_calls": 150,
#     "rejected_calls": 0,
#     "last_failure_time": None,
#     "opened_at": None,
#     "config": {
#         "failure_threshold": 5,
#         "recovery_timeout": 30.0,
#         "half_open_max_calls": 3,
#         "success_threshold": 2,
#     }
# }
get_metrics()¶
Returns CircuitBreakerMetrics dataclass (backend/services/circuit_breaker.py:684-708):
# backend/services/circuit_breaker.py:145-184
@dataclass(slots=True)
class CircuitBreakerMetrics:
    """Metrics for circuit breaker monitoring."""

    name: str
    state: CircuitState
    failure_count: int = 0
    success_count: int = 0
    total_calls: int = 0
    rejected_calls: int = 0
    last_failure_time: datetime | None = None
    last_state_change: datetime | None = None
Manual Control¶
reset()¶
Manually resets the circuit to the CLOSED state (backend/services/circuit_breaker.py:630-651).
force_open()¶
Forces the circuit into the OPEN state, for example during maintenance (backend/services/circuit_breaker.py:653-659).
Thread Safety¶
The circuit breaker uses asyncio.Lock to guard state changes (backend/services/circuit_breaker.py:319). Note that asyncio.Lock serializes coroutines running on one event loop; it does not synchronize OS threads.
Async methods acquire the lock before state changes:
- allow_call() - backend/services/circuit_breaker.py:389-400
- _record_success() - backend/services/circuit_breaker.py:478-499
- _record_failure() - backend/services/circuit_breaker.py:501-526
Sync methods (for compatibility) operate without locks:
- record_success() - backend/services/circuit_breaker.py:749-763
- record_failure() - backend/services/circuit_breaker.py:769-786
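Why the async methods take a lock can be seen in a small experiment: a read-modify-write that spans an `await` loses updates unless it is serialized. The sketch below uses a hypothetical ToyCounter class, not the breaker itself, and `asyncio.sleep(0)` stands in for any await point inside the critical section.

```python
import asyncio


class ToyCounter:
    """Failure counter updated with and without an asyncio.Lock."""

    def __init__(self) -> None:
        self.failures = 0
        self._lock = asyncio.Lock()

    async def record_unlocked(self) -> None:
        current = self.failures
        await asyncio.sleep(0)  # other tasks run here and read the stale value
        self.failures = current + 1

    async def record_locked(self) -> None:
        async with self._lock:
            current = self.failures
            await asyncio.sleep(0)  # still yields, but no other task holds the lock
            self.failures = current + 1


async def run(method_name: str, n: int = 50) -> int:
    counter = ToyCounter()
    method = getattr(counter, method_name)
    await asyncio.gather(*(method() for _ in range(n)))
    return counter.failures
```

With the lock, all 50 increments land; without it, concurrent tasks overwrite each other. A synchronous method that contains no `await` cannot be interleaved by other coroutines on the same loop, which is presumably why the lock-free sync variants are workable when called from the event loop thread.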
Best Practices¶
- Name circuit breakers descriptively: Use service names like "yolo26", "nemotron", "redis"
- Use the global registry: Ensures singleton instances and centralized management
- Configure excluded exceptions: Don't count validation errors as circuit breaker failures
- Monitor metrics: Set up Grafana alerts on hsi_circuit_breaker_state changes
- Implement fallbacks: Always have a degraded response when circuit is open
Integration with Other Patterns¶
Circuit Breaker + Retry Pattern¶
The circuit breaker works with the retry handler (backend/services/retry_handler.py) to provide comprehensive failure handling:
from backend.services.circuit_breaker import get_circuit_breaker
from backend.services.retry_handler import with_retry

breaker = get_circuit_breaker("ai_service")

@with_retry(max_attempts=3, backoff_factor=2.0)
async def call_ai_service():
    async with breaker:
        return await ai_client.analyze(data)
Flow:
- Retry handler attempts the operation
- Circuit breaker checks if service is healthy
- If circuit is open, immediately fails (no retry)
- If circuit is closed, executes operation
- Success/failure recorded for circuit state management
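The "no retry when the circuit is open" step of this flow can be sketched as a retry loop that re-raises the circuit-open error immediately and backs off only on other failures. This is an illustration under assumptions: the retry function and CircuitOpen exception below are hypothetical and do not describe how backend/services/retry_handler.py is actually written.

```python
import asyncio


class CircuitOpen(Exception):
    """Hypothetical stand-in for CircuitBreakerError."""


async def retry(operation, max_attempts=3, base_delay=0.0, backoff_factor=2.0):
    """Retry transient failures with exponential backoff; fail fast on an open circuit."""
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except CircuitOpen:
            # The breaker already decided the service is unhealthy: do not retry.
            raise
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted, surface the last error
            await asyncio.sleep(delay)
            delay *= backoff_factor
```

Ordering the `except CircuitOpen` clause before the generic handler is what keeps retries from hammering a service the breaker has already isolated.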
Circuit Breaker + Graceful Degradation¶
When circuit is open, use fallback responses:
from backend.services.circuit_breaker import CircuitBreakerError
async def get_risk_analysis(detection):
    try:
        async with nemotron_breaker:
            return await nemotron_client.analyze(detection)
    except CircuitBreakerError:
        # Fallback: use rule-based risk scoring
        return await rule_based_scorer.score(detection)
WebSocket Circuit Breaker¶
A specialized circuit breaker exists for WebSocket connections (backend/core/websocket_circuit_breaker.py):
from backend.core.websocket_circuit_breaker import WebSocketCircuitBreaker

ws_breaker = WebSocketCircuitBreaker(
    name="event_ws",
    max_failures=3,
    recovery_timeout=10.0,
)

async with ws_breaker.protect():
    await websocket.send_json(event)
Observability¶
Grafana Dashboard Queries¶
Monitor circuit breaker state:
# Current state (0=closed, 1=open, 2=half_open)
hsi_circuit_breaker_state{service="yolo26"}
# Trip rate (circuit opening events)
rate(hsi_circuit_breaker_trips_total[5m])
# Rejection rate (calls blocked by open circuit)
rate(circuit_breaker_rejected_total[5m])
Alerting Rules¶
# Alert when circuit is not closed (state > 0 matches both open and half_open)
- alert: CircuitBreakerOpen
  expr: hsi_circuit_breaker_state > 0
  for: 1m
  labels:
    severity: warning
  annotations:
    summary: 'Circuit breaker {{ $labels.service }} is open or half-open'
# Alert on high failure rate
- alert: CircuitBreakerHighFailureRate
  expr: rate(circuit_breaker_failures_total[5m]) > 0.1
  for: 5m
  labels:
    severity: warning
Testing Circuit Breakers¶
Unit Test Example¶
import asyncio

import pytest

from backend.services.circuit_breaker import (
    CircuitBreaker,
    CircuitBreakerConfig,
    CircuitState,
    reset_circuit_breaker_registry,
)

# Note: the async tests assume an asyncio pytest plugin is configured
# (for example pytest-asyncio in auto mode).


@pytest.fixture(autouse=True)
def reset_registry():
    reset_circuit_breaker_registry()
    yield
    reset_circuit_breaker_registry()


async def test_circuit_opens_after_threshold():
    config = CircuitBreakerConfig(failure_threshold=3)
    breaker = CircuitBreaker(name="test", config=config)

    # Record failures
    for _ in range(3):
        breaker.record_failure()

    assert breaker.state == CircuitState.OPEN


async def test_circuit_recovers():
    config = CircuitBreakerConfig(
        failure_threshold=3,
        recovery_timeout=0.1,  # 100ms for testing
        success_threshold=2,
    )
    breaker = CircuitBreaker(name="test", config=config)

    # Open the circuit
    for _ in range(3):
        breaker.record_failure()
    assert breaker.state == CircuitState.OPEN

    # Wait for recovery timeout
    await asyncio.sleep(0.15)

    # Trigger transition to half-open
    assert breaker.allow_request() is True
    assert breaker.state == CircuitState.HALF_OPEN

    # Record successes to close
    breaker.record_success()
    breaker.record_success()
    assert breaker.state == CircuitState.CLOSED
Related Documentation¶
- Retry Handler - Works with circuit breakers for retry logic
- Graceful Degradation - Fallback strategies when circuit is open
- Health Monitoring - Service health checks that feed circuit breaker state
- Dead Letter Queue - Failed messages when circuit is open
Source: NEM-3458 - Circuit Breaker Documentation, NEM-4119 - Circuit Breaker Pattern Documentation