Graceful Degradation¶
The DegradationManager provides graceful degradation capabilities when dependent services become unavailable, allowing the system to continue operating in a degraded mode.
Source: backend/services/degradation_manager.py
Overview¶
The DegradationManager class (backend/services/degradation_manager.py:347-1169) manages:
- Service health state tracking
- Degradation mode transitions
- Fallback queue strategies (memory and disk)
- Automatic recovery detection
- Job queueing for later processing
Degradation Modes¶
Defined in backend/services/degradation_manager.py:68-74:
```python
# backend/services/degradation_manager.py:68-74
class DegradationMode(Enum):
    """System degradation modes."""

    NORMAL = "normal"
    DEGRADED = "degraded"
    MINIMAL = "minimal"
    OFFLINE = "offline"
```
Mode Descriptions¶
| Mode | Description | Available Features |
|---|---|---|
| NORMAL | All services healthy, full functionality | detection, analysis, events, media |
| DEGRADED | Some services unavailable, using fallbacks | events, media (read-only) |
| MINIMAL | Critical services down, basic functionality only | media |
| OFFLINE | All services down, queueing only | none (jobs queued) |
Configuration¶
The manager constructor (backend/services/degradation_manager.py:374-421):
```python
# backend/services/degradation_manager.py:374-421
def __init__(
    self,
    redis_client: Any | None = None,
    fallback_dir: str | None = None,
    failure_threshold: int = 3,
    recovery_threshold: int = 2,
    max_memory_queue_size: int = 1000,
    check_interval: float = 15.0,
    health_check_timeout: float = DEFAULT_HEALTH_CHECK_TIMEOUT,  # 10.0s
) -> None:
```
Configuration Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| redis_client | Any \| None | None | Redis client for job queueing |
| fallback_dir | str \| None | None | Directory for disk-based fallback queues (~/.cache/hsi_fallback when unset) |
| failure_threshold | int | 3 | Consecutive failures before marking a service unhealthy |
| recovery_threshold | int | 2 | Consecutive successes to confirm recovery |
| max_memory_queue_size | int | 1000 | Max jobs in the in-memory queue |
| check_interval | float | 15.0 | Seconds between health checks |
| health_check_timeout | float | 10.0 | Timeout (seconds) for individual health checks |
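Applications typically obtain the shared instance via get_degradation_manager() (see Global Manager Access below), but the constructor parameters map directly to the table above. A minimal construction sketch, where the redis object stands in for the application's existing Redis client wrapper and the fallback path is illustrative:

```python
from backend.services.degradation_manager import DegradationManager

manager = DegradationManager(
    redis_client=redis,              # assumed existing Redis client wrapper
    fallback_dir="/var/lib/hsi/fallback",
    failure_threshold=3,             # mark unhealthy after 3 consecutive failures
    recovery_threshold=2,            # confirm recovery after 2 consecutive successes
    max_memory_queue_size=1000,      # cap the in-memory fallback queue
    check_interval=15.0,             # run health checks every 15 seconds
    health_check_timeout=10.0,       # fail an individual check after 10 seconds
)
```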
Service Health Tracking¶
ServiceHealth Dataclass¶
Defined in backend/services/degradation_manager.py:85-119:
```python
# backend/services/degradation_manager.py:85-119
@dataclass(slots=True)
class ServiceHealth:
    """Health information for a monitored service."""

    name: str
    status: DegradationServiceStatus = DegradationServiceStatus.UNKNOWN
    last_check: float | None = None
    last_success: float | None = None
    consecutive_failures: int = 0
    error_message: str | None = None

    @property
    def is_healthy(self) -> bool:
        return self.status == DegradationServiceStatus.HEALTHY
```
DegradationServiceStatus Enum¶
Defined in backend/services/degradation_manager.py:77-82:
```python
# backend/services/degradation_manager.py:77-82
class DegradationServiceStatus(Enum):
    """Service health status for degradation manager monitoring."""

    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"
```
Registering Services¶
Register services for monitoring (backend/services/degradation_manager.py:438-456):
```python
# backend/services/degradation_manager.py:438-456
def register_service(
    self,
    name: str,
    health_check: Callable[[], Any],
    critical: bool = False,
) -> None:
    """Register a service for health monitoring."""
```
Usage:
```python
manager = get_degradation_manager()

# Register AI services
manager.register_service(
    name="yolo26",
    health_check=detector_client.health_check,
    critical=True,  # System cannot function without this
)
manager.register_service(
    name="nemotron",
    health_check=llm_client.health_check,
    critical=False,  # Can use fallback risk scoring
)
```
Mode Transitions¶
Transition Logic¶
The _evaluate_mode_transition() method (backend/services/degradation_manager.py:525-549):
```python
# backend/services/degradation_manager.py:525-549
async def _evaluate_mode_transition(self) -> None:
    """Evaluate and perform mode transitions based on service health."""
    critical_unhealthy = 0
    total_unhealthy = 0
    for service in self._services.values():
        if service.health.consecutive_failures >= self.failure_threshold:
            total_unhealthy += 1
            if service.critical:
                critical_unhealthy += 1

    # Determine new mode
    # (critical_services: the services registered with critical=True)
    if critical_unhealthy == 0 and total_unhealthy == 0:
        self._mode = DegradationMode.NORMAL
    elif critical_unhealthy == 0:
        self._mode = DegradationMode.DEGRADED
    elif critical_unhealthy < len(critical_services):
        self._mode = DegradationMode.MINIMAL
    else:
        self._mode = DegradationMode.OFFLINE
```
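To make the decision rules concrete, the same logic can be read as a small pure function. This is an illustrative restatement of the rules above, not the actual implementation, and the counts in the asserts are hypothetical:

```python
def decide_mode(unhealthy: int, critical_unhealthy: int, critical_total: int) -> str:
    """Illustrative restatement of the transition rules above."""
    if unhealthy == 0:
        return "normal"      # every service is healthy
    if critical_unhealthy == 0:
        return "degraded"    # only non-critical services are failing
    if critical_unhealthy < critical_total:
        return "minimal"     # some, but not all, critical services are down
    return "offline"         # every critical service is down


# With two critical services registered:
assert decide_mode(unhealthy=1, critical_unhealthy=0, critical_total=2) == "degraded"
assert decide_mode(unhealthy=2, critical_unhealthy=1, critical_total=2) == "minimal"
assert decide_mode(unhealthy=3, critical_unhealthy=2, critical_total=2) == "offline"
```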
State Transition Diagram¶
```mermaid
%%{init: {
  'theme': 'dark',
  'themeVariables': {
    'primaryColor': '#3B82F6',
    'primaryTextColor': '#FFFFFF',
    'primaryBorderColor': '#60A5FA',
    'secondaryColor': '#A855F7',
    'tertiaryColor': '#009688',
    'background': '#121212',
    'mainBkg': '#1a1a2e',
    'lineColor': '#666666'
  }
}}%%
stateDiagram-v2
    [*] --> NORMAL
    NORMAL --> DEGRADED: non-critical service fails
    DEGRADED --> MINIMAL: critical service fails
    MINIMAL --> OFFLINE: all critical services fail
    DEGRADED --> NORMAL: services recover
    MINIMAL --> DEGRADED: critical service recovers
    OFFLINE --> MINIMAL: some service recovers

    NORMAL: All Services OK
    NORMAL: Full functionality
    DEGRADED: Using Fallbacks
    DEGRADED: events, media (read-only)
    MINIMAL: Basic Only
    MINIMAL: media serving only
    OFFLINE: Queueing Only
    OFFLINE: Jobs queued for later
```
Fallback Queues¶
In-Memory Queue¶
When Redis is unavailable, jobs are queued in memory (backend/services/degradation_manager.py:663-701):
```python
# backend/services/degradation_manager.py:663-701
def _queue_to_memory(self, job: QueuedJob) -> bool:
    """Queue job to in-memory fallback queue."""
    try:
        queue_size_before = len(self._memory_queue)
        queue_at_capacity = queue_size_before >= self.max_memory_queue_size
        if queue_at_capacity:
            logger.warning(
                f"Memory queue overflow: dropping oldest job "
                f"(queue_size={queue_size_before}, max_size={self.max_memory_queue_size})"
            )
        # Appending at capacity evicts the oldest entry from the bounded queue
        self._memory_queue.append(job)
        return True
    except Exception as e:
        logger.error(f"Failed to queue to memory: {e}")
        return False
```
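The drop-oldest behaviour referenced in the warning is what a bounded collections.deque provides out of the box. A minimal sketch, assuming the memory queue is a deque created with maxlen (the excerpt above does not show its construction):

```python
from collections import deque

# Hypothetical bounded queue, like the manager's in-memory fallback
memory_queue: deque[dict] = deque(maxlen=3)

for i in range(5):
    memory_queue.append({"job": i})

# The two oldest jobs were evicted to make room for the newest ones
assert list(memory_queue) == [{"job": 2}, {"job": 3}, {"job": 4}]
```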
Disk-Based Fallback Queue¶
The FallbackQueue class (backend/services/degradation_manager.py:180-344) provides persistent storage:
```python
# backend/services/degradation_manager.py:180-344
class FallbackQueue:
    """Disk-based fallback queue for when Redis is unavailable."""

    def __init__(
        self,
        queue_name: str,
        fallback_dir: str,
        max_size: int = 10000,
    ):
        self._queue_name = queue_name
        self._fallback_dir = Path(fallback_dir) / queue_name
        self._max_size = max_size
        self._fallback_dir.mkdir(parents=True, exist_ok=True)
```
Methods:
| Method | Line | Description |
|---|---|---|
| add(item) | 232-282 | Add item to disk queue |
| get() | 284-318 | Get oldest item (removes it from the queue) |
| peek(limit) | 320-344 | Peek at items without removing them |
| count() | 229-230 | Count items in the queue |
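A minimal usage sketch, assuming add() is awaited (as it is in queue_with_fallback below) and count() is synchronous (as it is in get_status()); the queue name, directory, and payload fields are illustrative:

```python
from backend.services.degradation_manager import FallbackQueue

queue = FallbackQueue(
    queue_name="detection_queue",
    fallback_dir="/var/lib/hsi/fallback",  # hypothetical path
    max_size=10000,
)

# Persist an item to disk while Redis is unavailable
await queue.add({"camera_id": "front_door", "frame_ts": 1700000000.0})

print(f"{queue.count()} item(s) waiting on disk")
```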
Queue with Fallback¶
The queue_with_fallback() method (backend/services/degradation_manager.py:727-781):
```python
# backend/services/degradation_manager.py:727-781
async def queue_with_fallback(
    self,
    queue_name: str,
    item: dict[str, Any],
) -> bool:
    """Queue an item with automatic fallback to disk.

    Attempts to queue to Redis first. If Redis is unavailable,
    falls back to disk-based queue.
    """
    # Try Redis first if available
    if self._redis_healthy and self._redis is not None:
        try:
            result = await self._redis.add_to_queue_safe(
                queue_name, item,
                overflow_policy=QueueOverflowPolicy.DLQ,
            )
            if result.success:
                return True
        except Exception:
            self._redis_healthy = False

    # Fallback to disk
    fallback = self._get_fallback_queue(queue_name)
    return await fallback.add(item)
```
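Callers do not need to know which backend accepted the item. A short usage sketch (the queue name and payload are hypothetical):

```python
accepted = await manager.queue_with_fallback(
    queue_name="detection_queue",
    item={"camera_id": "front_door", "image_path": "/data/frames/0001.jpg"},
)
if not accepted:
    logger.error("Item could not be queued to Redis or disk")
```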
Job Queueing¶
QueuedJob Dataclass¶
Defined in backend/services/degradation_manager.py:122-155:
```python
# backend/services/degradation_manager.py:122-155
@dataclass(slots=True)
class QueuedJob:
    """A job queued for later processing."""

    job_type: str
    data: dict[str, Any]
    queued_at: str
    retry_count: int = 0
```
Queueing for Later¶
The queue_job_for_later() method (backend/services/degradation_manager.py:607-661):
```python
# backend/services/degradation_manager.py:607-661
async def queue_job_for_later(
    self,
    job_type: str,
    data: dict[str, Any],
) -> bool:
    """Queue a job for later processing."""
    job = QueuedJob(
        job_type=job_type,
        data=data,
        queued_at=datetime.now(UTC).isoformat(),
        retry_count=0,
    )

    # Try Redis first
    if self._redis and self._redis_healthy:
        try:
            result = await self._redis.add_to_queue_safe(
                self.DEGRADED_QUEUE,
                job.to_dict(),
                overflow_policy=QueueOverflowPolicy.DLQ,
            )
            if result.success:
                return True
        except Exception:
            self._redis_healthy = False

    # Fall back to in-memory queue
    return self._queue_to_memory(job)
```
Processing Queued Jobs¶
The process_queued_jobs() method (backend/services/degradation_manager.py:873-946):
```python
# Example usage
processed = await manager.process_queued_jobs(
    job_type="detection",
    processor=process_detection,
    max_jobs=100,
)
print(f"Processed {processed} queued jobs")
```
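The processor is the callable that replays a queued job once the backing services are reachable again. Its exact signature is not shown in the excerpt above; a plausible sketch, assuming it receives the job's data dict and returns whether processing succeeded (detector_client.detect is a hypothetical call on the client from the registration example):

```python
async def process_detection(data: dict) -> bool:
    """Hypothetical processor: replay a queued detection job."""
    try:
        await detector_client.detect(data)  # assumed client method, for illustration
        return True
    except Exception:
        logger.exception("Replaying queued detection failed")
        return False
```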
Health Check Loop¶
The health check loop (backend/services/degradation_manager.py:1088-1112):
```python
# backend/services/degradation_manager.py:1088-1112
async def _health_check_loop(self) -> None:
    """Main health check loop."""
    while self._running:
        try:
            # Run health checks for all registered services
            await self.run_health_checks()

            # Check Redis health
            await self.check_redis_health()

            # Drain memory queue to Redis if available
            if self._redis_healthy:
                await self.drain_memory_queue_to_redis()

            await asyncio.sleep(self._check_interval)
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.error(f"Health check loop error: {e}")
            await asyncio.sleep(self._check_interval)
```
Health Check Timeout¶
Each health check has a configurable timeout (backend/services/degradation_manager.py:551-585):
```python
# backend/services/degradation_manager.py:551-585
async def run_health_checks(self) -> None:
    """Run health checks for all registered services."""
    for service in self._services.values():
        try:
            is_healthy = await asyncio.wait_for(
                service.health_check(),
                timeout=self._health_check_timeout,  # Default: 10.0s
            )
            await self.update_service_health(service.name, is_healthy=is_healthy)
        except TimeoutError:
            await self.update_service_health(
                service.name,
                is_healthy=False,
                error_message=f"Health check timed out after {self._health_check_timeout}s",
            )
```
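Because each check is wrapped in asyncio.wait_for, registered health checks should be awaitable and return a truthy value when the service is reachable. A minimal sketch of such a check, using a plain TCP reachability test against a hypothetical dependency (service name, host, and port are placeholders):

```python
import asyncio


async def vector_db_health_check() -> bool:
    """Return True if the (hypothetical) vector DB accepts a TCP connection."""
    try:
        _, writer = await asyncio.open_connection("localhost", 6333)
        writer.close()
        await writer.wait_closed()
        return True
    except OSError:
        return False


manager.register_service(
    name="vector_db",
    health_check=vector_db_health_check,
    critical=False,
)
```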
Recovery Handling¶
Draining Memory Queue¶
When Redis recovers, the in-memory queue is drained back to Redis (backend/services/degradation_manager.py:977-1024):
```python
# backend/services/degradation_manager.py:977-1024
async def drain_memory_queue_to_redis(self) -> int:
    """Drain in-memory queue to Redis when available."""
    if not self._redis or not self._redis_healthy:
        return 0

    drained = 0
    while self._memory_queue:
        job = self._memory_queue.popleft()
        try:
            result = await self._redis.add_to_queue_safe(
                self.DEGRADED_QUEUE,
                job.to_dict(),
                overflow_policy=QueueOverflowPolicy.DLQ,
            )
            if result.success:
                drained += 1
            else:
                self._memory_queue.appendleft(job)
                break
        except Exception:
            self._memory_queue.appendleft(job)
            break
    return drained
```
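The health check loop drains the memory queue automatically once Redis reports healthy again, but it can also be triggered explicitly, for example from an operational script:

```python
drained = await manager.drain_memory_queue_to_redis()
print(f"Moved {drained} in-memory jobs back to Redis")
```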
Draining Fallback Queue¶
Drain disk queue to Redis (backend/services/degradation_manager.py:783-846):
```python
# Example
drained = await manager.drain_fallback_queue("detection_queue")
print(f"Drained {drained} items from fallback to Redis")
```
Status and Monitoring¶
get_status()¶
Get overall status (backend/services/degradation_manager.py:1114-1133):
```python
# backend/services/degradation_manager.py:1114-1133
def get_status(self) -> dict[str, Any]:
    """Get overall degradation status."""
    return {
        "mode": self._mode.value,
        "is_degraded": self.is_degraded,
        "redis_healthy": self._redis_healthy,
        "memory_queue_size": len(self._memory_queue),
        "fallback_queues": {
            name: queue.count()
            for name, queue in self._fallback_queues.items()
        },
        "services": {
            name: service.health.to_dict()
            for name, service in self._services.items()
        },
        "available_features": self.get_available_features(),
        "health_check_timeout": self._health_check_timeout,
    }
```
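The returned dict is safe to log periodically or expose on a status endpoint. A short sketch using the keys shown above:

```python
status = manager.get_status()
if status["is_degraded"]:
    logger.warning(
        "Running in %s mode; %d jobs waiting in memory",
        status["mode"],
        status["memory_queue_size"],
    )
```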
Available Features by Mode¶
The get_available_features() method (backend/services/degradation_manager.py:1034-1049):
```python
# backend/services/degradation_manager.py:1034-1049
def get_available_features(self) -> list[str]:
    """Get list of available features based on current mode."""
    all_features = ["detection", "analysis", "events", "media"]

    if self._mode == DegradationMode.NORMAL:
        return all_features
    elif self._mode == DegradationMode.DEGRADED:
        return ["events", "media"]  # Read-only features
    elif self._mode == DegradationMode.MINIMAL:
        return ["media"]  # Basic media serving only
    else:
        return []
```
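Request handlers can use this list to gate functionality instead of inspecting the mode directly. A short sketch, where run_llm_analysis and the event object are hypothetical application code:

```python
if "analysis" in manager.get_available_features():
    await run_llm_analysis(event)  # hypothetical analysis call
else:
    await manager.queue_job_for_later("analysis", {"event_id": event.id})
```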
Global Manager Access¶
```python
# backend/services/degradation_manager.py:1140-1163
from backend.services.degradation_manager import get_degradation_manager

# Get or create global instance
manager = get_degradation_manager(
    redis_client=redis,
    fallback_dir="/var/lib/hsi/fallback",
)

# Start health monitoring
await manager.start()

# Use in request handlers
if manager.should_queue_job("detection"):
    await manager.queue_job_for_later("detection", job_data)
else:
    await process_detection(job_data)

# Stop gracefully
await manager.stop()
```
Best Practices¶
- Register critical services: Mark essential services as critical=True
- Set appropriate timeouts: Health check timeout should be less than check interval
- Monitor fallback queues: Large disk queues indicate prolonged outages
- Test fallback paths: Verify disk fallback works before production
- Plan for recovery: Have procedures to drain queues after recovery
Related Documentation¶
- Circuit Breaker - Integration with circuit breaker pattern
- Retry Handler - Retry logic coordination
- Health Monitoring - Automatic service recovery
Source: NEM-3458 - Graceful Degradation Documentation