Batch Aggregator¶
The BatchAggregator groups detections by camera into time-based batches before LLM analysis, optimizing GPU utilization and context coherence.
Overview¶
Source: backend/services/batch_aggregator.py
The aggregator provides:
- Time-window batching: Group detections within 90-second windows
- Idle timeout: Close batches after 30 seconds of inactivity
- Size limits: Prevent memory exhaustion from large batches
- Fast path: Bypass batching for high-confidence person detections
Class Definition¶
Constructor Parameters (Lines 135-161)¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| redis_client | RedisClient | None | Redis client for batch storage |
| analyzer | NemotronAnalyzer | None | For fast path analysis |
Configured Values (Lines 144-150)¶
settings = get_settings()
self._batch_window = settings.batch_window_seconds # 90s
self._idle_timeout = settings.batch_idle_timeout_seconds # 30s
self._fast_path_threshold = settings.fast_path_confidence_threshold # 0.9
self._fast_path_types = settings.fast_path_object_types # ["person"]
self._batch_max_detections = settings.batch_max_detections
Redis Key Structure¶
All batch keys have a 1-hour TTL for orphan cleanup (line 133).
Key Patterns:
| Key | Purpose |
|---|---|
| batch:{camera_id}:current | Current batch ID for camera |
| batch:{batch_id}:camera_id | Camera ID for batch |
| batch:{batch_id}:detections | Redis LIST of detection IDs |
| batch:{batch_id}:started_at | Batch start timestamp |
| batch:{batch_id}:last_activity | Last activity timestamp |
| batch:{batch_id}:pipeline_start_time | First detection timestamp (latency tracking) |
| batch:{batch_id}:closing | Closing flag to prevent races |
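The key layout in the table can be captured in a small helper. This is a minimal sketch for illustration only: `current_batch_key` and `batch_keys` are hypothetical names, not functions from batch_aggregator.py.

```python
# Suffixes of the per-batch keys, copied from the Key Patterns table.
BATCH_KEY_SUFFIXES = (
    "camera_id",
    "detections",
    "started_at",
    "last_activity",
    "pipeline_start_time",
    "closing",
)


def current_batch_key(camera_id: str) -> str:
    """Key holding the current batch ID for a camera (hypothetical helper)."""
    return f"batch:{camera_id}:current"


def batch_keys(batch_id: str) -> dict[str, str]:
    """Map each suffix to its full key for one batch (hypothetical helper)."""
    return {suffix: f"batch:{batch_id}:{suffix}" for suffix in BATCH_KEY_SUFFIXES}
```

Because batch IDs already carry a `batch-` prefix, a full key looks like `batch:batch-a1b2c3d4:detections`.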
Batch ID Generation¶
def generate_batch_id() -> str:  # Line 63
    """Generate a short, unique batch identifier."""
    return f"batch-{uuid.uuid4().hex[:8]}"
# Example: "batch-a1b2c3d4"
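Eight hex characters give a 32-bit ID space. The sketch below validates the ID format and estimates collision risk. The regex and the `collision_probability` helper are illustrative assumptions, not part of the source module. The estimate uses the standard birthday-bound approximation.

```python
import math
import re
import uuid

# Format implied by the generator: "batch-" followed by 8 lowercase hex digits.
BATCH_ID_PATTERN = re.compile(r"^batch-[0-9a-f]{8}$")


def generate_batch_id() -> str:
    """Same expression as the documented generator."""
    return f"batch-{uuid.uuid4().hex[:8]}"


def collision_probability(live_batches: int, bits: int = 32) -> float:
    """Approximate chance that any two of `live_batches` random IDs collide."""
    space = 2 ** bits
    return 1.0 - math.exp(-live_batches * (live_batches - 1) / (2 * space))
```

With the 1-hour key TTL bounding how many IDs are live at once, the estimated risk stays small for realistic camera counts, although it is not zero.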
Adding Detections¶
Source: Lines 393-547
async def add_detection(
    self,
    camera_id: str,
    detection_id: int | str,
    _file_path: str,
    confidence: float | None = None,
    object_type: str | None = None,
    pipeline_start_time: str | None = None,
) -> str:
    """Add detection to batch for camera."""
Fast Path Check (Lines 443-454)¶
# Check if detection meets fast path criteria
if self._should_use_fast_path(confidence, object_type):
    logger.info("Fast path triggered for detection")
    await self._process_fast_path(camera_id, detection_id_int)
    return f"fast_path_{detection_id_int}"
Batch Size Limit (Lines 468-491)¶
if batch_id:
    current_size = await self._redis._client.llen(detections_key)
    if current_size >= self._batch_max_detections:
        logger.info("Batch reached max size, closing")
        record_batch_max_reached(camera_id)
        # Close current batch and set batch_id to None
        await self._close_batch_for_size_limit(batch_id)
        batch_id = None
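The rollover behaviour in this excerpt can be modelled without Redis. The following is a toy sketch: `InMemoryBatcher` is a hypothetical class that stands in for the Redis-backed state. It mirrors the order of operations shown above: a full batch is closed first, then the incoming detection starts a new one.

```python
class InMemoryBatcher:
    """Toy model of the size-limit rollover; no Redis involved."""

    def __init__(self, max_detections: int = 50) -> None:
        self.max_detections = max_detections
        self.current: list[int] = []   # detections in the open batch
        self.closed: list[dict] = []   # summaries of closed batches

    def add(self, detection_id: int) -> None:
        # Close the full batch before accepting the new detection.
        if len(self.current) >= self.max_detections:
            self.closed.append(
                {"detection_ids": self.current, "reason": "max_size"}
            )
            self.current = []
        self.current.append(detection_id)
```

With `max_detections=3`, adding seven detections closes two batches of three and leaves one detection in the open batch.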
Atomic Batch Creation (Lines 333-391, 493-516)¶
if not batch_id:
    # Create new batch with human-readable ID
    batch_id = generate_batch_id()
    # Atomic transaction using Redis pipeline (MULTI/EXEC)
    await self._create_batch_metadata_atomic(
        batch_key=batch_key,
        batch_id=batch_id,
        camera_id=camera_id,
        current_time=current_time,
        ttl=ttl,
        pipeline_start_time=pipeline_start_time,
    )
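The body of `_create_batch_metadata_atomic` is not shown here. A minimal sketch of what such a transaction might look like follows, assuming the client exposes the redis-py asyncio pipeline interface (`pipeline(transaction=True)`, queued `set(..., ex=...)`, awaited `execute()`). The function name, parameter order, and the `RecordingClient` stand-in are hypothetical. The stand-in exists only so the sketch runs without a Redis server.

```python
import time
from typing import Optional


async def create_batch_metadata_atomic(
    client,
    camera_id: str,
    batch_id: str,
    ttl: int = 3600,
    pipeline_start_time: Optional[str] = None,
) -> None:
    """Queue all metadata writes, then send them as one MULTI/EXEC."""
    now = str(time.time())
    pipe = client.pipeline(transaction=True)
    pipe.set(f"batch:{camera_id}:current", batch_id, ex=ttl)
    pipe.set(f"batch:{batch_id}:camera_id", camera_id, ex=ttl)
    pipe.set(f"batch:{batch_id}:started_at", now, ex=ttl)
    pipe.set(f"batch:{batch_id}:last_activity", now, ex=ttl)
    if pipeline_start_time is not None:
        pipe.set(f"batch:{batch_id}:pipeline_start_time", pipeline_start_time, ex=ttl)
    await pipe.execute()


class RecordingPipeline:
    """Hypothetical stand-in for a redis-py pipeline (applies writes on execute)."""

    def __init__(self, store: dict) -> None:
        self._store = store
        self._queued: list[tuple[str, str]] = []

    def set(self, key: str, value: str, ex: Optional[int] = None) -> "RecordingPipeline":
        self._queued.append((key, value))
        return self

    async def execute(self) -> list[bool]:
        for key, value in self._queued:
            self._store[key] = value
        return [True] * len(self._queued)


class RecordingClient:
    """Hypothetical stand-in for the Redis client."""

    def __init__(self) -> None:
        self.store: dict[str, str] = {}

    def pipeline(self, transaction: bool = True) -> RecordingPipeline:
        return RecordingPipeline(self.store)
```

Writing every key in one transaction means a reader never sees a `current` pointer to a batch whose `started_at` or `camera_id` key is missing.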
Atomic List Append (Lines 518-535)¶
# Add detection using atomic RPUSH operation
detections_key = f"batch:{batch_id}:detections"
detection_count = await self._atomic_list_append(detections_key, detection_id_int, ttl)
# Update last activity timestamp
await self._redis.set(f"batch:{batch_id}:last_activity", str(current_time), expire=ttl)
Concurrency Control¶
Per-Camera Locks (Lines 154-158, 266-280)¶
# Per-camera locks to prevent race conditions
self._camera_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

async def _get_camera_lock(self, camera_id: str) -> asyncio.Lock:
    """Get or create a lock for the specified camera."""
    async with self._locks_lock:
        return self._camera_locks[camera_id]
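The effect of per-camera locking can be shown in isolation. In this sketch, `CameraLocks` is a hypothetical stand-in for the aggregator's lock registry, and `asyncio.sleep` stands in for a Redis round trip. Work for the same camera is serialized, while work for a different camera can proceed concurrently.

```python
import asyncio
from collections import defaultdict


class CameraLocks:
    """Hypothetical stand-in for the aggregator's per-camera lock registry."""

    def __init__(self) -> None:
        self._locks_lock = asyncio.Lock()  # guards the registry itself
        self._camera_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def get(self, camera_id: str) -> asyncio.Lock:
        async with self._locks_lock:
            return self._camera_locks[camera_id]


async def demo() -> list[str]:
    locks = CameraLocks()
    events: list[str] = []

    async def worker(camera_id: str, name: str) -> None:
        lock = await locks.get(camera_id)
        async with lock:
            events.append(f"{name}:start")
            await asyncio.sleep(0.01)  # simulated Redis round trip
            events.append(f"{name}:end")

    # "a" and "b" share a camera; "c" uses a different one.
    await asyncio.gather(
        worker("cam1", "a"),
        worker("cam1", "b"),
        worker("cam2", "c"),
    )
    return events
```

Running `asyncio.run(demo())` yields an event list in which the `a` and `b` sections never interleave.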
Global Batch Close Lock (Lines 158-159)¶
# Global lock for batch timeout checking and closing operations
self._batch_close_lock = asyncio.Lock()
Batch Timeout Checking¶
Source: Lines 549-703
The check_batch_timeouts() method uses Redis pipelining to efficiently check all batches:
async def check_batch_timeouts(self) -> list[str]:
    """Check all active batches for timeouts and close expired ones."""
    # Phase 1: Fetch all batch IDs using pipeline (lines 584-586)
    batch_id_pipe = redis_client.pipeline()
    for batch_key in batch_keys:
        batch_id_pipe.get(batch_key)
    batch_ids = await batch_id_pipe.execute()
    # Phase 2: Fetch all metadata in parallel (lines 613-617)
    metadata_pipe = redis_client.pipeline()
    for _batch_key, batch_id in valid_batches:
        metadata_pipe.get(f"batch:{batch_id}:started_at")
        metadata_pipe.get(f"batch:{batch_id}:last_activity")
    metadata_results = await metadata_pipe.execute()
    # Check timeouts (lines 658-673)
    window_elapsed = current_time - started_at
    idle_time = current_time - last_activity
    if window_elapsed >= self._batch_window:
        should_close = True
        close_reason = "batch window exceeded"
    elif idle_time >= self._idle_timeout:
        should_close = True
        close_reason = "idle timeout exceeded"
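The timeout decision at the end of this excerpt is a pure function of three timestamps. The sketch below isolates it. `close_reason_for` is a hypothetical helper name, and the defaults are the 90-second window and 30-second idle timeout stated in the overview.

```python
from typing import Optional


def close_reason_for(
    now: float,
    started_at: float,
    last_activity: float,
    batch_window: float = 90.0,
    idle_timeout: float = 30.0,
) -> Optional[str]:
    """Return why a batch should close, or None if it should stay open."""
    # The window check comes first, matching the if/elif order in the excerpt.
    if now - started_at >= batch_window:
        return "batch window exceeded"
    if now - last_activity >= idle_timeout:
        return "idle timeout exceeded"
    return None
```

A batch that started 50 seconds ago and last saw a detection 40 seconds ago closes on the idle timeout. One that started 100 seconds ago closes on the window, however recent its last detection was.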
Closing Batches¶
Source: Lines 705-896
async def close_batch(self, batch_id: str) -> dict[str, Any]:
    """Force close a batch and push to analysis queue."""
    # Acquire locks (lines 729-740)
    async with self._batch_close_lock:
        camera_id = await self._redis.get(f"batch:{batch_id}:camera_id")
        camera_lock = await self._get_camera_lock(camera_id)
        async with camera_lock:
            # Set closing flag with TTL (lines 743-747)
            await self._redis._client.set(
                f"batch:{batch_id}:closing",
                "1",
                ex=BATCH_CLOSING_FLAG_TTL_SECONDS,  # 5 minutes
            )
            # Fetch batch data in parallel using TaskGroup (lines 788-800)
            async with asyncio.TaskGroup() as tg:
                tg.create_task(fetch_detections())
                tg.create_task(fetch_started_at())
                tg.create_task(fetch_pipeline_time())
            # Push to analysis queue (lines 815-858)
            if detections:
                queue_item = {
                    "batch_id": batch_id,
                    "camera_id": camera_id,
                    "detection_ids": detections,
                    "timestamp": time.time(),
                    "pipeline_start_time": pipeline_start_time,
                }
                result = await self._redis.add_to_queue_safe(
                    self._analysis_queue,
                    queue_item,
                    overflow_policy=QueueOverflowPolicy.DLQ,
                )
            # Cleanup Redis keys (lines 874-882)
            await self._redis.delete(
                f"batch:{camera_id}:current",
                f"batch:{batch_id}:camera_id",
                f"batch:{batch_id}:detections",
                f"batch:{batch_id}:started_at",
                f"batch:{batch_id}:last_activity",
                f"batch:{batch_id}:pipeline_start_time",
                f"batch:{batch_id}:closing",
            )
Fast Path Processing¶
The fast path bypasses batching for critical detections (lines 1028-1082):
Criteria (Lines 1028-1048)¶
def _should_use_fast_path(self, confidence: float | None, object_type: str | None) -> bool:
    """Check if detection meets fast path criteria."""
    if confidence is None or object_type is None:
        return False
    if confidence < self._fast_path_threshold:  # 0.9 default
        return False
    return object_type.lower() in [t.lower() for t in self._fast_path_types]
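The same criteria can be exercised as a standalone function. This sketch lifts the method out of the class, with the threshold and type list passed as parameters. The function name and the defaults (0.9 and `("person",)`, taken from the configured values above) are the only assumptions.

```python
from typing import Optional, Sequence


def should_use_fast_path(
    confidence: Optional[float],
    object_type: Optional[str],
    threshold: float = 0.9,
    fast_path_types: Sequence[str] = ("person",),
) -> bool:
    """Standalone version of the fast path check, for experimentation."""
    if confidence is None or object_type is None:
        return False
    if confidence < threshold:
        return False
    # Case-insensitive match against the configured object types.
    return object_type.lower() in {t.lower() for t in fast_path_types}
```

Two details follow from the comparison operators: a confidence exactly equal to the threshold qualifies, and the type match ignores case, so `"Person"` at 0.9 takes the fast path while `"car"` at 0.99 does not.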
Processing (Lines 1050-1082)¶
async def _process_fast_path(self, camera_id: str, detection_id: int) -> None:
    """Process detection via fast path (immediate analysis)."""
    if not self._analyzer:
        from backend.services.nemotron_analyzer import NemotronAnalyzer
        self._analyzer = NemotronAnalyzer(redis_client=self._redis)
    await self._analyzer.analyze_detection_fast_path(
        camera_id=camera_id,
        detection_id=detection_id,
    )
Size Limit Handling¶
Source: Lines 898-1026
When a batch reaches the max detection limit, it's closed with reason "max_size":
async def _close_batch_for_size_limit(self, batch_id: str) -> dict[str, Any] | None:
    """Close a batch that has reached the max detection size limit."""
    summary = {
        "batch_id": batch_id,
        "camera_id": camera_id,
        "detection_ids": detections,
        "started_at": started_at,
        "ended_at": ended_at,
        "reason": "max_size",  # NEM-1726
    }
    result = await self._redis.add_to_queue_safe(
        self._analysis_queue,
        summary,
        overflow_policy=QueueOverflowPolicy.DLQ,
    )
Memory Pressure Backpressure¶
Source: Lines 1084-1119
When GPU memory is critical, the aggregator can apply backpressure:
async def should_apply_backpressure(self) -> bool:
    """Check if backpressure should be applied due to GPU memory pressure."""
    from backend.services.gpu_monitor import MemoryPressureLevel
    pressure_level = await get_memory_pressure_level()
    should_throttle = pressure_level == MemoryPressureLevel.CRITICAL
    if should_throttle:
        logger.warning("Backpressure active due to critical GPU memory pressure")
    return should_throttle
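How callers react to this signal is not described here. One plausible pattern is to poll until pressure clears, with an upper bound on the wait. The sketch below assumes that pattern. `wait_for_capacity`, `poll_interval`, and `max_wait` are hypothetical names, and the function accepts any zero-argument coroutine function such as `aggregator.should_apply_backpressure`.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_for_capacity(
    should_apply_backpressure: Callable[[], Awaitable[bool]],
    poll_interval: float = 0.5,
    max_wait: float = 10.0,
) -> bool:
    """Poll until backpressure clears; return False if max_wait elapses first."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait
    while await should_apply_backpressure():
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(poll_interval)
    return True
```

A producer could call `await wait_for_capacity(aggregator.should_apply_backpressure)` before submitting work, and defer or drop the work when the result is `False`.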
WebSocket Broadcasting¶
The aggregator broadcasts events for real-time UI updates (lines 163-264):
Detection New Event (Lines 163-211)¶
async def _broadcast_detection_new(
    self, detection_id: int, batch_id: str, camera_id: str, ...
) -> None:
    """Broadcast a detection.new event via WebSocket."""
    broadcaster = await get_broadcaster(self._redis)
    detection_data = {
        "detection_id": detection_id,
        "batch_id": batch_id,
        "camera_id": camera_id,
        "label": label or "unknown",
        "confidence": confidence,
        "timestamp": datetime.now(UTC).isoformat(),
    }
    await broadcaster.broadcast_detection_new(detection_data)
Detection Batch Event (Lines 213-264)¶
async def _broadcast_detection_batch(
    self, batch_id: str, camera_id: str, detection_ids: list[int], ...
) -> None:
    """Broadcast a detection.batch event via WebSocket."""
    batch_data = {
        "batch_id": batch_id,
        "camera_id": camera_id,
        "detection_ids": detection_ids,
        "detection_count": len(detection_ids),
        "started_at": datetime.fromtimestamp(started_at, tz=UTC).isoformat(),
        "closed_at": datetime.fromtimestamp(closed_at, tz=UTC).isoformat(),
        "close_reason": close_reason,
    }
    await broadcaster.broadcast_detection_batch(batch_data)
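To see what a consumer receives, the batch payload can be built outside the class. This is a sketch under stated assumptions: `build_batch_event` is a hypothetical helper, and `timezone.utc` is used in place of the `UTC` alias from the excerpt so the snippet also runs on Python versions before 3.11.

```python
from datetime import datetime, timezone


def build_batch_event(
    batch_id: str,
    camera_id: str,
    detection_ids: list[int],
    started_at: float,
    closed_at: float,
    close_reason: str,
) -> dict:
    """Build a detection.batch payload with ISO 8601 UTC timestamps."""
    return {
        "batch_id": batch_id,
        "camera_id": camera_id,
        "detection_ids": detection_ids,
        "detection_count": len(detection_ids),
        # Epoch seconds are converted to timezone-aware ISO strings.
        "started_at": datetime.fromtimestamp(started_at, tz=timezone.utc).isoformat(),
        "closed_at": datetime.fromtimestamp(closed_at, tz=timezone.utc).isoformat(),
        "close_reason": close_reason,
    }
```

Epoch values 0 and 90 serialize as `1970-01-01T00:00:00+00:00` and `1970-01-01T00:01:30+00:00`, so clients can parse the timestamps without knowing the server's local time zone.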
BatchTimeoutWorker¶
Source: backend/services/pipeline_workers.py (lines 941-1099)
The BatchTimeoutWorker periodically checks for timed-out batches:
class BatchTimeoutWorker:  # Line 941
    """Worker that periodically checks and closes timed-out batches."""
    def __init__(
        self,
        redis_client: RedisClient,
        batch_aggregator: BatchAggregator | None = None,
        check_interval: float = 10.0,  # Check every 10 seconds
        stop_timeout: float = 10.0,
    ):
        self._aggregator = batch_aggregator or BatchAggregator(redis_client=redis_client)
        self._check_interval = check_interval
Processing Loop (Lines 1033-1099)¶
async def _run_loop(self) -> None:
    while self._running:
        start_time = time.time()
        # Check for batch timeouts FIRST
        closed_batches = await self._aggregator.check_batch_timeouts()
        if closed_batches:
            self._stats.items_processed += len(closed_batches)
            observe_stage_duration("batch", duration)
            record_pipeline_stage_latency("detect_to_batch", duration * 1000)
        # Maintain consistent check interval
        elapsed = time.time() - start_time
        sleep_time = max(0.0, self._check_interval - elapsed)
        await asyncio.sleep(sleep_time)
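The interval arithmetic at the end of the loop keeps the check cadence steady by subtracting the time the check itself took. A minimal sketch of that pattern follows. `next_sleep` and `run_periodic` are hypothetical names, and `time.monotonic()` is swapped in for `time.time()` so the measurement is unaffected by wall-clock adjustments.

```python
import asyncio
import time
from typing import Awaitable, Callable


def next_sleep(check_interval: float, elapsed: float) -> float:
    """Sleep for the remainder of the interval, never a negative duration."""
    return max(0.0, check_interval - elapsed)


async def run_periodic(
    check: Callable[[], Awaitable[None]],
    check_interval: float,
    iterations: int,
) -> None:
    """Run `check` a fixed number of times at a steady cadence."""
    for _ in range(iterations):
        start = time.monotonic()
        await check()
        elapsed = time.monotonic() - start
        await asyncio.sleep(next_sleep(check_interval, elapsed))
```

With a 10-second interval, a check that takes 2.5 seconds is followed by a 7.5-second sleep. A check that overruns the interval is followed by no sleep at all, so the next check starts immediately.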
Configuration¶
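Batching timing parameters (also documented in the AI Pipeline Architecture):
| Setting | Default | Description |
|---|---|---|
| batch_window_seconds | 90 | Maximum batch window, in seconds |
| batch_idle_timeout_seconds | 30 | Idle time, in seconds, before a batch is closed |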
Additional batch-aggregator settings:
| Setting | Default | Description |
|---|---|---|
| batch_max_detections | 50 | Max detections per batch |
| fast_path_confidence_threshold | 0.9 | Min confidence for fast path |
| fast_path_object_types | ["person"] | Object types for fast path |
Metrics¶
- hsi_batch_max_reached_total: Batches closed due to size limit
- hsi_pipeline_stage_duration_seconds{stage="batch"}: Batch processing time
Related Documentation¶
- Detection Queue: Source of detections
- Analysis Queue: Destination for closed batches
- Critical Paths: Fast path optimization