Redis Data Structures¶
Queue formats, batch state, pub/sub keys, and caching patterns.
Overview¶
Redis provides ephemeral storage for the AI pipeline, handling queues, pub/sub messaging, batch state management, and deduplication caching. All Redis data is reconstructable from PostgreSQL, allowing recovery after Redis restarts.
Source: backend/core/redis.py:1-2566
Connection Pools¶
The Redis client supports dedicated connection pools for different workload types (backend/core/redis.py:55-71):
| Pool Type | Purpose | Typical Config |
|---|---|---|
| CACHE | Get/set/delete operations | Fast, high availability |
| QUEUE | BLPOP/RPUSH operations (can block) | Tolerates blocking |
| PUBSUB | Long-lived subscription connections | Dedicated connection |
| RATELIMIT | High-frequency rate limiting | Fast, high frequency |
| DEFAULT | Fallback when dedicated pools are disabled | General purpose |
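The pool split can be modeled as a type-to-settings lookup. The sketch below is illustrative only: the enum values and tuning numbers are assumptions, not the actual configuration in backend/core/redis.py.

```python
from enum import Enum


class PoolType(Enum):
    """Workload-specific pool types, mirroring the table above."""
    CACHE = "cache"
    QUEUE = "queue"
    PUBSUB = "pubsub"
    RATELIMIT = "ratelimit"
    DEFAULT = "default"


# Hypothetical per-pool tuning; real values live in backend/core/redis.py.
POOL_SETTINGS: dict[PoolType, dict] = {
    PoolType.CACHE: {"max_connections": 20, "socket_timeout": 1.0},
    PoolType.QUEUE: {"max_connections": 10, "socket_timeout": None},   # BLPOP may block
    PoolType.PUBSUB: {"max_connections": 5, "socket_timeout": None},   # long-lived
    PoolType.RATELIMIT: {"max_connections": 20, "socket_timeout": 0.5},
    PoolType.DEFAULT: {"max_connections": 10, "socket_timeout": 2.0},
}


def settings_for(pool: PoolType) -> dict:
    """Return connection-pool settings, falling back to DEFAULT."""
    return POOL_SETTINGS.get(pool, POOL_SETTINGS[PoolType.DEFAULT])
```

Queue and pub/sub pools deliberately disable the socket timeout so blocking reads are not cut short.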
Processing Queues¶
Detection Queue¶
Key: detection_queue
Type: Redis List (FIFO)
Purpose: Holds image paths waiting for YOLO26 object detection.
Operations:
- Producer: FileWatcher (RPUSH)
- Consumer: DetectionQueueWorker (BLPOP)
Item Schema:
{
"camera_id": "front_door",
"file_path": "/export/foscam/Front Door/image_001.jpg",
"timestamp": "2026-01-24T10:30:00.000000"
}
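A producer entry matching this schema can be built and enqueued roughly as follows. This is a sketch: the helper names are illustrative, and `client` stands for any async Redis client; the real producer is FileWatcher.

```python
import json
from datetime import datetime


def build_detection_job(camera_id: str, file_path: str) -> str:
    """Serialize a detection job matching the item schema above."""
    return json.dumps({
        "camera_id": camera_id,
        "file_path": file_path,
        "timestamp": datetime.now().isoformat(),
    })


async def enqueue_detection(client, camera_id: str, file_path: str) -> None:
    """Producer side: RPUSH the serialized job onto detection_queue."""
    await client.rpush("detection_queue", build_detection_job(camera_id, file_path))
```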
Configuration:
| Setting | Default | Description |
|---|---|---|
| queue_max_size | 10000 | Maximum items before overflow |
| queue_overflow_policy | reject | Overflow handling policy |
| queue_backpressure_threshold | 0.8 | Warning threshold (80%) |
Analysis Queue¶
Key: analysis_queue
Type: Redis List (FIFO)
Purpose: Holds completed batches waiting for Nemotron LLM risk analysis.
Operations:
- Producer: BatchAggregator (RPUSH)
- Consumer: AnalysisQueueWorker (BLPOP)
Item Schema:
{
"batch_id": "abc123-def456",
"camera_id": "front_door",
"detection_ids": [1, 2, 3],
"started_at": "2026-01-24T10:30:00.000000",
"ended_at": "2026-01-24T10:31:30.000000"
}
Dead Letter Queues¶
Failed items are moved to dead letter queues for manual review and reprocessing.
Detection DLQ¶
Key: dlq:detection_queue
Type: Redis List
Purpose: Stores detection jobs that failed after max retries.
Analysis DLQ¶
Key: dlq:analysis_queue
Type: Redis List
Purpose: Stores analysis jobs that failed after max retries.
DLQ Item Schema¶
{
"original_job": {
"camera_id": "front_door",
"file_path": "/export/foscam/Front Door/image_001.jpg",
"timestamp": "2026-01-24T10:30:00.000000"
},
"error": "Connection refused: detector service unavailable",
"attempt_count": 3,
"first_failed_at": "2026-01-24T10:30:05.000000",
"last_failed_at": "2026-01-24T10:30:15.000000",
"queue_name": "detection_queue"
}
Overflow DLQ¶
Key: dlq:overflow:{queue_name}
Type: Redis List
Purpose: Stores items evicted from main queues on overflow when the dlq policy is active.
Source: backend/core/redis.py:794-836
Queue Overflow Policies¶
Source: backend/core/redis.py:74-79
| Policy | Behavior |
|---|---|
| REJECT | Return error when queue full; item NOT added |
| DLQ | Move oldest items to DLQ before adding new item |
| DROP_OLDEST | Log warning and trim oldest items (legacy) |
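The decision each policy makes can be expressed as a small pure function. This is a sketch of the policy semantics only, not the implementation in backend/core/redis.py; the returned action strings are invented for illustration.

```python
from enum import Enum


class OverflowPolicy(Enum):
    REJECT = "reject"
    DLQ = "dlq"
    DROP_OLDEST = "drop_oldest"


def overflow_action(policy: OverflowPolicy, queue_length: int, max_size: int) -> str:
    """Decide what happens to an incoming item given the current queue length."""
    if queue_length < max_size:
        return "add"
    if policy is OverflowPolicy.REJECT:
        return "reject"  # item NOT added; caller sees an error
    if policy is OverflowPolicy.DLQ:
        return "move_oldest_to_dlq_then_add"
    return "trim_oldest_then_add"  # DROP_OLDEST (legacy)
```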
Safe Queue Add¶
The add_to_queue_safe() method (backend/core/redis.py:701-875) provides backpressure handling:
result = await redis.add_to_queue_safe(
queue_name="detection_queue",
data={"camera_id": "front_door", "file_path": "/path/to/image.jpg"},
max_size=10000,
overflow_policy="dlq",
)
# result.success, result.queue_length, result.moved_to_dlq_count
Batch Aggregation State¶
Batch state is stored in Redis with TTL protection to prevent orphaned batches.
Key Patterns¶
| Key Pattern | Type | TTL | Description |
|---|---|---|---|
| batch:{camera_id}:current | String | 1 hr | Current batch ID for camera |
| batch:{batch_id}:camera_id | String | 1 hr | Camera ID for batch |
| batch:{batch_id}:detections | String | 1 hr | JSON list of detection IDs |
| batch:{batch_id}:started_at | String | 1 hr | Batch start timestamp (float) |
| batch:{batch_id}:last_activity | String | 1 hr | Last activity timestamp (float) |
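The key patterns above can be captured in a small helper, with every write refreshing the 1-hour TTL so an abandoned batch eventually expires on its own. A sketch, assuming `client` is any async Redis client; the function names are illustrative.

```python
def batch_keys(batch_id: str, camera_id: str) -> dict[str, str]:
    """Build the batch state keys from the table above."""
    return {
        "current": f"batch:{camera_id}:current",
        "camera_id": f"batch:{batch_id}:camera_id",
        "detections": f"batch:{batch_id}:detections",
        "started_at": f"batch:{batch_id}:started_at",
        "last_activity": f"batch:{batch_id}:last_activity",
    }


BATCH_TTL_SECONDS = 3600  # 1 hr, per the table


async def touch_batch(client, batch_id: str, camera_id: str, now: float) -> None:
    """Refresh last_activity and its TTL so an active batch is never orphaned."""
    key = batch_keys(batch_id, camera_id)["last_activity"]
    await client.set(key, str(now), ex=BATCH_TTL_SECONDS)
```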
Batch Lifecycle¶
stateDiagram-v2
[*] --> NoBatch: Camera has no active batch
NoBatch --> Active: First detection arrives
Active --> Active: Add detection (update last_activity)
Active --> Closed: Window timeout (90s) OR idle timeout (30s)
Closed --> Queued: Push to analysis_queue
Queued --> [*]: Cleanup Redis keys
note right of Active: TTL protects against orphaned batches
Deduplication Cache¶
Prevents duplicate processing of the same image content.
Key Pattern¶
Key: dedupe:{sha256_hash}
Type: String
Value: File path
TTL: 5 minutes (configurable via DEDUPE_TTL_SECONDS)
Usage¶
# Check if image was already processed
hash_key = f"dedupe:{sha256_hash}"
existing = await redis.get(hash_key)
if existing:
# Skip duplicate
return
# Mark as processed
await redis.set(hash_key, file_path, expire=300)
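Deriving the key itself is a plain SHA-256 over the image content; a minimal sketch:

```python
import hashlib


def dedupe_key(image_bytes: bytes) -> str:
    """Derive the dedupe cache key from raw image content."""
    return f"dedupe:{hashlib.sha256(image_bytes).hexdigest()}"
```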
Pub/Sub Channels¶
Redis pub/sub provides real-time event broadcasting to WebSocket clients.
Security Events Channel¶
Channel: security_events
Purpose: Broadcasts new security events after LLM analysis completes.
Message Schema:
{
"type": "event",
"data": {
"id": 1,
"event_id": 1,
"batch_id": "abc123",
"camera_id": "front_door",
"risk_score": 75,
"risk_level": "high",
"summary": "Person detected at front door",
"started_at": "2026-01-24T12:00:00.000000",
"detection_count": 3
}
}
System Status Channel¶
Channel: system_status
Purpose: Broadcasts periodic system health and status updates.
Message Schema:
{
"type": "status",
"data": {
"timestamp": "2026-01-24T12:00:00.000000",
"queue_depths": {
"detection_queue": 5,
"analysis_queue": 2
},
"gpu_utilization": 45.2,
"cameras_online": 4
}
}
Subscribing to Channels¶
Source: backend/core/redis.py:1101-1138
# Shared PubSub (for simple cases)
pubsub = await redis.subscribe("security_events")
# Dedicated PubSub (for long-lived listeners)
pubsub = await redis.subscribe_dedicated("security_events", "system_status")
try:
async for message in redis.listen(pubsub):
handle_message(message)
finally:
await pubsub.unsubscribe("security_events", "system_status")
await pubsub.close()
Compression¶
Large queue payloads are automatically compressed using Zstd (backend/core/redis.py:118-191).
Compression Format¶
- Prefix: Z: (identifies compressed data)
- Encoding: Base64-encoded Zstd compressed data
- Threshold: Configurable (default: payloads > 1KB)
- Threshold: Configurable (default: payloads > 1KB)
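The write path is the inverse of the decompression logic in the next section: compress, base64-encode, prepend the prefix, and leave small payloads untouched. The sketch below uses stdlib zlib as a stand-in for Zstd so it is self-contained; the framing (prefix plus base64) is the same, only the codec differs.

```python
import base64
import zlib

COMPRESSION_PREFIX = "Z:"
COMPRESSION_THRESHOLD = 1024  # bytes; configurable in the real client


def compress_payload(data: str) -> str:
    """Compress large payloads; small ones pass through unchanged."""
    raw = data.encode("utf-8")
    if len(raw) <= COMPRESSION_THRESHOLD:
        return data
    compressed = zlib.compress(raw)
    return COMPRESSION_PREFIX + base64.b64encode(compressed).decode("ascii")


def decompress_payload(data: str) -> str:
    """Inverse: strip prefix -> base64 decode -> decompress; else pass through."""
    if data.startswith(COMPRESSION_PREFIX):
        compressed = base64.b64decode(data[len(COMPRESSION_PREFIX):])
        return zlib.decompress(compressed).decode("utf-8")
    return data
```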
Backward Compatibility¶
The decompression logic (_decompress_payload) automatically handles both compressed and uncompressed data:
def _decompress_payload(self, data: str) -> str:
if data.startswith(self.COMPRESSION_PREFIX):
# Decompress: strip prefix -> base64 decode -> zstd decompress
compressed_b64 = data[len(self.COMPRESSION_PREFIX):]
compressed = base64.b64decode(compressed_b64)
return zstd.decompress(compressed).decode("utf-8")
return data # Return uncompressed data as-is
Rate Limiting Keys¶
Pattern¶
Key: ratelimit:{endpoint}:{client_ip}
Type: Sorted Set (with timestamps as scores)
Purpose: Track request counts for rate limiting.
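The sorted-set pattern works as a sliding window: each request is ZADDed with its timestamp as the score, stale entries are evicted with ZREMRANGEBYSCORE, and the remaining cardinality is compared to the limit. The in-memory model below mirrors that logic with a sorted list so the mechanics are visible without a Redis server; it is a sketch, not the project's limiter.

```python
import bisect


class SlidingWindow:
    """In-memory model of the Redis sorted-set rate-limit pattern."""

    def __init__(self, limit: int, window_seconds: float) -> None:
        self.limit = limit
        self.window = window_seconds
        self._hits: list[float] = []  # sorted timestamps (the "scores")

    def allow(self, now: float) -> bool:
        # Evict entries older than the window
        # (ZREMRANGEBYSCORE key -inf now-window)
        cutoff = now - self.window
        self._hits = self._hits[bisect.bisect_right(self._hits, cutoff):]
        if len(self._hits) >= self.limit:  # ZCARD >= limit -> reject
            return False
        bisect.insort(self._hits, now)  # ZADD key {request_id: now}
        return True
```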
Cache Patterns¶
Generic Cache Operations¶
Source: backend/core/redis.py:1166-1227
# Set with expiration
await redis.set("cache:key", {"data": "value"}, expire=3600)
# Set only if not exists (NX)
success = await redis.set("lock:key", "1", expire=30, nx=True)
# Get (auto-deserializes JSON)
value = await redis.get("cache:key")
# Delete
await redis.delete("cache:key")
# Check existence
exists = await redis.exists("cache:key")
Sorted Set Operations¶
Sorted sets are used for priority queues and time-based data.
Source: backend/core/redis.py:1243-1391
# Add with score (timestamp)
await redis.zadd("priority_queue", {"item_id": timestamp})
# Pop highest score (most recent)
items = await redis.zpopmax("priority_queue", count=1)
# Get by score range
items = await redis.zrangebyscore("priority_queue", min_score, max_score)
# Remove old items
removed = await redis.zremrangebyscore("priority_queue", "-inf", cutoff_time)
Retry Logic¶
All Redis operations support automatic retry with exponential backoff.
Source: backend/core/redis.py:336-416
# Manual retry wrapper
result = await redis.with_retry(
operation=lambda: redis.get("key"),
operation_name="get_key",
max_retries=3,
)
# Built-in retry methods
value = await redis.get_with_retry("key")
await redis.set_with_retry("key", value, expire=3600)
item = await redis.get_from_queue_with_retry("queue_name", timeout=5)
Backoff Calculation¶
# backend/core/redis.py:336-349
delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
jitter = delay * random.uniform(0, 0.25) # 0-25% jitter
total_delay = delay + jitter
| Attempt | Exponential Delay | Cap (max_delay) | Delay with Jitter |
|---|---|---|---|
| 1 | 1.0s | 30s | 1.0-1.25s |
| 2 | 2.0s | 30s | 2.0-2.5s |
| 3 | 4.0s | 30s | 4.0-5.0s |
| 4 | 8.0s | 30s | 8.0-10.0s |
| 5 | 16.0s | 30s | 16.0-20.0s |
| 6 | 30.0s | 30s | 30.0-37.5s |
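The schedule in the table can be reproduced with a short helper implementing the same formula (names are illustrative):

```python
import random


def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 30.0) -> float:
    """Exponential backoff with 0-25% jitter, per the formula above."""
    delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
    return delay + delay * random.uniform(0, 0.25)
```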
Queue Pressure Monitoring¶
Source: backend/core/redis.py:881-925
metrics = await redis.get_queue_pressure(
queue_name="detection_queue",
max_size=10000,
timeout=5.0, # Prevent hangs
)
# metrics.current_length
# metrics.max_size
# metrics.fill_ratio
# metrics.is_at_pressure_threshold (>80%)
# metrics.is_full
# metrics.overflow_policy
Key Naming Conventions¶
| Pattern | Example | Purpose |
|---|---|---|
| {queue_name} | detection_queue | Main processing queue |
| dlq:{queue_name} | dlq:detection_queue | Failed item storage |
| dlq:overflow:{queue_name} | dlq:overflow:detection_queue | Overflow item storage |
| batch:{camera_id}:current | batch:front_door:current | Current batch ID |
| batch:{batch_id}:{field} | batch:abc123:detections | Batch state field |
| dedupe:{hash} | dedupe:a1b2c3d4... | Deduplication cache |
| cache:{namespace}:{key} | cache:camera:front_door | Generic cache entry |
| ratelimit:{endpoint}:{ip} | ratelimit:api:192.168.1.1 | Rate limit tracking |
Health Check¶
Source: backend/core/redis.py:677-697
health = await redis.health_check()
# {
# "status": "healthy",
# "connected": True,
# "redis_version": "7.2.0"
# }
Connection Lifecycle¶
Initialization¶
# backend/core/redis.py:2513-2537
from backend.core.redis import init_redis
redis = await init_redis() # Creates singleton, connects with retry
Shutdown¶
# backend/core/redis.py:2558-2566
from backend.core.redis import close_redis
await close_redis() # Disconnects and cleans up
FastAPI Dependency¶
from backend.core.redis import get_redis, get_redis_optional
@app.get("/api/events")
async def get_events(redis: RedisClient = Depends(get_redis)):
# redis is guaranteed to be connected
...
@app.get("/api/health")
async def health(redis: RedisClient | None = Depends(get_redis_optional)):
# redis may be None if unavailable
...