Nemotron Analyzer
The NemotronAnalyzer service uses the Nemotron-3-Nano-30B-A3B model, served by a llama.cpp server, to analyze detection batches and generate risk assessments with natural-language reasoning.
Source Files
- Main Analyzer: backend/services/nemotron_analyzer.py
- Prompt Templates: backend/services/prompts.py
Architecture Overview
%%{init: {
'theme': 'dark',
'themeVariables': {
'primaryColor': '#3B82F6',
'primaryTextColor': '#FFFFFF',
'primaryBorderColor': '#60A5FA',
'secondaryColor': '#A855F7',
'tertiaryColor': '#009688',
'background': '#121212',
'mainBkg': '#1a1a2e',
'lineColor': '#666666'
}
}}%%
flowchart LR
subgraph Input
DB["Detection Batch<br/>(from Redis)"]
end
subgraph Analyzer["NemotronAnalyzer"]
CE[Context Enrichment]
PB[Prompt Building]
RP[Response Parsing]
end
subgraph LLM["Nemotron LLM"]
NEM["Port 8091<br/>llama.cpp server<br/>/completion"]
end
subgraph Output["Event Creation"]
RS["Risk Score 0-100"]
SR["Summary + Reason"]
end
DB --> Analyzer
Analyzer --> LLM
LLM --> Output
Class Definition
class NemotronAnalyzer:
    """Analyzes detection batches using Nemotron LLM for risk assessment.

    Features:
    - Retry logic with exponential backoff for transient failures
    - Configurable timeouts and retry attempts via settings
    - Context enrichment with zone, baseline, and cross-camera data
    - Enrichment pipeline for license plates, faces, and OCR
    - A/B testing support for prompt experimentation
    """
Configuration
| Setting | Default | Description |
|---|---|---|
| NEMOTRON_URL | http://ai-llm:8091 | LLM service URL |
| NEMOTRON_API_KEY | None | Optional API key |
| NEMOTRON_MAX_RETRIES | 3 | Maximum retry attempts |
| AI_CONNECT_TIMEOUT | 10.0s | Connection timeout |
| NEMOTRON_READ_TIMEOUT | 120.0s | Read timeout for LLM inference |
| NEMOTRON_MAX_OUTPUT_TOKENS | 512 | Maximum output tokens |
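How a retry limit such as NEMOTRON_MAX_RETRIES can drive exponential backoff is illustrated by the minimal sketch below. The helper name `call_with_retry`, its parameters, the delay values, and the set of retryable exceptions are assumptions for illustration; the analyzer's actual retry code is not shown on this page. The sketch treats `max_retries` as the total number of attempts.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def call_with_retry(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,       # mirrors the NEMOTRON_MAX_RETRIES default
    base_delay: float = 1.0,    # hypothetical value, not a documented setting
    max_delay: float = 30.0,    # hypothetical value, not a documented setting
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Run `operation`, retrying transient failures with exponential backoff."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(1, max_retries + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == max_retries:
                raise  # Out of attempts: surface the last error to the caller
            # Delay doubles on each attempt (1s, 2s, 4s, ...), capped at max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # A little jitter spreads out retries from concurrent callers
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("unreachable")
```

Errors outside `retry_on` propagate immediately, so only failures assumed to be transient are retried.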
Analysis Flow
1. analyze_batch(batch_id, camera_id, detection_ids)
|
+-- Idempotency check (NEM-1725)
|
2. SESSION 1 (READ): Fetch data
|
+-- Load camera details
+-- Fetch detections
+-- Enrich context (zones, baselines, cross-camera)
+-- Query scene changes for camera health
+-- Fetch auto-tuning context
+-- Extract detection data for enrichment
|
3. EXTERNAL CALLS (NO SESSION):
|
+-- Run enrichment pipeline (license plates, faces, OCR)
+-- Household matching (reduce risk for known members)
+-- Call LLM for risk analysis
|
4. SESSION 2 (WRITE): Persist results
|
+-- Create Event record
+-- Link detections via junction table
+-- Store enrichment data on detections
+-- Set idempotency key
+-- Create audit record
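The point of splitting the flow into a read session, external calls, and a write session is that no database session stays open while the slow LLM call runs. The sketch below illustrates that shape with stand-ins: `session_scope`, `call_llm`, `analyze_batch_sketch`, and the in-memory `seen` dictionary are all hypothetical and replace the real database, LLM client, and Redis idempotency key.

```python
import asyncio
from contextlib import asynccontextmanager

open_sessions = 0  # Counts fake DB sessions that are currently open


@asynccontextmanager
async def session_scope():
    """Stand-in for a database session context manager."""
    global open_sessions
    open_sessions += 1
    try:
        yield {}
    finally:
        open_sessions -= 1


async def call_llm(prompt: str) -> dict:
    """Stand-in for the slow external LLM call."""
    if open_sessions != 0:
        raise RuntimeError("LLM call made while a DB session is open")
    await asyncio.sleep(0)
    return {"risk_score": 20, "summary": "stub result"}


async def analyze_batch_sketch(batch_id: str, seen: dict) -> dict:
    # Step 1: idempotency check, return the existing event on a retry
    if batch_id in seen:
        return seen[batch_id]
    # Step 2: SESSION 1 (READ), gather everything the prompt needs
    async with session_scope():
        context = f"detections for {batch_id}"
    # Step 3: external calls run with no session held
    risk = await call_llm(context)
    # Step 4: SESSION 2 (WRITE), persist and record the idempotency key
    async with session_scope():
        event = {"batch_id": batch_id, **risk}
        seen[batch_id] = event
    return event
```

Calling `analyze_batch_sketch` twice with the same `batch_id` returns the stored event the second time without calling the LLM again.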
Prompt Building
System Prompt
The LLM receives a calibrated system prompt emphasizing that most detections are NOT threats:
CRITICAL PRINCIPLE: Most detections are NOT threats. Residents, family members,
delivery workers, and pets represent normal household activity. Your job is to
identify genuine anomalies, not flag everyday life.
CALIBRATION: In a typical day, expect:
- 80% of events to be LOW risk (0-29): Normal activity
- 15% to be MEDIUM risk (30-59): Worth noting but not alarming
- 4% to be HIGH risk (60-84): Genuinely suspicious, warrants review
- 1% to be CRITICAL (85-100): Immediate threats only
Scoring Reference
| Scenario | Score | Reasoning |
|----------|-------|-----------|
| Resident arriving home | 5-15 | Expected activity |
| Delivery driver at door | 15-25 | Normal service visit |
| Unknown person on sidewalk | 20-35 | Public area, passive |
| Unknown person lingering | 45-60 | Warrants attention |
| Person testing door handles | 70-85 | Clear suspicious intent |
| Active break-in or violence | 85-100 | Immediate threat |
Prompt Templates
Multiple prompt templates support different enrichment levels:
| Template | Use Case |
|---|---|
| RISK_ANALYSIS_PROMPT | Basic detection-only analysis |
| ENRICHED_RISK_ANALYSIS_PROMPT | With zone/baseline context |
| FULL_ENRICHED_RISK_ANALYSIS_PROMPT | With enrichment pipeline data |
| MODEL_ZOO_ENHANCED_RISK_ANALYSIS_PROMPT | With all model zoo enrichment |
| VISION_ENHANCED_RISK_ANALYSIS_PROMPT | With Florence-2 captions |
Context Sections
The prompt includes multiple context sections:
# Zone context
{zone_context} # "Detection in zone 'Front Porch' (entry zone)"
# Baseline deviation
{baseline_context} # "Activity is 3.2x above baseline for this hour"
# Cross-camera activity
{cross_camera_context} # "Same person seen at Back Door 2 minutes ago"
# Camera health alerts
{camera_health_context} # "WARNING: Camera tamper detected 5 minutes ago"
# Household matching
{household_context} # "KNOWN: Matches household member 'John' (confidence: 92%)"
# Enrichment data
{enrichment_context} # Pose, clothing, vehicle type, license plate, etc.
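The placeholders above suggest plain string formatting. The sketch below shows one way such sections could be filled in, with a neutral default when a section has no data. `EXAMPLE_TEMPLATE`, `build_prompt`, and the default text are hypothetical; the real templates live in backend/services/prompts.py and are not reproduced here.

```python
CONTEXT_FIELDS = (
    "zone_context",
    "baseline_context",
    "cross_camera_context",
    "camera_health_context",
    "household_context",
    "enrichment_context",
)

# Hypothetical template for illustration only
EXAMPLE_TEMPLATE = """\
Detections:
{detections}

Zone: {zone_context}
Baseline: {baseline_context}
Cross-camera: {cross_camera_context}
Camera health: {camera_health_context}
Household: {household_context}
Enrichment: {enrichment_context}
"""


def build_prompt(detections: str, **context: str) -> str:
    """Fill every context placeholder, using a default when data is missing."""
    unknown = set(context) - set(CONTEXT_FIELDS)
    if unknown:
        raise KeyError(f"Unknown context sections: {sorted(unknown)}")
    sections = {
        name: context.get(name) or "No data available"
        for name in CONTEXT_FIELDS
    }
    return EXAMPLE_TEMPLATE.format(detections=detections, **sections)
```

Supplying a default for every section keeps `str.format` from raising `KeyError` when an enrichment step produced nothing.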
Response Parsing
The LLM returns JSON, optionally preceded by <think> reasoning blocks:
def _parse_llm_response(self, text: str) -> dict[str, Any]:
    """Parse JSON response from LLM completion.

    Handles Nemotron output which includes <think>...</think> reasoning
    blocks before the actual JSON response.
    """
    # Strip <think>...</think> blocks
    cleaned_text = _THINK_PATTERN.sub("", text).strip()
    # Handle incomplete think blocks
    if "<think>" in cleaned_text:
        # Find content after the last </think>
        ...
    # Extract JSON object
    matches = _JSON_PATTERN.findall(cleaned_text)
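The excerpt above elides the pattern definitions and the final extraction step. As a self-contained illustration of the same idea, the sketch below strips reasoning blocks and then scans for the first decodable JSON object. It substitutes `json.JSONDecoder.raw_decode` for `_JSON_PATTERN`, whose definition is not shown, and the names `THINK_PATTERN` and `parse_llm_response` are assumptions rather than the module's own.

```python
import json
import re
from typing import Any

# Assumed equivalent of _THINK_PATTERN: a complete <think>...</think> block
THINK_PATTERN = re.compile(r"<think>.*?</think>", re.DOTALL)


def parse_llm_response(text: str) -> dict[str, Any]:
    """Return the first JSON object found outside complete <think> blocks."""
    cleaned = THINK_PATTERN.sub("", text)
    # Drop stray tags left behind by an unclosed or truncated reasoning block
    cleaned = cleaned.replace("<think>", "").replace("</think>", "").strip()

    decoder = json.JSONDecoder()
    start = cleaned.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(cleaned, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict):
            return obj
        # This brace did not start valid JSON; try the next one
        start = cleaned.find("{", start + 1)
    raise ValueError("No JSON object found in LLM response")
```

Scanning with a real JSON decoder avoids mistaking braces inside free-form text for the start of the response object.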
Expected Response Format
{
"risk_score": 25,
"risk_level": "low",
"summary": "Delivery driver detected at front door during business hours",
"reasoning": "Amazon delivery uniform visible, package in hand, brief visit typical of deliveries"
}
Risk Score Validation
def _validate_risk_data(self, risk_data: dict[str, Any]) -> dict[str, Any]:
    """Validate and normalize risk data."""
    # Clamp risk_score to 0-100
    risk_score = risk_data.get("risk_score", 50)
    risk_score = max(0, min(100, int(risk_score)))
    # Determine risk_level from score if not provided
    if "risk_level" not in risk_data:
        if risk_score < 30:
            risk_level = "low"
        elif risk_score < 60:
            risk_level = "medium"
        elif risk_score < 85:
            risk_level = "high"
        else:
            risk_level = "critical"
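The excerpt stops before the function returns. A standalone sketch of the same clamping and threshold logic follows. It goes beyond the excerpt in two labeled ways: non-numeric scores fall back to 50 instead of raising, and an unrecognized `risk_level` is re-derived from the score. The function names are hypothetical.

```python
from typing import Any

VALID_LEVELS = ("low", "medium", "high", "critical")


def level_for_score(score: int) -> str:
    """Map a 0-100 score to a level using the documented thresholds."""
    if score < 30:
        return "low"
    if score < 60:
        return "medium"
    if score < 85:
        return "high"
    return "critical"


def validate_risk_data(risk_data: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of risk_data with a clamped score and a valid level."""
    try:
        score = int(float(risk_data.get("risk_score", 50)))
    except (TypeError, ValueError, OverflowError):
        score = 50  # Assumption: unparseable scores get the neutral default
    score = max(0, min(100, score))

    level = str(risk_data.get("risk_level", "")).strip().lower()
    if level not in VALID_LEVELS:
        # Assumption: missing or unknown levels are derived from the score
        level = level_for_score(score)
    return {**risk_data, "risk_score": score, "risk_level": level}
```

For example, a score of 140 is clamped to 100 and labeled critical, while the string "42" is parsed and labeled medium.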
Risk Scoring Guidelines
| Risk Level | Score Range | Description |
|---|---|---|
| Low | 0-29 | Normal household activity |
| Medium | 30-59 | Worth noting, not alarming |
| High | 60-84 | Genuinely suspicious |
| Critical | 85-100 | Immediate threat |
Factors That Lower Risk
- Known household member match
- Recognized vehicle (registered plate)
- Delivery uniform detected
- Expected schedule match
- Pet classification (household animal)
- Normal activity zone
Factors That Raise Risk
- Unknown person at sensitive location
- Suspicious pose (crouching, hiding)
- Weapon detected
- Unusual time of day
- Tampering detected
- Violence indicators
- Extended loitering
A/B Testing Support
The analyzer supports prompt A/B testing and experimentation:
def set_ab_test_config(self, config: ABTestConfig) -> None:
    """Configure A/B testing for prompt versions."""
    self._ab_config = config
    self._ab_tester = PromptABTester(config)

async def get_prompt_version(self) -> tuple[int, bool]:
    """Get the prompt version to use for this request."""
    if self._ab_tester is not None:
        return self._ab_tester.select_prompt_version()
    return (1, False)  # Default version
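The internals of `PromptABTester` are not shown here. As a rough illustration of how a version selector with this return shape might work, the sketch below routes a configurable fraction of requests to version 2. The class names, the `traffic_split` field, and the reading of the boolean as "request is in the experiment arm" are all assumptions.

```python
import random
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ABTestConfigSketch:
    # Hypothetical field: fraction of requests routed to prompt version 2
    traffic_split: float = 0.5


class PromptABTesterSketch:
    def __init__(
        self,
        config: ABTestConfigSketch,
        rng: Optional[random.Random] = None,
    ) -> None:
        if not 0.0 <= config.traffic_split <= 1.0:
            raise ValueError("traffic_split must be between 0 and 1")
        self._config = config
        self._rng = rng or random.Random()

    def select_prompt_version(self) -> tuple[int, bool]:
        """Return (version, in_experiment_arm) for one request."""
        # random() is in [0.0, 1.0), so a split of 0 never picks version 2
        # and a split of 1 always does
        if self._rng.random() < self._config.traffic_split:
            return (2, True)
        return (1, False)
```

Passing a seeded `random.Random` makes the assignment reproducible in tests.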
Shadow Mode
Run both V1 and V2 prompts but return V1 results:
async def run_shadow_analysis(self, camera_id: str, context: str) -> dict:
    """Run both prompts, return V1, log V2 for comparison."""
    v1_result = await self._call_llm_with_version(context, "v1_original")
    v2_result = await self._call_llm_with_version(context, "v2_calibrated")
    # Log comparison for analysis
    await self._log_shadow_result(camera_id, v1_result, v2_result)
    return {"primary_result": v1_result, "shadow_result": v2_result}
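What `_log_shadow_result` records is not shown. One plausible comparison, sketched below under that caveat, is the score difference between the two prompt versions and whether the risk level changed. The function name and the output keys are hypothetical.

```python
from typing import Any


def compare_shadow_results(
    v1_result: dict[str, Any],
    v2_result: dict[str, Any],
) -> dict[str, Any]:
    """Summarize how the shadow (V2) result differs from the primary (V1)."""
    v1_score = int(v1_result["risk_score"])
    v2_score = int(v2_result["risk_score"])
    return {
        "v1_score": v1_score,
        "v2_score": v2_score,
        # Negative delta means the V2 prompt scored the event lower
        "score_delta": v2_score - v1_score,
        "level_changed": v1_result.get("risk_level") != v2_result.get("risk_level"),
    }
```

Aggregating `score_delta` over many events would show whether a calibrated prompt shifts scores in the intended direction before it serves real traffic.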
Idempotency Handling
Prevent duplicate Events on retry:
async def _check_idempotency(self, batch_id: str) -> int | None:
    """Check if Event already exists for this batch."""
    key = f"batch_event:{batch_id}"
    event_id = await self._redis.get(key)
    return int(event_id) if event_id else None

async def _set_idempotency(self, batch_id: str, event_id: int) -> None:
    """Store idempotency key after Event creation."""
    key = f"batch_event:{batch_id}"
    await self._redis.set(key, str(event_id), expire=3600)  # 1 hour TTL
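The check-then-set pattern can be exercised without Redis. The sketch below uses an in-memory stand-in exposing the same `get` and `set(..., expire=...)` calls as the excerpt; `InMemoryStore` and `get_or_create_event` are hypothetical names, and the store is not a substitute for the project's Redis client.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable
from typing import Optional


class InMemoryStore:
    """Stand-in for the Redis client: string values with a TTL in seconds."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[str, float]] = {}

    async def get(self, key: str) -> Optional[str]:
        item = self._data.get(key)
        if item is None:
            return None
        value, expires_at = item
        if time.monotonic() >= expires_at:
            del self._data[key]  # Expired entries behave as missing
            return None
        return value

    async def set(self, key: str, value: str, expire: int) -> None:
        self._data[key] = (value, time.monotonic() + expire)


async def get_or_create_event(
    store: InMemoryStore,
    batch_id: str,
    create_event: Callable[[], Awaitable[int]],
) -> int:
    """Return the existing event ID for a batch, creating one only if absent."""
    key = f"batch_event:{batch_id}"
    existing = await store.get(key)
    if existing is not None:
        return int(existing)
    event_id = await create_event()
    await store.set(key, str(event_id), expire=3600)  # 1 hour TTL
    return event_id
```

The check and the set are separate steps, so two workers handling the same batch at the same moment could both create an Event; this sketch makes no attempt to close that window.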
Cold Start and Warmup
async def warmup(self) -> bool:
    """Perform model warmup by running a test inference."""
    was_cold = self.is_cold()
    self._is_warming = True
    result = await self.model_readiness_probe()  # Simple prompt
    if result:
        self._track_inference()
        if was_cold:
            record_model_cold_start("nemotron")
        set_model_warmth_state("nemotron", "warm")
        return True
    return False

async def model_readiness_probe(self) -> bool:
    """Send test prompt to verify model is loaded."""
    response = await client.post(
        f"{self._llm_url}/v1/completions",
        json={
            "prompt": self._warmup_prompt,
            "max_tokens": 50,
            "temperature": 0.1,
        },
    )
    return response.status_code == 200
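The excerpt calls `is_cold()` and `_track_inference()` without showing them. One simple way to implement that pair, offered only as a guess at the approach, is to remember the time of the last successful inference and treat the model as cold once that is too old. The class name, the 300-second threshold, and the injectable clock are assumptions.

```python
import time
from collections.abc import Callable
from typing import Optional


class WarmthTracker:
    """Tracks whether a model has served an inference recently."""

    def __init__(
        self,
        cold_after_seconds: float = 300.0,  # hypothetical threshold
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._cold_after = cold_after_seconds
        self._clock = clock
        self._last_inference: Optional[float] = None

    def track_inference(self) -> None:
        """Record that an inference just completed."""
        self._last_inference = self._clock()

    def is_cold(self) -> bool:
        """Cold means never used, or idle for longer than the threshold."""
        if self._last_inference is None:
            return True
        return self._clock() - self._last_inference > self._cold_after
```

Using a monotonic clock keeps the idle calculation immune to wall-clock adjustments, and injecting the clock makes the threshold easy to test.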
Metrics
# LLM metrics
observe_ai_request_duration("nemotron", llm_duration_seconds)
record_nemotron_tokens(token_count) # If available in response
# Risk metrics
observe_risk_score(event.risk_score)
record_event_by_camera(camera_id)
record_event_by_risk_level(event.risk_level)
record_event_created()
# Prompt metrics
record_prompt_template_used(template_name)
record_prompt_latency(f"v{prompt_version}", latency_seconds)
# Pipeline errors
record_pipeline_error("nemotron_analysis_error")
Error Handling and Fallback
When LLM analysis fails, the analyzer substitutes neutral medium-risk fallback data:
except Exception as e:
    # Create fallback risk data
    risk_data = {
        "risk_score": 50,
        "risk_level": "medium",
        "summary": "Analysis unavailable - LLM service error",
        "reasoning": "Failed to analyze detections due to service error",
    }
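The `except` clause above is shown without its `try`. A minimal sketch of the surrounding shape follows, assuming the LLM call is an awaitable that either returns parsed risk data or raises. `analyze_with_fallback` and its `call_llm` parameter are hypothetical names.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any

logger = logging.getLogger(__name__)

FALLBACK_RISK_DATA: dict[str, Any] = {
    "risk_score": 50,
    "risk_level": "medium",
    "summary": "Analysis unavailable - LLM service error",
    "reasoning": "Failed to analyze detections due to service error",
}


async def analyze_with_fallback(
    call_llm: Callable[[str], Awaitable[dict[str, Any]]],
    prompt: str,
) -> dict[str, Any]:
    """Return the LLM's risk data, or the neutral fallback if the call fails."""
    try:
        return await call_llm(prompt)
    except Exception:
        # Log with traceback so the failure stays visible in monitoring
        logger.exception("LLM analysis failed, using fallback risk data")
        # Return a copy so callers cannot mutate the shared default
        return dict(FALLBACK_RISK_DATA)
```

Because `asyncio.CancelledError` derives from `BaseException`, task cancellation still propagates instead of being turned into a fallback result.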
Usage Example
from backend.services.nemotron_analyzer import NemotronAnalyzer
from backend.core.redis import RedisClient

# Initialize analyzer
redis_client = RedisClient()
analyzer = NemotronAnalyzer(
    redis_client=redis_client,
    use_enriched_context=True,
    use_enrichment_pipeline=True,
)

# Perform warmup on startup
await analyzer.warmup()

# Analyze a batch
event = await analyzer.analyze_batch(
    batch_id="batch_abc123",
    camera_id="camera1",
    detection_ids=[1, 2, 3, 4, 5],
)
print(f"Risk Score: {event.risk_score}")
print(f"Summary: {event.summary}")
print(f"Reasoning: {event.reasoning}")