Dead-Letter Queue (DLQ) Management¶
The dead-letter queue system captures failed jobs that have exhausted all retry attempts, providing inspection, reprocessing, and management capabilities.
Source: backend/services/retry_handler.py, backend/api/routes/dlq.py, backend/core/constants.py
Overview¶
Jobs that fail after all retry attempts are moved to dead-letter queues for:
- Inspection: Examine failure reasons with enriched error context
- Debugging: Access stack traces, HTTP statuses, and system state
- Recovery: Requeue jobs for reprocessing after fixes
- Cleanup: Clear queues when issues are unrecoverable
DLQ Architecture¶
%%{init: {
'theme': 'dark',
'themeVariables': {
'primaryColor': '#3B82F6',
'primaryTextColor': '#FFFFFF',
'primaryBorderColor': '#60A5FA',
'secondaryColor': '#A855F7',
'tertiaryColor': '#009688',
'background': '#121212',
'mainBkg': '#1a1a2e',
'lineColor': '#666666'
}
}}%%
flowchart TB
subgraph Processing["Processing Queues"]
DQ[detection_queue]
AQ[analysis_queue]
end
subgraph DLQs["Dead-Letter Queues"]
DLQ1[dlq:detection_queue]
DLQ2[dlq:analysis_queue]
end
DQ -->|fail| DLQ1
AQ -->|fail| DLQ2
DLQ1 --> API
DLQ2 --> API
subgraph API["DLQ Management API<br/>/api/dlq/*"]
STATS[Stats]
LIST[List]
REQUEUE[Requeue]
CLEAR[Clear]
REQUEUE_ALL[Requeue-all]
end
style DLQ1 fill:#E74856,color:#fff
style DLQ2 fill:#E74856,color:#fff
style API fill:#3B82F6,color:#fff Queue Names¶
Defined in backend/core/constants.py:146-178:
# backend/core/constants.py:146-178
# Main processing queues
DETECTION_QUEUE = "detection_queue"
ANALYSIS_QUEUE = "analysis_queue"
# Dead-Letter Queue names
DLQ_PREFIX = "dlq:"
DLQ_DETECTION_QUEUE = f"{DLQ_PREFIX}{DETECTION_QUEUE}" # "dlq:detection_queue"
DLQ_ANALYSIS_QUEUE = f"{DLQ_PREFIX}{ANALYSIS_QUEUE}" # "dlq:analysis_queue"
DLQ_OVERFLOW_PREFIX = "dlq:overflow:"
Helper Functions¶
# backend/core/constants.py:181-200
def get_dlq_name(queue_name: str) -> str:
"""Get the DLQ name for a given queue."""
if queue_name.startswith(DLQ_PREFIX):
return queue_name
return f"{DLQ_PREFIX}{queue_name}"
def get_dlq_overflow_name(queue_name: str) -> str:
"""Get the overflow DLQ name for a given queue."""
clean_name = queue_name.removeprefix(DLQ_PREFIX).removeprefix("overflow:")
return f"{DLQ_OVERFLOW_PREFIX}{clean_name}"
Job Format in DLQ¶
The JobFailure dataclass (backend/services/retry_handler.py:99-161) defines the DLQ job structure:
# backend/services/retry_handler.py:99-161
@dataclass(slots=True)
class JobFailure:
"""Record of a failed job with enriched error context."""
original_job: dict[str, Any] # Original job payload
error: str # Last error message
attempt_count: int # Number of attempts made
first_failed_at: str # ISO timestamp of first failure
last_failed_at: str # ISO timestamp of last failure
queue_name: str # Original queue name
# Error context enrichment fields (NEM-1474)
error_type: str | None = None # Exception class name
stack_trace: str | None = None # Truncated stack trace (max 4096 chars)
http_status: int | None = None # HTTP status code (for network errors)
response_body: str | None = None # Truncated AI service response (max 2048 chars)
retry_delays: list[float] | None = None # Delays applied between retries
context: dict[str, Any] | None = None # System state snapshot
Example DLQ Job¶
{
"original_job": {
"camera_id": "front_door",
"file_path": "/export/foscam/front_door/image_001.jpg",
"timestamp": "2024-01-15T10:30:00.000000"
},
"error": "Connection refused: YOLO26 service unavailable",
"attempt_count": 3,
"first_failed_at": "2024-01-15T10:30:01.000000",
"last_failed_at": "2024-01-15T10:30:15.000000",
"queue_name": "detection_queue",
"error_type": "ConnectionRefusedError",
"stack_trace": "Traceback (most recent call last):\n File ...",
"http_status": null,
"response_body": null,
"retry_delays": [1.0, 2.1, 4.3],
"context": {
"detection_queue_depth": 15,
"analysis_queue_depth": 3,
"dlq_circuit_breaker_state": "closed"
}
}
DLQ Statistics¶
The DLQStats dataclass (backend/services/retry_handler.py:175-181):
# backend/services/retry_handler.py:175-181
@dataclass(slots=True)
class DLQStats:
"""Statistics about dead-letter queues."""
detection_queue_count: int = 0
analysis_queue_count: int = 0
total_count: int = 0
REST API Endpoints¶
The DLQ API is defined in backend/api/routes/dlq.py:1-391.
GET /api/dlq/stats¶
Get DLQ statistics (backend/api/routes/dlq.py:107-131):
GET /api/dlq/jobs/{queue_name}¶
List jobs in a DLQ with pagination (backend/api/routes/dlq.py:134-206):
Parameters:
queue_name:dlq:detection_queueordlq:analysis_queuestart: Start index (default: 0)limit: Max jobs to return (default: 100, max: 1000)
Response:
{
"queue_name": "dlq:detection_queue",
"items": [
{
"original_job": {...},
"error": "...",
"attempt_count": 3,
"first_failed_at": "...",
"last_failed_at": "...",
"queue_name": "detection_queue",
"error_type": "ConnectionError",
"stack_trace": "...",
"http_status": null,
"response_body": null,
"retry_delays": [1.0, 2.0, 4.0],
"context": {...}
}
],
"pagination": {
"total": 5,
"limit": 100,
"offset": 0,
"has_more": false
}
}
POST /api/dlq/requeue/{queue_name}¶
Requeue oldest job from DLQ (backend/api/routes/dlq.py:209-259):
Requires: API key authentication (when enabled)
Response:
{
"success": true,
"message": "Job requeued from dlq:detection_queue to detection_queue",
"job": null
}
POST /api/dlq/requeue-all/{queue_name}¶
Requeue all jobs from DLQ (backend/api/routes/dlq.py:262-337):
Requires: API key authentication (when enabled)
Parameters:
- Limited to
settings.max_requeue_iterationsto prevent resource exhaustion
Response:
{
"success": true,
"message": "Requeued 5 jobs from dlq:detection_queue to detection_queue",
"job": null
}
DELETE /api/dlq/{queue_name}¶
Clear all jobs from DLQ (backend/api/routes/dlq.py:340-391):
Requires: API key authentication (when enabled)
Response:
{
"success": true,
"message": "Cleared 5 jobs from dlq:detection_queue",
"queue_name": "dlq:detection_queue"
}
API Key Authentication¶
Destructive operations require API key authentication (backend/api/routes/dlq.py:41-85):
# backend/api/routes/dlq.py:41-85
async def verify_api_key(
x_api_key: str | None = Header(None, alias="X-API-Key"),
api_key: str | None = None,
) -> None:
"""Verify API key for destructive DLQ operations."""
settings = get_settings()
# Skip authentication if disabled
if not settings.api_key_enabled:
return
# Use header first, fall back to query param
key = x_api_key or api_key
if not key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API key required.",
)
DLQ Circuit Breaker¶
When the DLQ becomes unavailable, a circuit breaker prevents cascading failures (backend/services/retry_handler.py:24-28):
%%{init: {
'theme': 'dark',
'themeVariables': {
'primaryColor': '#3B82F6',
'primaryTextColor': '#FFFFFF',
'primaryBorderColor': '#60A5FA',
'secondaryColor': '#A855F7',
'tertiaryColor': '#009688',
'background': '#121212',
'mainBkg': '#1a1a2e',
'lineColor': '#666666'
}
}}%%
stateDiagram-v2
[*] --> CLOSED
CLOSED --> OPEN: DLQ write failures
OPEN --> HALF_OPEN: recovery_timeout elapsed
HALF_OPEN --> CLOSED: write succeeds
HALF_OPEN --> OPEN: write fails
CLOSED: Normal Operation
CLOSED: DLQ writes proceed
OPEN: DLQ Failing
OPEN: Writes skipped to prevent
OPEN: resource exhaustion
HALF_OPEN: Testing Recovery
HALF_OPEN: Limited writes allowed Data Loss Risk¶
When the DLQ circuit breaker opens, jobs cannot be preserved (backend/services/retry_handler.py:509-531):
# backend/services/retry_handler.py:509-531
if not await self._dlq_circuit_breaker.allow_call():
logger.error(
f"CRITICAL DATA LOSS: DLQ circuit breaker is open, "
f"job from {queue_name} will be PERMANENTLY LOST.",
extra={"data_loss": True, "lost_job_data": job_data},
)
return False
Programmatic DLQ Management¶
Using RetryHandler¶
from backend.services.retry_handler import get_retry_handler
handler = get_retry_handler(redis_client)
# Get statistics
stats = await handler.get_dlq_stats()
print(f"Total DLQ jobs: {stats.total_count}")
# List jobs (without removing)
jobs = await handler.get_dlq_jobs("dlq:detection_queue", start=0, end=9)
for job in jobs:
print(f"Failed at: {job.last_failed_at}, Error: {job.error}")
# Requeue single job
original_job = await handler.requeue_dlq_job("dlq:detection_queue")
# Move to specific queue
await handler.move_dlq_job_to_queue(
dlq_name="dlq:detection_queue",
target_queue="detection_queue",
)
# Clear entire DLQ
await handler.clear_dlq("dlq:detection_queue")
DLQ Circuit Breaker Management¶
# Check circuit breaker status
status = handler.get_dlq_circuit_breaker_status()
print(f"State: {status['state']}, Failures: {status['failure_count']}")
# Check if DLQ writes are blocked
if handler.is_dlq_circuit_open():
print("WARNING: DLQ writes are blocked!")
# Reset after fixing underlying issue
handler.reset_dlq_circuit_breaker()
Reprocessing Patterns¶
Pattern 1: Selective Requeue¶
# Inspect jobs first
jobs = await handler.get_dlq_jobs("dlq:detection_queue")
for job in jobs:
if "temporary" in job.error.lower():
# Transient error - safe to requeue
await handler.move_dlq_job_to_queue(
"dlq:detection_queue",
"detection_queue"
)
else:
# Permanent error - log and skip
logger.warning(f"Skipping unrecoverable job: {job.error}")
Pattern 2: Batch Requeue After Fix¶
# After fixing YOLO26 service:
curl -X POST http://localhost:8000/api/dlq/requeue-all/dlq:detection_queue \
-H "X-API-Key: your-api-key"
Pattern 3: Manual Inspection and Decision¶
# List all failed jobs
curl http://localhost:8000/api/dlq/jobs/dlq:detection_queue
# Review errors, then either:
# 1. Requeue: POST /api/dlq/requeue/dlq:detection_queue
# 2. Clear: DELETE /api/dlq/dlq:detection_queue
Monitoring Recommendations¶
- Alert on DLQ depth: Set threshold (e.g., > 10 jobs)
- Alert on circuit breaker open: Immediate data loss risk
- Regular DLQ review: Schedule periodic inspection
- Track error patterns: Monitor
error_typedistribution
Prometheus Queries¶
# DLQ depth (requires custom metric)
hsi_dlq_depth{queue="detection_queue"}
# Circuit breaker state
hsi_circuit_breaker_state{service="dlq_overflow"}
Best Practices¶
- Don't ignore DLQs: Accumulated jobs indicate systematic issues
- Review before clearing: Understand failures before discarding
- Fix root causes: Don't just requeue without addressing the error
- Test reprocessing: Verify jobs succeed before bulk requeue
- Monitor circuit breaker: Open circuit = data loss risk
Related Documentation¶
- Retry Handler - Retry logic that feeds DLQ
- Circuit Breaker - DLQ overflow protection
- Health Monitoring - Service health tracking
Source: NEM-3458 - Dead-Letter Queue Documentation