Critical Paths and Latency Optimization¶
This document describes the performance-critical paths through the detection pipeline, latency targets, and optimization strategies.
Latency Targets¶
| Stage | Target | Rationale |
|---|---|---|
| File detection to queue | <500ms | User-perceived responsiveness |
| Detection inference | <100ms | GPU batch efficiency |
| Event broadcast | <50ms | Real-time UI updates |
| Total pipeline | <5s | End-to-end detection to event |
Pipeline Stages with Latency Tracking¶
The pipeline records latency at each stage for observability:
Stage 1: watch_to_detect¶
Source: backend/services/file_watcher.py (lines 767-769)
```python
duration_ms = int((time.time() - start_time) * 1000)
record_pipeline_stage_latency("watch_to_detect", float(duration_ms))
```
Components:
- File system event detection (watchdog)
- Debounce delay (0.5s default)
- File stability check (2.0s for FTP)
- Image validation (PIL load)
- Deduplication check (Redis)
- Queue enqueue (Redis LPUSH)
Stage 2: detect_to_batch¶
Source: backend/services/pipeline_workers.py (lines 453-455)
```python
duration = time.time() - start_time
observe_stage_duration("detect", duration)
record_pipeline_stage_latency("detect_to_batch", duration * 1000)
```
Components:
- Queue dequeue (Redis BRPOP)
- Payload validation
- Semaphore acquisition
- YOLO26 HTTP request
- Detection filtering
- Database insert
- Batch aggregator update
Stage 3: batch_to_analyze¶
Source: backend/services/pipeline_workers.py (lines 888-891)
```python
duration = time.time() - start_time
record_pipeline_stage_latency("batch_to_analyze", duration * 1000)
await record_stage_latency(self._redis, "analyze", duration * 1000)
```
Components:
- Queue dequeue (Redis BRPOP)
- Payload validation
- Detection fetch (PostgreSQL)
- Context enrichment
- Enrichment pipeline (optional)
- Semaphore acquisition
- Nemotron LLM request
- Response parsing
- Event creation
- Database insert
- WebSocket broadcast
Stage 4: total_pipeline¶
Source: backend/services/pipeline_workers.py (lines 894-916)
```python
if pipeline_start_time:
    start_dt = datetime.fromisoformat(pipeline_start_time.replace("Z", "+00:00"))
    total_duration_ms = (datetime.now(UTC) - start_dt).total_seconds() * 1000
    record_pipeline_stage_latency("total_pipeline", total_duration_ms)
```
Tracked from: First file detection to event creation completion.
Fast Path Optimization¶
For critical detections, the fast path bypasses batch aggregation entirely.
Source: backend/services/batch_aggregator.py (lines 1028-1082)
Criteria (Lines 1028-1048)¶
```python
def _should_use_fast_path(self, confidence: float | None, object_type: str | None) -> bool:
    if confidence is None or object_type is None:
        return False
    if confidence < self._fast_path_threshold:  # 0.9 default
        return False
    return object_type.lower() in [t.lower() for t in self._fast_path_types]
```
Default criteria:
- Confidence >= 0.9 (90%)
- Object type in ["person"]
Latency Impact¶
| Path | Typical Latency | Notes |
|---|---|---|
| Normal batch | 30-90s | Wait for batch window/idle timeout |
| Fast path | <5s | Immediate analysis |
Trade-offs¶
Benefits:
- Immediate alerts for high-confidence threats
- Reduced time-to-notification for security events
Costs:
- Higher LLM load (more individual requests)
- Less context for LLM (single detection vs batch)
Concurrency Control¶
Inference Semaphore (NEM-1463)¶
Source: backend/services/inference_semaphore.py
Limits concurrent AI inference operations to prevent GPU overload.
Impact on latency:
- Under load, requests queue behind the semaphore
- Trade-off between throughput and latency
- Higher limits increase parallelism but risk GPU OOM
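The queueing behavior can be sketched with a plain `asyncio.Semaphore`; the limit constant and function names here are hypothetical stand-ins for the real `ai_max_concurrent_inferences` setting and inference call.

```python
import asyncio

MAX_CONCURRENT_INFERENCES = 4  # hypothetical default; mirrors ai_max_concurrent_inferences

async def run_inference(semaphore: asyncio.Semaphore, payload: int) -> int:
    # Requests beyond the limit wait here; that wait is the added
    # queueing latency seen under load.
    async with semaphore:
        await asyncio.sleep(0.01)  # stand-in for the GPU inference call
        return payload

async def run_batch(n: int) -> list[int]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_INFERENCES)
    return await asyncio.gather(*(run_inference(semaphore, i) for i in range(n)))
```

With 10 requests and a limit of 4, at most 4 inferences run at once; the rest queue behind the semaphore.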
Detector Client Semaphore (NEM-1500)¶
Source: backend/services/detector_client.py (lines 184-204)
```python
@classmethod
def _get_semaphore(cls) -> asyncio.Semaphore:
    settings = get_settings()
    limit = settings.ai_max_concurrent_inferences
    if cls._request_semaphore is None or cls._semaphore_limit != limit:
        cls._request_semaphore = asyncio.Semaphore(limit)
        cls._semaphore_limit = limit
    return cls._request_semaphore
```
Retry and Backoff¶
Exponential Backoff (NEM-1343)¶
Both detector and analyzer use exponential backoff for transient failures:
Retry schedule:

| Attempt | Delay |
|---|---|
| 1 | 1s |
| 2 | 2s |
| 3 | 4s |
| 4 | 8s |
| 5+ | 30s (capped) |
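The schedule can be computed rather than tabulated; this sketch reproduces the delays listed above (the function name is illustrative).

```python
def backoff_delay(attempt: int) -> float:
    """Delay in seconds before the given retry attempt (1-based)."""
    if attempt >= 5:
        return 30.0  # capped per the schedule above
    return float(2 ** (attempt - 1))  # 1s, 2s, 4s, 8s
```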
Impact on Latency¶
Worst-case latency with retries (3 attempts) is roughly three full request timeouts plus the backoff delays. For the detector this is 3 × (10s connect + 60s read) + 1s + 2s ≈ 213s; for the analyzer, with its 120s read timeout, it is correspondingly higher.
Circuit Breaker (NEM-1724)¶
Source: backend/services/detector_client.py (lines 296-309)
Prevents retry storms when services are unavailable:
```python
self._circuit_breaker = CircuitBreaker(
    name="yolo26",
    config=CircuitBreakerConfig(
        failure_threshold=5,    # Opens after 5 failures
        recovery_timeout=60.0,  # 60s before retry
        half_open_max_calls=3,
        success_threshold=2,
    ),
)
```
Latency impact:
- When circuit is open, requests fail immediately
- Prevents cascading delays during outages
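The fail-fast behavior can be sketched in a few lines. This is a simplified illustration only; the real `CircuitBreaker` class has half-open probing and success thresholds that this toy version omits.

```python
import time

class SimpleCircuitBreaker:
    """Toy sketch: open after N failures, fail fast until a recovery window passes."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at: float | None = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # After recovery_timeout, let a probe request through (half-open).
        return time.monotonic() - self.opened_at >= self.recovery_timeout

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None
```

While open, `allow_request()` returns immediately instead of spending a full timeout-plus-retry cycle on a service that is known to be down.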
Timeout Configuration¶
Detector Timeouts¶
Source: backend/services/detector_client.py (lines 97-99)
```python
DETECTOR_CONNECT_TIMEOUT = 10.0  # Connection establishment
DETECTOR_READ_TIMEOUT = 60.0     # AI inference response
DETECTOR_HEALTH_TIMEOUT = 5.0    # Health check
```
Analyzer Timeouts¶
Source: backend/services/nemotron_analyzer.py (lines 127-132)
```python
NEMOTRON_CONNECT_TIMEOUT = 10.0  # Connection establishment
NEMOTRON_READ_TIMEOUT = 120.0    # LLM response (complex inference)
NEMOTRON_HEALTH_TIMEOUT = 5.0    # Health check
```
Defense-in-Depth (NEM-1465)¶
Source: backend/services/detector_client.py (lines 579-582)
```python
# Explicit asyncio.timeout() wrapper
explicit_timeout = settings.yolo26_read_timeout + settings.ai_connect_timeout
async with asyncio.timeout(explicit_timeout):
    response = await self._http_client.post(...)
```
Cold Start Optimization (NEM-1670)¶
Model Warmup¶
Source: backend/services/detector_client.py (lines 488-539)
```python
async def warmup(self) -> bool:
    """Perform model warmup by running a test inference."""
    if not self._warmup_enabled:
        return True
    was_cold = self.is_cold()
    self._is_warming = True
    result = await self.model_readiness_probe()
    if result:
        self._track_inference()
        set_model_warmth_state("yolo26", "warm")
```
Configuration:
```python
ai_warmup_enabled = True                 # Enable warmup on startup
ai_cold_start_threshold_seconds = 300    # 5 minutes idle = cold
```
Cold Detection¶
```python
def is_cold(self) -> bool:
    if self._last_inference_time is None:
        return True
    seconds_since_last = time.monotonic() - self._last_inference_time
    return seconds_since_last > self._cold_start_threshold
```
Redis Pipelining¶
Batch Timeout Checking¶
Source: backend/services/batch_aggregator.py (lines 582-617)
Uses Redis pipelining to reduce round trips from 3 per batch (3N total) to 2 overall:
```python
# Phase 1: Fetch all batch IDs in parallel (single RTT)
batch_id_pipe = redis_client.pipeline()
for batch_key in batch_keys:
    batch_id_pipe.get(batch_key)
batch_ids = await batch_id_pipe.execute()

# Phase 2: Fetch all metadata in parallel (single RTT)
metadata_pipe = redis_client.pipeline()
for _batch_key, batch_id in valid_batches:
    metadata_pipe.get(f"batch:{batch_id}:started_at")
    metadata_pipe.get(f"batch:{batch_id}:last_activity")
```
Atomic Batch Creation (NEM-2014)¶
Source: backend/services/batch_aggregator.py (lines 333-391)
```python
async with client.pipeline(transaction=True) as pipe:
    pipe.set(batch_key, batch_id, ex=ttl)
    pipe.set(f"batch:{batch_id}:camera_id", camera_id, ex=ttl)
    pipe.set(f"batch:{batch_id}:started_at", str(current_time), ex=ttl)
    pipe.set(f"batch:{batch_id}:last_activity", str(current_time), ex=ttl)
    await pipe.execute()
```
Async I/O Optimization¶
Thread Pool for Blocking Operations¶
Source: backend/services/file_watcher.py (lines 246-247)
```python
# Run blocking PIL operations in thread pool
return await asyncio.to_thread(_validate_image_sync, file_path)
```
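The effect of `asyncio.to_thread` is that blocking work runs in the default thread pool while the event loop stays free. A self-contained sketch, with a stand-in for the PIL validation (`_blocking_validate` and its suffix check are hypothetical):

```python
import asyncio
import time

def _blocking_validate(path: str) -> bool:
    # Stand-in for the PIL image load, which blocks its thread.
    time.sleep(0.05)
    return path.endswith(".jpg")

async def validate_many(paths: list[str]) -> list[bool]:
    # Each blocking call runs in the thread pool, so the event loop can
    # overlap them instead of serializing on one coroutine.
    return await asyncio.gather(*(asyncio.to_thread(_blocking_validate, p) for p in paths))
```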
Parallel Data Fetching¶
Source: backend/services/batch_aggregator.py (lines 788-800)
```python
async with asyncio.TaskGroup() as tg:
    tg.create_task(fetch_detections())
    tg.create_task(fetch_started_at())
    tg.create_task(fetch_pipeline_time())
```
HTTP Connection Pooling (NEM-1721)¶
Source: backend/services/detector_client.py (lines 284-293)
```python
# Persistent HTTP connection pool
self._http_client = httpx.AsyncClient(
    timeout=self._timeout,
    limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
)
```
Benefits:
- Avoids TCP handshake overhead
- Reuses existing connections
- Prevents connection exhaustion
Free-Threading Support (Python 3.13t/3.14t)¶
Source: backend/services/detector_client.py (lines 107-148)
When running on free-threaded Python (GIL disabled):
```python
import sys

def _is_free_threaded() -> bool:
    if hasattr(sys, "_is_gil_enabled"):
        return not sys._is_gil_enabled()
    return False

def _get_default_inference_limit() -> int:
    if _is_free_threaded():
        return 20  # Higher limit with true parallelism
    return 4       # Conservative limit with GIL
```
Monitoring and Alerts¶
Key Metrics to Monitor¶
| Metric | Alert Threshold | Description |
|---|---|---|
| hsi_pipeline_stage_duration_seconds{stage="detect"} | p99 > 1s | Detection taking too long |
| hsi_pipeline_stage_duration_seconds{stage="analyze"} | p99 > 60s | Analysis taking too long |
| hsi_queue_depth{queue="detection"} | > 100 | Detection backlog |
| hsi_queue_depth{queue="analysis"} | > 50 | Analysis backlog |
| hsi_pipeline_errors_total | > 10/min | Error rate spike |
Latency Percentiles¶
Track p50, p90, p99 for each stage:
```
hsi_pipeline_stage_duration_seconds_bucket{stage="detect",le="0.1"}
hsi_pipeline_stage_duration_seconds_bucket{stage="detect",le="0.5"}
hsi_pipeline_stage_duration_seconds_bucket{stage="detect",le="1.0"}
hsi_pipeline_stage_duration_seconds_bucket{stage="detect",le="5.0"}
```
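In practice these quantiles are computed by the monitoring backend (e.g. PromQL's `histogram_quantile`), but the underlying estimate is simple: find the first bucket whose cumulative count covers the requested fraction. A sketch (function name and bucket layout are illustrative):

```python
def approx_quantile(buckets: list[tuple[float, int]], q: float) -> float:
    """Estimate a quantile from cumulative histogram buckets.

    `buckets` is a list of (upper_bound_seconds, cumulative_count) pairs,
    the same shape as the *_bucket series above. Returns the upper bound
    of the first bucket whose cumulative count reaches q of the total.
    """
    total = buckets[-1][1]
    threshold = q * total
    for upper_bound, cumulative in buckets:
        if cumulative >= threshold:
            return upper_bound
    return buckets[-1][0]
```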
Related Documentation¶
- FileWatcher: Entry point optimizations
- Detection Queue: Queue processing patterns
- Batch Aggregator: Batching strategy
- Analysis Queue: LLM optimization
- Resilience Patterns: Circuit breakers and retries