Enrichment Pipeline

The enrichment pipeline extracts additional context from detections using a multi-model architecture. It runs specialized models for person analysis, vehicle classification, and other enrichment tasks.

Source Files

  • HTTP Client: backend/services/enrichment_client.py
  • Model Manager: ai/enrichment/model_manager.py
  • Model Registry: ai/enrichment/model_registry.py
  • Individual Models: ai/enrichment/models/

Architecture Overview

%%{init: {
  'theme': 'dark',
  'themeVariables': {
    'primaryColor': '#3B82F6',
    'primaryTextColor': '#FFFFFF',
    'primaryBorderColor': '#60A5FA',
    'secondaryColor': '#A855F7',
    'tertiaryColor': '#009688',
    'background': '#121212',
    'mainBkg': '#1a1a2e',
    'lineColor': '#666666'
  }
}}%%
flowchart TB
    EC["EnrichmentClient<br/>HTTP Client"]

    EC -->|"POST /enrich<br/>POST /vehicle-classify<br/>POST /pet-classify<br/>POST /clothing-classify<br/>POST /pose-analyze<br/>POST /action-classify"| AE

    AE["ai-enrichment:8094<br/>FastAPI Server"]
    AE --> MM["OnDemandModelManager<br/>VRAM Budget: 6.8GB<br/>LRU Eviction"]

    MM --> POSE["Pose<br/>Estimator"]
    MM --> CLOTH["Clothes<br/>FashionCLIP"]
    MM --> VEH["Vehicle<br/>Classify"]
    MM --> PET["Pet<br/>Classify"]
    MM --> THREAT["Threat<br/>Detect"]

Service Endpoints

| Endpoint | Method | Purpose |
|----------|--------|---------|
| /health | GET | Service health check |
| /enrich | POST | Unified enrichment (all models) |
| /vehicle-classify | POST | Vehicle type classification |
| /pet-classify | POST | Cat/dog classification |
| /clothing-classify | POST | FashionCLIP clothing attributes |
| /pose-analyze | POST | ViTPose+ body keypoints |
| /depth-estimate | POST | Depth Anything V2 depth map |
| /object-distance | POST | Distance from depth map |
| /action-classify | POST | X-CLIP temporal action |
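As a rough sketch of how a caller might target these endpoints: the `build_enrich_request` helper and the payload shape (a base64-encoded `image` field) are assumptions for illustration, not the actual client protocol — see backend/services/enrichment_client.py for the real one.

```python
import base64

# Hypothetical helper: build the URL and JSON body for an enrichment call.
# The base64 "image" payload field is an assumption for illustration.
BASE_URL = "http://ai-enrichment:8094"

ENDPOINTS = {
    "vehicle": "/vehicle-classify",
    "pet": "/pet-classify",
    "clothing": "/clothing-classify",
    "pose": "/pose-analyze",
    "action": "/action-classify",
}


def build_enrich_request(task: str, image_bytes: bytes) -> tuple[str, dict]:
    """Return (url, json_payload) for an enrichment task."""
    if task not in ENDPOINTS:
        raise ValueError(f"unknown enrichment task: {task}")
    url = BASE_URL + ENDPOINTS[task]
    payload = {"image": base64.b64encode(image_bytes).decode("ascii")}
    return url, payload
```

For example, `build_enrich_request("pose", image_bytes)` targets `http://ai-enrichment:8094/pose-analyze`.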

Enrichment Client

class EnrichmentClient:
    """Client for interacting with combined enrichment service.

    The enrichment service provides:
    - Vehicle type classification (ResNet-50)
    - Pet classification (ResNet-18 cat/dog)
    - Clothing classification (FashionCLIP)
    - Pose analysis (ViTPose+ Small)
    - Action classification (X-CLIP temporal video understanding)
    """

Configuration

| Setting | Default | Description |
|---------|---------|-------------|
| ENRICHMENT_URL | http://ai-enrichment:8094 | Service URL |
| AI_CONNECT_TIMEOUT | 10.0s | Connection timeout |
| ENRICHMENT_READ_TIMEOUT | 60.0s | Read timeout |
| ENRICHMENT_MAX_RETRIES | 3 | Maximum retry attempts |
| ENRICHMENT_CB_FAILURE_THRESHOLD | 5 | Circuit breaker threshold |
| ENRICHMENT_CB_RECOVERY_TIMEOUT | 60.0s | Circuit breaker recovery |
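A minimal sketch of how these settings could be read from the environment. The real service uses a settings object (`settings.enrichment_cb_failure_threshold` etc.), so this `EnrichmentSettings` dataclass is illustrative only; the variable names and defaults match the table.

```python
import os
from dataclasses import dataclass


# Illustrative only: defaults are evaluated once at class-definition time,
# which is fine for a sketch but a real settings library would re-read them.
@dataclass(frozen=True)
class EnrichmentSettings:
    url: str = os.getenv("ENRICHMENT_URL", "http://ai-enrichment:8094")
    connect_timeout: float = float(os.getenv("AI_CONNECT_TIMEOUT", "10.0"))
    read_timeout: float = float(os.getenv("ENRICHMENT_READ_TIMEOUT", "60.0"))
    max_retries: int = int(os.getenv("ENRICHMENT_MAX_RETRIES", "3"))
    cb_failure_threshold: int = int(
        os.getenv("ENRICHMENT_CB_FAILURE_THRESHOLD", "5")
    )
    cb_recovery_timeout: float = float(
        os.getenv("ENRICHMENT_CB_RECOVERY_TIMEOUT", "60.0")
    )
```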

Person Enrichment Pipeline

For person detections, the following models are applied:

%%{init: {
  'theme': 'dark',
  'themeVariables': {
    'primaryColor': '#3B82F6',
    'primaryTextColor': '#FFFFFF',
    'primaryBorderColor': '#60A5FA',
    'secondaryColor': '#A855F7',
    'tertiaryColor': '#009688',
    'background': '#121212',
    'mainBkg': '#1a1a2e',
    'lineColor': '#666666'
  }
}}%%
flowchart TB
    PD[Person Detection]

    PD --> TD["Threat Detector<br/><i>CRITICAL priority - always first</i><br/>Weapon detection (guns, knives)"]

    TD --> PE["Pose Estimator (YOLOv8n)<br/><i>HIGH priority</i><br/>17 COCO keypoints<br/>Posture: standing, crouching, etc."]

    PE --> DE["Demographics Estimator (ViT)<br/><i>HIGH priority</i><br/>Age range, gender<br/>From face crops"]

    DE --> CC["Clothing Classifier (FashionCLIP)<br/><i>MEDIUM priority</i><br/>Suspicious attire, uniforms"]

    CC --> PR["Person ReID (OSNet)<br/><i>MEDIUM priority</i><br/>512-dim embeddings<br/>Cross-camera tracking"]

    PR -->|"if suspicious + multiple frames"| AR["Action Recognition (X-CLIP)<br/><i>LOW priority</i><br/>Loitering, running, etc."]
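The priority ordering in the diagram can be sketched as a stable sort over priority tiers: CRITICAL first, then HIGH, MEDIUM, LOW. The `schedule` helper and the exact model-name strings here are hypothetical.

```python
# Map priority tiers to sort ranks; lower rank runs earlier.
PRIORITY_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}

# Person-pipeline models and their tiers, following the diagram above.
PERSON_MODEL_PRIORITIES = {
    "threat_detector": "CRITICAL",
    "pose_estimator": "HIGH",
    "demographics_estimator": "HIGH",
    "clothing_classifier": "MEDIUM",
    "person_reid": "MEDIUM",
    "action_recognizer": "LOW",
}


def schedule(models: dict[str, str]) -> list[str]:
    """Order model names by priority tier (stable within a tier)."""
    return sorted(models, key=lambda m: PRIORITY_ORDER[models[m]])
```

Because the sort is stable, models within the same tier keep their declared order, so the threat detector always runs first and action recognition last.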

Result Types

The enrichment pipeline uses result types defined in backend/services/enrichment_client.py:

# backend/services/enrichment_client.py
@dataclass(slots=True)
class ClothingClassificationResult:
    clothing_type: str          # hoodie, vest, uniform, etc.
    color: str                  # Primary color
    style: str                  # Overall style classification
    confidence: float           # 0-1
    top_category: str           # Top matched category from prompts
    description: str            # Human-readable description
    is_suspicious: bool         # Dark hoodie, face mask, etc.
    is_service_uniform: bool    # Service/delivery uniform detected
    inference_time_ms: float

# Pose results from backend/services/vitpose_loader.py
@dataclass(slots=True)
class PoseResult:
    keypoints: list[dict]       # [{name, x, y, confidence}, ...]
    pose_class: str             # standing, crouching, bending_over, etc.
    confidence: float           # 0-1
    is_suspicious: bool         # True if crouching, lying_down, etc.

# Violence detection from backend/services/violence_loader.py
@dataclass(slots=True)
class ViolenceDetectionResult:
    is_violent: bool
    confidence: float
    category: str | None

Vehicle Enrichment Pipeline

For vehicle detections (car, truck, bus, motorcycle, bicycle):

%%{init: {
  'theme': 'dark',
  'themeVariables': {
    'primaryColor': '#3B82F6',
    'primaryTextColor': '#FFFFFF',
    'primaryBorderColor': '#60A5FA',
    'secondaryColor': '#A855F7',
    'tertiaryColor': '#009688',
    'background': '#121212',
    'mainBkg': '#1a1a2e',
    'lineColor': '#666666'
  }
}}%%
flowchart TB
    VD[Vehicle Detection]

    VD --> VC["Vehicle Classifier (ResNet-50)<br/><i>MEDIUM priority</i><br/>MIO-TCD dataset<br/>11 types: sedan, pickup, SUV, etc."]

    VC --> LP["License Plate Detection<br/><i>From Model Zoo (backend)</i><br/>YOLO11 license plate detection<br/>+ PaddleOCR text extraction"]

    LP --> DE["Depth Estimation (DAv2)<br/><i>LOW priority</i><br/>Depth Anything V2<br/>Distance estimation"]

Result Types

Vehicle classification result from backend/services/enrichment_client.py:

@dataclass(slots=True)
class VehicleClassificationResult:
    vehicle_type: str       # "pickup_truck", "sedan", etc.
    display_name: str       # Human-readable name
    confidence: float       # 0-1
    is_commercial: bool     # Delivery van, truck, etc.
    all_scores: dict[str, float]  # Top class scores
    inference_time_ms: float

    def to_context_string(self) -> str:
        """Generate context string for LLM prompt."""

Pet/Animal Enrichment Pipeline

For animal detections (cat, dog):

%%{init: {
  'theme': 'dark',
  'themeVariables': {
    'primaryColor': '#3B82F6',
    'primaryTextColor': '#FFFFFF',
    'primaryBorderColor': '#60A5FA',
    'secondaryColor': '#A855F7',
    'tertiaryColor': '#009688',
    'background': '#121212',
    'mainBkg': '#1a1a2e',
    'lineColor': '#666666'
  }
}}%%
flowchart TB
    AD[Animal Detection]

    AD --> PC["Pet Classifier (ResNet-18)<br/><i>MEDIUM priority</i><br/>Cat vs Dog"]

Result Types

Pet classification result from backend/services/enrichment_client.py:

@dataclass(slots=True)
class PetClassificationResult:
    pet_type: str           # "cat" or "dog"
    breed: str              # Breed if identifiable
    confidence: float       # 0-1
    is_household_pet: bool  # Always True for this classifier
    inference_time_ms: float

    def to_context_string(self) -> str:
        """Generate context string for LLM prompt."""

Parallel Processing

The enrichment pipeline processes multiple detections in parallel:

async def enrich_batch_with_tracking(
    self,
    detections: list[DetectionInput],
    images: dict[int | None, Image | Path | str],
    camera_id: str | None = None,
) -> EnrichmentTrackingResult:
    """Enrich a batch of detections with tracking for partial failures."""

    # Group detections by type for efficient model loading
    person_detections = [d for d in detections if d.class_name == "person"]
    vehicle_detections = [d for d in detections if d.class_name in VEHICLE_CLASSES]
    animal_detections = [d for d in detections if d.class_name in ANIMAL_CLASSES]

    # Process each group with appropriate models
    # Models are loaded on-demand and evicted LRU when VRAM is constrained
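The grouping-then-parallel pattern above can be sketched with `asyncio.gather`; the `enrich_group` coroutine is a stand-in for the real per-group client calls, and `return_exceptions=True` keeps one group's failure from aborting its siblings.

```python
import asyncio
from collections import defaultdict

VEHICLE_CLASSES = {"car", "truck", "bus", "motorcycle", "bicycle"}
ANIMAL_CLASSES = {"cat", "dog"}


def group_key(class_name: str) -> str:
    """Bucket a detection class into an enrichment group."""
    if class_name == "person":
        return "person"
    if class_name in VEHICLE_CLASSES:
        return "vehicle"
    if class_name in ANIMAL_CLASSES:
        return "animal"
    return "other"


async def enrich_group(kind: str, items: list[str]) -> tuple[str, int]:
    # Stand-in for the real per-group enrichment call.
    await asyncio.sleep(0)
    return kind, len(items)


async def enrich_batch(class_names: list[str]) -> dict[str, int]:
    groups: dict[str, list[str]] = defaultdict(list)
    for name in class_names:
        groups[group_key(name)].append(name)
    # One task per group, run concurrently; exceptions become results
    # instead of cancelling the other groups.
    results = await asyncio.gather(
        *(enrich_group(k, v) for k, v in groups.items()),
        return_exceptions=True,
    )
    out: dict[str, int] = {}
    for r in results:
        if isinstance(r, Exception):
            continue  # a failed group is skipped, not fatal
        kind, count = r
        out[kind] = count
    return out
```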

Tracking Partial Failures

The pipeline tracks which models succeeded/failed via EnrichmentTrackingResult in backend/services/enrichment_pipeline.py:

class EnrichmentStatus(str, Enum):
    """Status of enrichment pipeline execution."""
    FULL = "full"       # All enabled models succeeded
    PARTIAL = "partial" # Some models succeeded, some failed
    FAILED = "failed"   # All models failed
    SKIPPED = "skipped" # Enrichment not attempted

@dataclass(slots=True)
class EnrichmentTrackingResult:
    status: EnrichmentStatus = EnrichmentStatus.SKIPPED
    successful_models: list[str] = field(default_factory=list)
    failed_models: list[str] = field(default_factory=list)
    errors: dict[str, str] = field(default_factory=dict)
    data: EnrichmentResult | None = None

    @property
    def has_data(self) -> bool:
        """Check if any enrichment data is available."""
        return self.data is not None

    @property
    def is_partial(self) -> bool:
        """True if some models succeeded and some failed."""
        return self.status == EnrichmentStatus.PARTIAL

    @property
    def success_rate(self) -> float:
        """Percentage of models that succeeded (1.0 if no models attempted)."""
        total = len(self.successful_models) + len(self.failed_models)
        return len(self.successful_models) / total if total > 0 else 1.0
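As a usage sketch, the status can be derived from the success/failure lists. The `derive_status` helper is hypothetical (the real pipeline sets `status` internally), but it matches the enum semantics above.

```python
from enum import Enum


class EnrichmentStatus(str, Enum):
    FULL = "full"
    PARTIAL = "partial"
    FAILED = "failed"
    SKIPPED = "skipped"


# Hypothetical helper: map success/failure lists to a pipeline status.
def derive_status(successful: list[str], failed: list[str]) -> EnrichmentStatus:
    if not successful and not failed:
        return EnrichmentStatus.SKIPPED   # nothing was attempted
    if successful and failed:
        return EnrichmentStatus.PARTIAL   # mixed outcome
    return EnrichmentStatus.FULL if successful else EnrichmentStatus.FAILED
```

For example, `derive_status(["pose_estimator"], ["fashion_clip"])` yields `PARTIAL`, matching a `success_rate` of 0.5.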

Context String Generation

Each result type can generate context strings for the LLM prompt:

# Pose context
def to_context_string(self) -> str:
    lines = [f"Pose: {self.pose_class} (confidence: {self.confidence:.0%})"]
    if self.is_suspicious:
        lines.append("  [ALERT: Suspicious posture detected]")
    return "\n".join(lines)

# Clothing context
def to_context_string(self) -> str:
    lines = [f"Clothing: {self.description}"]
    if self.is_suspicious:
        lines.append("  [ALERT: Potentially suspicious attire detected]")
    elif self.is_service_uniform:
        lines.append("  [Service/delivery worker uniform detected]")
    return "\n".join(lines)

# Threat context
def to_context_string(self) -> str:
    if not self.has_threat:
        return "Threat detection: No threats detected"
    threat_types = [t.get("type", "unknown") for t in self.threats]
    return f"THREAT DETECTED: {', '.join(threat_types)} (severity: {self.max_severity})"

Retry Logic

Exponential backoff for transient failures:

def _calculate_backoff_delay(self, attempt: int) -> float:
    """Calculate exponential backoff delay with jitter."""
    base_delay = float(2**attempt)  # 1s, 2s, 4s, 8s, ...
    jitter = random.uniform(-0.1, 0.1)
    delay = base_delay * (1 + jitter)
    return min(delay, 30.0)  # Cap at 30 seconds

def _is_retryable_error(self, error: Exception) -> bool:
    """Check if error should trigger retry."""
    if isinstance(error, httpx.ConnectError):
        return True
    if isinstance(error, httpx.TimeoutException):
        return True
    if isinstance(error, httpx.HTTPStatusError):
        return error.response.status_code >= 500
    return False

Circuit Breaker

self._circuit_breaker = CircuitBreaker(
    name="enrichment",
    failure_threshold=settings.enrichment_cb_failure_threshold,
    recovery_timeout=settings.enrichment_cb_recovery_timeout,
    half_open_max_calls=settings.enrichment_cb_half_open_max_calls,
)

# Check before request
if not self._circuit_breaker.allow_request():
    raise EnrichmentUnavailableError("Circuit open - requests blocked")

# Record outcome
self._circuit_breaker.record_success()  # On success
self._circuit_breaker.record_failure()  # On failure
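The `CircuitBreaker` implementation itself is not shown on this page; the minimal sketch below illustrates the closed/open/half-open behavior it implies, ignoring `half_open_max_calls` (the real class also limits concurrent probe requests).

```python
import time


# Illustrative only: state names and timing behavior are assumptions
# about the real CircuitBreaker class, which is not shown here.
class MiniCircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at: float | None = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True   # closed: traffic flows normally
        if time.monotonic() - self.opened_at >= self.recovery_timeout:
            return True   # half-open: let a probe request through
        return False      # open: fail fast

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None  # close the circuit

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()  # trip the circuit
```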

Model Triggering Logic

Models are loaded based on detection type:

def get_models_for_detection_type(
    detection_type: str,
    is_suspicious: bool = False,
    has_multiple_frames: bool = False,
) -> list[str]:
    """Get model names for a detection type."""

    detection_model_mapping = {
        "person": [
            "threat_detector",   # CRITICAL: always first
            "fashion_clip",
            "pose_estimator",
            "person_reid",
            "depth_estimator",
        ],
        "car": ["vehicle_classifier", "depth_estimator"],
        "truck": ["vehicle_classifier", "depth_estimator"],
        "dog": ["pet_classifier", "depth_estimator"],
        "cat": ["pet_classifier", "depth_estimator"],
    }

    models = detection_model_mapping.get(detection_type.lower(), [])

    # Add action recognition for suspicious persons with video frames
    if detection_type == "person" and is_suspicious and has_multiple_frames:
        models.append("action_recognizer")

    return models
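Plugging representative inputs into this function (reproduced here in condensed, runnable form) illustrates the triggering rules: unknown classes get no models, and action recognition is only appended for suspicious persons with video frames.

```python
def get_models_for_detection_type(
    detection_type: str,
    is_suspicious: bool = False,
    has_multiple_frames: bool = False,
) -> list[str]:
    """Condensed copy of the mapping above, for demonstration."""
    mapping = {
        "person": ["threat_detector", "fashion_clip", "pose_estimator",
                   "person_reid", "depth_estimator"],
        "car": ["vehicle_classifier", "depth_estimator"],
        "truck": ["vehicle_classifier", "depth_estimator"],
        "dog": ["pet_classifier", "depth_estimator"],
        "cat": ["pet_classifier", "depth_estimator"],
    }
    models = mapping.get(detection_type.lower(), [])
    if detection_type == "person" and is_suspicious and has_multiple_frames:
        models.append("action_recognizer")
    return models
```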

Unified Enrichment Result

The EnrichmentResult class in backend/services/enrichment_pipeline.py aggregates all enrichment outputs:

@dataclass(slots=True)
class EnrichmentResult:
    """Result from the enrichment pipeline.

    Contains all additional context extracted from detections
    for use in the Nemotron LLM prompt.
    """
    license_plates: list[LicensePlateResult] = field(default_factory=list)
    faces: list[FaceResult] = field(default_factory=list)
    vision_extraction: BatchExtractionResult | None = None
    person_reid_matches: dict[str, list[EntityMatch]] = field(default_factory=dict)
    vehicle_reid_matches: dict[str, list[EntityMatch]] = field(default_factory=dict)
    person_household_matches: list[HouseholdMatch] = field(default_factory=list)
    vehicle_household_matches: list[HouseholdMatch] = field(default_factory=list)
    scene_change: SceneChangeResult | None = None
    violence_detection: ViolenceDetectionResult | None = None
    weather_classification: WeatherResult | None = None
    clothing_classifications: dict[str, ClothingClassification] = field(default_factory=dict)
    vehicle_classifications: dict[str, VehicleClassificationResult] = field(default_factory=dict)
    vehicle_damage: dict[str, VehicleDamageResult] = field(default_factory=dict)
    pet_classifications: dict[str, PetClassificationResult] = field(default_factory=dict)
    pose_results: dict[str, PoseResult] = field(default_factory=dict)
    action_results: dict[str, Any] | None = None
    depth_analysis: DepthAnalysisResult | None = None
    image_quality: ImageQualityResult | None = None
    errors: list[str] = field(default_factory=list)
    structured_errors: list[EnrichmentError] = field(default_factory=list)
    processing_time_ms: float = 0.0

    # Helper properties
    @property
    def has_license_plates(self) -> bool: ...
    @property
    def has_faces(self) -> bool: ...
    @property
    def has_reid_matches(self) -> bool: ...
    @property
    def has_household_matches(self) -> bool: ...

Metrics

# Request duration by model type
observe_ai_request_duration("enrichment_vehicle", duration)
observe_ai_request_duration("enrichment_pet", duration)
observe_ai_request_duration("enrichment_clothing", duration)
observe_ai_request_duration("enrichment_pose", duration)

# Retry tracking
increment_enrichment_retry(endpoint_name)

# Pipeline errors
record_pipeline_error("enrichment_circuit_open")
record_pipeline_error("enrichment_vehicle_timeout")
record_pipeline_error("enrichment_unexpected_error")

Usage Example

from backend.services.enrichment_client import EnrichmentClient
from PIL import Image

# Initialize client
client = EnrichmentClient()

# Check health
health = await client.check_health()
if health.get("status") == "healthy":
    # Classify vehicle
    image = Image.open("vehicle.jpg")
    result = await client.classify_vehicle(image)
    print(f"Vehicle type: {result.display_name}")
    print(f"Commercial: {result.is_commercial}")

    # Analyze pose
    person_image = Image.open("person.jpg")
    pose = await client.analyze_pose(person_image)
    print(f"Posture: {pose.posture}")
    print(f"Alerts: {pose.alerts}")

# Clean up
await client.close()