Distributed Tracing¶
OpenTelemetry-based distributed tracing with automatic instrumentation, span context propagation, and Jaeger visualization.
Key Files:
- backend/core/telemetry.py:1-1007 - OpenTelemetry configuration
- monitoring/grafana/provisioning/datasources/prometheus.yml:41-101 - Jaeger datasource
- monitoring/grafana/dashboards/tracing.json:1-332 - Tracing dashboard
Overview¶
The system uses OpenTelemetry for distributed tracing, enabling end-to-end visibility into requests as they flow through the AI pipeline. Traces capture the full lifecycle from image detection through LLM analysis to event creation, with automatic instrumentation for HTTP requests, database queries, and Redis operations.
Traces are exported to Jaeger over OTLP (the OpenTelemetry Protocol). Jaeger stores, searches, and visualizes the traces. Grafana integrates with Jaeger for trace exploration and correlates traces with metrics and logs through derived fields and trace-to-metrics queries.
The tracing implementation supports both synchronous and asynchronous code paths, with context propagation ensuring spans maintain parent-child relationships across async boundaries.
Architecture¶
graph TD
subgraph "Application Layer"
REQ[HTTP Request] --> MW[Middleware<br/>telemetry.py:567-680]
MW --> SVC[Service Layer]
SVC --> AI[AI Clients<br/>YOLO26, Nemotron]
SVC --> DB[(PostgreSQL)]
SVC --> REDIS[(Redis)]
end
subgraph "Instrumentation"
MW --> SPAN1[Request Span]
SVC --> SPAN2[Service Spans]
AI --> SPAN3[AI Request Spans]
DB --> SPAN4[DB Query Spans]
REDIS --> SPAN5[Redis Command Spans]
end
subgraph "Export"
SPAN1 --> PROC[BatchSpanProcessor<br/>telemetry.py:215-230]
SPAN2 --> PROC
SPAN3 --> PROC
SPAN4 --> PROC
SPAN5 --> PROC
PROC --> EXP[OTLPSpanExporter<br/>telemetry.py:200-210]
EXP --> |OTLP| JAEG[Jaeger Collector]
end
subgraph "Visualization"
JAEG --> UI[Jaeger UI]
JAEG --> GRAF[Grafana<br/>Trace Panel]
end
Setup and Configuration¶
Initialization¶
OpenTelemetry is initialized during application startup (backend/core/telemetry.py:135-268):
# From backend/core/telemetry.py:135-180
def setup_telemetry(
    service_name: str = "nemotron-backend",
    *,
    enable_auto_instrumentation: bool = True,
    otel_endpoint: str | None = None,
    sample_rate: float = 1.0,
) -> None:
    """Initialize OpenTelemetry with tracing and optional metrics."""
    if not settings.otel_enabled:
        logger.info("OpenTelemetry disabled by configuration")
        return

    # Create resource with service info
    resource = Resource.create({
        ResourceAttributes.SERVICE_NAME: service_name,
        ResourceAttributes.SERVICE_VERSION: get_app_version(),
        ResourceAttributes.DEPLOYMENT_ENVIRONMENT: settings.environment,
    })

    # Configure trace provider
    provider = TracerProvider(resource=resource)
    trace.set_tracer_provider(provider)

    # Configure exporter
    endpoint = otel_endpoint or settings.otel_exporter_otlp_endpoint
    exporter = OTLPSpanExporter(endpoint=endpoint)

    # Configure batch processor
    processor = BatchSpanProcessor(
        exporter,
        max_queue_size=2048,
        max_export_batch_size=512,
        schedule_delay_millis=5000,
    )
    provider.add_span_processor(processor)
Configuration Options¶
| Setting | Type | Default | Description |
|---|---|---|---|
| OTEL_ENABLED | bool | False | Master toggle for tracing |
| OTEL_SERVICE_NAME | str | "nemotron-backend" | Service name in traces |
| OTEL_EXPORTER_OTLP_ENDPOINT | str | "http://localhost:4317" | Jaeger collector endpoint |
| OTEL_TRACE_SAMPLE_RATE | float | 1.0 | Sampling rate (0.0-1.0) |
| OTEL_AUTO_INSTRUMENTATION | bool | True | Enable auto-instrumentation |
| OTEL_PROPAGATORS | str | "tracecontext,baggage" | Context propagators |
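As an illustration of how the options in the table could be read, here is a minimal sketch that loads them from environment variables using only the standard library. The `TracingSettings` class, its field names, and `load_tracing_settings` are hypothetical names chosen for this example; the project's actual settings object may be structured differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class TracingSettings:
    """Hypothetical container whose defaults mirror the table above."""
    enabled: bool = False
    service_name: str = "nemotron-backend"
    endpoint: str = "http://localhost:4317"
    sample_rate: float = 1.0
    auto_instrumentation: bool = True
    propagators: tuple = ("tracecontext", "baggage")


def _parse_bool(raw: str) -> bool:
    """Treat common truthy spellings as True; everything else is False."""
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def load_tracing_settings(env: Optional[Mapping[str, str]] = None) -> TracingSettings:
    """Build settings from a mapping (os.environ by default), using table defaults."""
    env = os.environ if env is None else env
    defaults = TracingSettings()

    sample_rate = float(env.get("OTEL_TRACE_SAMPLE_RATE", defaults.sample_rate))
    if not 0.0 <= sample_rate <= 1.0:
        raise ValueError(f"OTEL_TRACE_SAMPLE_RATE must be in 0.0-1.0, got {sample_rate}")

    raw_propagators = env.get("OTEL_PROPAGATORS", ",".join(defaults.propagators))
    propagators = tuple(p.strip() for p in raw_propagators.split(",") if p.strip())

    return TracingSettings(
        enabled=_parse_bool(env.get("OTEL_ENABLED", str(defaults.enabled))),
        service_name=env.get("OTEL_SERVICE_NAME", defaults.service_name),
        endpoint=env.get("OTEL_EXPORTER_OTLP_ENDPOINT", defaults.endpoint),
        sample_rate=sample_rate,
        auto_instrumentation=_parse_bool(
            env.get("OTEL_AUTO_INSTRUMENTATION", str(defaults.auto_instrumentation))
        ),
        propagators=propagators,
    )
```

Rejecting an out-of-range sample rate at load time surfaces a misconfiguration at startup instead of leaving it to the sampler.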
Storage Backend¶
Elasticsearch Configuration¶
Jaeger uses Elasticsearch for persistent trace storage with automatic retention management.
| Setting | Value | Purpose |
|---|---|---|
| SPAN_STORAGE_TYPE | elasticsearch | Storage backend type |
| ES_SERVER_URLS | http://elasticsearch:9200 | ES cluster endpoint |
| ES_INDEX_PREFIX | jaeger | Index name prefix |
| ES_TAGS_AS_FIELDS_ALL | true | Index all span tags for search |
Index Lifecycle Management¶
Traces are automatically managed with the following lifecycle:
| Phase | Age | Actions |
|---|---|---|
| Hot | 0-1 day | Active writes, high priority |
| Warm | 2-30 days | Shrink to 1 shard, force merge |
| Delete | 30+ days | Automatic deletion |
Resource Requirements¶
| Component | CPU | Memory | Disk |
|---|---|---|---|
| Elasticsearch | 2 cores | 4GB (2GB heap) | 50GB+ SSD |
| Jaeger | 1 core | 512MB | - |
Initialization¶
On first deployment, run the ILM initialization script:
This creates the ILM policy and index template for automatic retention.
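The script itself is not reproduced here. As a rough sketch only, an Elasticsearch ILM policy body matching the hot/warm/delete table above might look like the following. The function name, the priority values, and the exact `min_age` settings are assumptions for illustration, not the contents of the project's script; a body like this is typically sent to Elasticsearch with `PUT _ilm/policy/<policy-name>`.

```python
import json


def build_jaeger_ilm_policy(warm_after_days: int = 2, delete_after_days: int = 30) -> dict:
    """Return an ILM policy body with hot, warm, and delete phases."""
    return {
        "policy": {
            "phases": {
                # Hot: indices receive active writes and get the highest priority.
                "hot": {
                    "min_age": "0ms",
                    "actions": {"set_priority": {"priority": 100}},
                },
                # Warm: shrink to one shard and force-merge to one segment.
                "warm": {
                    "min_age": f"{warm_after_days}d",
                    "actions": {
                        "shrink": {"number_of_shards": 1},
                        "forcemerge": {"max_num_segments": 1},
                        "set_priority": {"priority": 50},
                    },
                },
                # Delete: remove the index once it passes the retention window.
                "delete": {
                    "min_age": f"{delete_after_days}d",
                    "actions": {"delete": {}},
                },
            }
        }
    }


if __name__ == "__main__":
    print(json.dumps(build_jaeger_ilm_policy(), indent=2))
```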
Auto-Instrumentation¶
The system auto-instruments common libraries (backend/core/telemetry.py:270-340):
| Library | Instrumentation | What's Traced |
|---|---|---|
| httpx | HTTPXClientInstrumentor | Outbound HTTP requests |
| aiohttp | AioHttpClientInstrumentor | Async HTTP clients |
| sqlalchemy | SQLAlchemyInstrumentor | Database queries |
| redis | RedisInstrumentor | Redis commands |
| fastapi | FastAPIInstrumentor | Inbound HTTP requests |
| logging | Custom integration | Log-trace correlation |
Span Operations¶
Creating Manual Spans¶
For operations not auto-instrumented, create manual spans (backend/core/telemetry.py:380-450):
# From backend/core/telemetry.py:380-420
@contextmanager
def create_span(
    name: str,
    *,
    kind: SpanKind = SpanKind.INTERNAL,
    attributes: dict[str, Any] | None = None,
    record_exception: bool = True,
) -> Generator[Span, None, None]:
    """Create a traced span for custom operations."""
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span(
        name,
        kind=kind,
        attributes=attributes or {},
    ) as span:
        try:
            yield span
        except Exception as e:
            if record_exception:
                span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise
Usage:
from backend.core.telemetry import create_span
from opentelemetry.trace import SpanKind

async def analyze_image(image_path: str) -> dict:
    with create_span(
        "analyze_image",
        kind=SpanKind.CLIENT,
        attributes={"image.path": image_path},
    ) as span:
        result = await run_analysis(image_path)
        span.set_attribute("detection.count", len(result.detections))
        return result
Async Span Context¶
For async operations, use create_async_span (backend/core/telemetry.py:453-500):
# From backend/core/telemetry.py:453-490
@asynccontextmanager
async def create_async_span(
    name: str,
    *,
    kind: SpanKind = SpanKind.INTERNAL,
    attributes: dict[str, Any] | None = None,
) -> AsyncGenerator[Span, None]:
    """Create a traced span for async operations."""
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span(name, kind=kind, attributes=attributes or {}) as span:
        try:
            yield span
        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise
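Parent-child relationships survive `await` points because the OpenTelemetry Python SDK keeps the active span in a `contextvars`-based context by default, and asyncio tasks copy the current context when they are created. The sketch below shows that mechanism with the standard library only. The `current_span_name` variable is a stand-in for the SDK's current-span slot, and all function names are made up for the example.

```python
import asyncio
import contextvars

# Stand-in for the per-task "current span" slot kept by a tracing SDK.
current_span_name = contextvars.ContextVar("current_span_name", default="<none>")


async def child(label: str) -> tuple:
    """Simulate a traced sub-operation that reads its parent after an await."""
    await asyncio.sleep(0)  # Yield to the event loop; the context is preserved.
    return label, current_span_name.get()


async def handle_request(request_id: str) -> list:
    """Simulate a request span with two concurrent child operations."""
    token = current_span_name.set(f"request-{request_id}")
    try:
        # Tasks created by gather() copy the current context, so each child
        # sees the "span" of the request that spawned it.
        return await asyncio.gather(child("db"), child("redis"))
    finally:
        current_span_name.reset(token)


async def main() -> list:
    # Two requests run concurrently without seeing each other's context.
    return await asyncio.gather(handle_request("a"), handle_request("b"))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each child reports the request that created it even though both requests interleave on the same event loop.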
Span Names and Attributes¶
Pipeline Spans¶
| Span Name | Kind | Key Attributes |
|---|---|---|
| detection_processing | INTERNAL | camera_id, image_path, detection_count |
| batch_aggregation | INTERNAL | batch_id, batch_size, timeout_triggered |
| analysis_processing | INTERNAL | batch_id, model, input_tokens, output_tokens |
| llm_inference | CLIENT | model, prompt_length, completion_length, duration_ms |
| event_creation | INTERNAL | event_id, risk_score, camera_id |
HTTP Request Spans¶
Auto-instrumented by FastAPI instrumentor:
| Attribute | Description |
|---|---|
| http.method | Request method (GET, POST, etc.) |
| http.url | Full request URL |
| http.status_code | Response status code |
| http.route | FastAPI route template |
| http.host | Request host header |
| http.user_agent | Client user agent |
Database Spans¶
Auto-instrumented by SQLAlchemy instrumentor:
| Attribute | Description |
|---|---|
| db.system | postgresql |
| db.name | Database name |
| db.statement | SQL query (truncated) |
| db.operation | Query type (SELECT, INSERT, etc.) |
Redis Spans¶
Auto-instrumented by Redis instrumentor:
| Attribute | Description |
|---|---|
| db.system | redis |
| db.operation | Redis command (GET, SET, etc.) |
| db.redis.database_index | Redis database index |
| net.peer.name | Redis host |
| net.peer.port | Redis port |
Context Propagation¶
W3C Trace Context¶
The system uses W3C Trace Context headers for propagation (backend/core/telemetry.py:550-565):
traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
tracestate: vendor1=value1,vendor2=value2
Format: version-trace_id-span_id-flags
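To make the four-field format concrete, the following sketch parses a `traceparent` value with the standard library. It handles only the four-field layout shown above and is meant as an illustration, not a replacement for the propagator shipped with OpenTelemetry. `TraceParent` and `parse_traceparent` are names invented for this example.

```python
import re
from typing import NamedTuple, Optional

# version (2 hex) - trace_id (32 hex) - span_id (16 hex) - flags (2 hex)
_TRACEPARENT_RE = re.compile(
    r"^(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<span_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})$"
)


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    span_id: str
    sampled: bool


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Parse a traceparent header; return None when it is malformed."""
    match = _TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    if match["version"] == "ff":
        return None  # "ff" is reserved as an invalid version
    if set(match["trace_id"]) == {"0"} or set(match["span_id"]) == {"0"}:
        return None  # all-zero IDs are invalid
    # The lowest bit of the flags byte is the "sampled" flag.
    sampled = bool(int(match["flags"], 16) & 0x01)
    return TraceParent(match["version"], match["trace_id"], match["span_id"], sampled)
```

For the example header above, the parsed result carries trace ID `0af7651916cd43dd8448eb211c80319c`, span ID `b7ad6b7169203331`, and a set sampled flag.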
W3C Baggage for Cross-Service Context (NEM-3796)¶
The system uses W3C Baggage to propagate application-specific context across service boundaries (backend/api/middleware/baggage.py). Baggage is propagated automatically alongside trace context via the composite propagator.
Baggage Keys¶
| Key | Description | Valid Values |
|---|---|---|
| camera.id | Source camera for detection pipeline | Camera identifier string |
| event.priority | Priority level for downstream processing | low, normal, high, critical |
| request.source | Origin of request | ui, api, scheduled, internal |
| batch.id | Batch identifier for batch processing | Batch identifier string |
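The keys and valid values in this table can be enforced before anything is placed on the wire. The sketch below validates entries against the table and encodes them in the W3C `baggage` header format (comma-separated `key=value` pairs with percent-encoded values). The helper names are hypothetical, and dropping invalid entries silently is an assumption made for this example; the real middleware may reject or log them instead.

```python
from urllib.parse import quote, unquote

# Keys with an enumerated set of valid values, taken from the table above.
ALLOWED_VALUES = {
    "event.priority": {"low", "normal", "high", "critical"},
    "request.source": {"ui", "api", "scheduled", "internal"},
}
# Keys that accept any non-empty identifier string.
FREE_FORM_KEYS = {"camera.id", "batch.id"}


def validate_baggage(entries: dict) -> dict:
    """Keep only known keys whose values are valid according to the table."""
    clean = {}
    for key, value in entries.items():
        if key in FREE_FORM_KEYS and value:
            clean[key] = value
        elif value in ALLOWED_VALUES.get(key, ()):
            clean[key] = value
    return clean


def encode_baggage_header(entries: dict) -> str:
    """Serialize entries as a W3C baggage header value."""
    return ",".join(f"{key}={quote(value, safe='')}" for key, value in entries.items())


def decode_baggage_header(header: str) -> dict:
    """Parse a baggage header value, ignoring optional ';property' suffixes."""
    entries = {}
    for member in header.split(","):
        key, sep, value = member.partition("=")
        if not sep:
            continue  # Skip malformed members without '='
        value = value.split(";", 1)[0]
        entries[key.strip()] = unquote(value.strip())
    return entries
```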
Setting Baggage¶
from backend.api.middleware.baggage import set_pipeline_baggage

# At the start of the detection pipeline
set_pipeline_baggage(
    camera_id="front_door",
    event_priority="high",
    request_source="api",
)
Reading Baggage¶
from backend.api.middleware.baggage import (
    get_camera_id_from_baggage,
    get_event_priority_from_baggage,
    get_request_source_from_baggage,
)

# In downstream services
camera_id = get_camera_id_from_baggage()
if get_event_priority_from_baggage() == "high":
    # Fast-track processing
    pass
Automatic Baggage Extraction¶
The BaggageMiddleware automatically:
- Extracts request.source from the X-Request-Source header (defaults to "api")
- Extracts camera.id from URL path patterns like /cameras/{camera_id}/...
- Preserves incoming baggage from upstream services
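The three behaviors listed above can be sketched as a pure function over the request path, headers, and any incoming baggage. This is not the middleware's code: the function name, the regular expression, and the precedence rule (an explicit header overrides an incoming `request.source`, and the `"api"` default applies only when neither is present) are assumptions made for illustration.

```python
import re
from typing import Mapping, Optional

# Matches /cameras/{camera_id} anywhere in the path, e.g. /api/cameras/cam1/events
_CAMERA_PATH_RE = re.compile(r"/cameras/(?P<camera_id>[^/]+)(?:/|$)")


def extract_baggage_from_request(
    path: str,
    headers: Mapping[str, str],
    incoming: Optional[Mapping[str, str]] = None,
) -> dict:
    """Derive baggage entries for a request while keeping upstream entries."""
    baggage = dict(incoming or {})  # Preserve baggage from upstream services

    # Header names are case-insensitive, so normalize before the lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    header_source = lowered.get("x-request-source")
    if header_source:
        baggage["request.source"] = header_source
    else:
        baggage.setdefault("request.source", "api")

    match = _CAMERA_PATH_RE.search(path)
    if match:
        baggage["camera.id"] = match["camera_id"]
    return baggage
```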
Propagation Across Services¶
Context is automatically propagated to AI services via HTTP headers:
import httpx

# Context is automatically injected by httpx instrumentation
async def call_yolo26(image_data: bytes) -> dict:
    async with httpx.AsyncClient() as client:
        # traceparent header is automatically added
        response = await client.post(
            f"{yolo26_url}/detect",
            files={"image": image_data},
        )
        return response.json()
Manual Propagation¶
For non-HTTP transports (e.g., Redis queues), manually inject/extract context (backend/core/telemetry.py:505-545):
# From backend/core/telemetry.py:505-530
def inject_context_to_dict(carrier: dict[str, str]) -> None:
    """Inject current trace context into a dictionary."""
    propagator = get_global_textmap()
    propagator.inject(carrier)

def extract_context_from_dict(carrier: dict[str, str]) -> Context:
    """Extract trace context from a dictionary."""
    propagator = get_global_textmap()
    return propagator.extract(carrier)
Usage for queue messages:
from opentelemetry import context

# Producer
message = {"data": payload}
inject_context_to_dict(message)  # Adds traceparent (plus tracestate and baggage when present)
await queue.push(message)

# Consumer
message = await queue.pop()
ctx = extract_context_from_dict(message)
token = context.attach(ctx)  # Make the extracted context current
try:
    # Process message with trace context restored
    ...
finally:
    context.detach(token)  # Restore the previous context
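One design choice worth considering for queue messages is to keep propagation headers in their own field instead of mixing them into the payload, so consumers never confuse `traceparent` with business data. The sketch below shows such an envelope. `pack_message` and `unpack_message` are hypothetical helpers, and the `inject` argument is expected to behave like `inject_context_to_dict` above, filling a `dict[str, str]` carrier in place.

```python
import json
from typing import Any, Callable, Dict, Tuple


def pack_message(payload: Any, inject: Callable[[Dict[str, str]], None]) -> str:
    """Serialize a payload together with a separate trace-header carrier."""
    headers: Dict[str, str] = {}
    inject(headers)  # The injector fills the carrier in place
    return json.dumps({"trace": headers, "data": payload})


def unpack_message(raw: str) -> Tuple[Dict[str, str], Any]:
    """Return (trace headers, payload); headers are empty if none were sent."""
    envelope = json.loads(raw)
    return envelope.get("trace", {}), envelope["data"]
```

On the consumer side, the returned headers dictionary would be passed to `extract_context_from_dict` before the payload is processed.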
Trace-to-Metrics Correlation¶
Grafana's Jaeger datasource is configured with trace-to-metrics queries (monitoring/grafana/provisioning/datasources/prometheus.yml:52-101):
tracesToMetrics:
  datasourceUid: PBFA97CFB590B2093
  spanStartTimeShift: '-5m'
  spanEndTimeShift: '5m'
  tags:
    - key: 'service.name'
      value: 'service'
    - key: 'http.method'
      value: 'method'
    - key: 'http.status_code'
      value: 'status_code'
  queries:
    - name: 'Pipeline Errors/min'
      query: 'rate(hsi_pipeline_errors_total[1m]) * 60'
    - name: 'Detection Queue Depth'
      query: 'hsi_detection_queue_depth'
    - name: 'YOLO26 Latency (p95)'
      query: 'histogram_quantile(0.95, rate(yolo26_inference_latency_seconds_bucket[5m]))'
This enables clicking from a trace span to related Prometheus metrics.
Trace-to-Logs Correlation¶
Logs are correlated via trace ID. The Loki datasource extracts trace IDs from logs (monitoring/grafana/provisioning/datasources/prometheus.yml:198-214):
derivedFields:
  - name: TraceID
    matcherRegex: 'trace_id=([a-f0-9]{32})'
    url: '${__value.raw}'
    datasourceUid: PC9A941E8F2E49454
    urlDisplayLabel: 'View Trace'
Log format includes trace context:
2024-01-15 10:30:45 | INFO | backend.services | trace_id=0af7651916cd43dd8448eb211c80319c span_id=b7ad6b7169203331 | Processing detection
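The derived-field regex can be checked against this log format outside Grafana. The sketch below applies the same `trace_id` pattern in Python. The `span_id` pattern and the `extract_trace_ids` helper are additions made for this example and are not part of the datasource configuration.

```python
import re
from typing import Optional, Tuple

# Same pattern as matcherRegex in the Loki derived field.
TRACE_ID_RE = re.compile(r"trace_id=([a-f0-9]{32})")
# Assumed companion pattern for the span ID (not in the Grafana config).
SPAN_ID_RE = re.compile(r"span_id=([a-f0-9]{16})")


def extract_trace_ids(line: str) -> Optional[Tuple[str, Optional[str]]]:
    """Return (trace_id, span_id) from a log line, or None without a trace ID."""
    trace_match = TRACE_ID_RE.search(line)
    if trace_match is None:
        return None
    span_match = SPAN_ID_RE.search(line)
    return trace_match.group(1), span_match.group(1) if span_match else None


if __name__ == "__main__":
    sample = (
        "2024-01-15 10:30:45 | INFO | backend.services | "
        "trace_id=0af7651916cd43dd8448eb211c80319c span_id=b7ad6b7169203331 | "
        "Processing detection"
    )
    print(extract_trace_ids(sample))
```

A log line that does not match, for example one with an uppercase or truncated trace ID, will not get a "View Trace" link in Grafana.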
Grafana Tracing Dashboard¶
The tracing dashboard (monitoring/grafana/dashboards/tracing.json) provides:
Pipeline Analysis Traces Panel¶
Shows full pipeline traces (monitoring/grafana/dashboards/tracing.json:65-77):
{
  "targets": [
    {
      "queryType": "search",
      "service": "nemotron-backend",
      "operation": "analysis_processing",
      "limit": 15
    }
  ]
}
Detection Processing Panel¶
Shows YOLO26 detection traces with latency thresholds (monitoring/grafana/dashboards/tracing.json:131-143):
- Green: < 5s (5,000,000 microseconds)
- Yellow: 5-30s
- Red: > 30s
LLM Inference Panel¶
Shows Nemotron LLM traces with latency thresholds (monitoring/grafana/dashboards/tracing.json:194-198):
- Green: < 30s
- Yellow: 30-120s
- Red: > 120s
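The color bands for the detection and LLM panels reduce to a comparison of span duration, given in microseconds, against two thresholds. A minimal sketch follows. The function and constant names are invented for the example, and the treatment of exact boundary values (a duration equal to the upper threshold counts as yellow) is an assumption, since the dashboard JSON is not reproduced here.

```python
# (warn_after_seconds, critical_after_seconds) per panel, from the lists above.
DETECTION_THRESHOLDS = (5, 30)
LLM_THRESHOLDS = (30, 120)


def classify_latency(duration_us: int, thresholds: tuple) -> str:
    """Map a span duration in microseconds to green, yellow, or red."""
    warn_after_s, critical_after_s = thresholds
    seconds = duration_us / 1_000_000
    if seconds < warn_after_s:
        return "green"
    if seconds <= critical_after_s:
        return "yellow"
    return "red"
```

The same 45-second span is red on the detection panel but yellow on the LLM panel, which reflects the different latency expectations of the two stages.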
Error Traces Panel¶
Shows traces with errors (monitoring/grafana/dashboards/tracing.json:241-250):
{
  "targets": [
    {
      "queryType": "search",
      "service": "nemotron-backend",
      "tags": "error=true",
      "limit": 20
    }
  ]
}
Sampling Configuration¶
For high-traffic deployments, configure sampling (backend/core/telemetry.py:185-200):
# From backend/core/telemetry.py:185-200
if sample_rate < 1.0:
    sampler = TraceIdRatioBased(sample_rate)
else:
    sampler = AlwaysOnSampler()

provider = TracerProvider(
    resource=resource,
    sampler=sampler,
)
Sampling strategies:
| Rate | Use Case |
|---|---|
| 1.0 | Development, low traffic |
| 0.1 | Production, 10% sampling |
| 0.01 | High-traffic, cost reduction |
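Ratio-based sampling is deterministic per trace: the decision is derived from the trace ID, so every service that sees the same trace reaches the same verdict. The sketch below is a simplified illustration of that idea, comparing the low 64 bits of the trace ID against a bound proportional to the rate. It is not the SDK's code, and `should_sample` is a name chosen for this example.

```python
TRACE_ID_LIMIT = (1 << 64) - 1  # Mask for the low 64 bits of a trace ID


def should_sample(trace_id: int, rate: float) -> bool:
    """Decide deterministically whether a trace is kept at the given rate."""
    if not 0.0 <= rate <= 1.0:
        raise ValueError("rate must be between 0.0 and 1.0")
    # Trace IDs below the bound are sampled; the bound scales with the rate.
    bound = round(rate * (TRACE_ID_LIMIT + 1))
    return (trace_id & TRACE_ID_LIMIT) < bound
```

Because the decision depends only on the trace ID and the rate, lowering the rate drops whole traces instead of leaving partial traces with missing spans.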
Troubleshooting¶
No Traces Appearing¶
- Check OTEL_ENABLED=True in configuration
- Verify Jaeger collector is reachable at OTEL_EXPORTER_OTLP_ENDPOINT
- Check for export errors in logs:
grep "OTLP" /var/log/hsi/backend.log
Missing Span Relationships¶
- Verify context propagation headers are forwarded
- Check async context managers are used correctly
- Ensure context.attach() is called for manual propagation
High Trace Cardinality¶
- Avoid dynamic span names
- Use bounded attribute values
- Enable sampling for high-traffic services
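As an example of the first two points, a span name can be built from the route template instead of the concrete URL, with the variable parts moved into attributes. This is a sketch under assumptions: the helper name and the `route.param.` attribute prefix are hypothetical, and attribute values such as camera IDs are only acceptable when their set is bounded.

```python
def span_name_and_attributes(method: str, route_template: str, path_params: dict) -> tuple:
    """Build a low-cardinality span name and carry variable parts as attributes."""
    # The name uses the template, so /cameras/a/events and /cameras/b/events
    # share one span name instead of creating a new name per camera.
    name = f"{method.upper()} {route_template}"
    attributes = {f"route.param.{key}": str(value) for key, value in path_params.items()}
    return name, attributes
```

Requests for different cameras then group under the single name `GET /cameras/{camera_id}/events`, and the camera can still be found by filtering on the attribute.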
Testing¶
Run tracing tests:
| Test | Purpose |
|---|---|
| test_setup_telemetry | Initialization |
| test_create_span | Manual span creation |
| test_context_propagation | Header injection/extraction |
| test_span_attributes | Attribute recording |
| test_error_recording | Exception capture |
Related Documents¶
- Structured Logging - Log-trace correlation
- Prometheus Metrics - Metrics-trace correlation
- Grafana Dashboards - Trace visualization