Integration Testing¶
Integration tests verify multi-component workflows with real database connections. They ensure that services, repositories, and APIs work together correctly.
Overview¶
Integration tests form the middle layer of the test pyramid (~15% of tests). They:
- Test multi-component interactions
- Use real PostgreSQL database
- Use mocked Redis (worker-isolated)
- Support parallel execution with worker-isolated databases
Location: backend/tests/integration/ (109+ test files)
Test Organization¶
Directory Structure¶
backend/tests/integration/
conftest.py # Integration-specific fixtures
repositories/ # Repository pattern tests (4 files)
test_admin_*.py # Admin API tests
test_alerts_*.py # Alert system tests
test_analytics_*.py # Analytics API tests
test_cameras_*.py # Camera management tests
test_events_*.py # Event processing tests
test_websocket_*.py # WebSocket tests
test_pipeline_*.py # AI pipeline tests
...
Test Categories¶
| Category | Files | Description |
|---|---|---|
| API endpoints | 24 | Admin, alerts, analytics, cameras, events, etc. |
| WebSocket | 4 | Connection handling, broadcasting, cleanup |
| Services | 14 | Batch aggregation, detector client, orchestrator |
| Models/Database | 10 | Alert models, cascades, partitions |
| Error Handling | 6 | API errors, transaction rollback |
| Pipeline | 5 | Circuit breaker, enrichment, E2E |
Parallel Execution¶
Integration tests support parallel execution via pytest-xdist with worker-isolated databases.
How It Works¶
From backend/tests/integration/conftest.py:248-268:
def get_worker_id(request: pytest.FixtureRequest) -> str:
"""Get the pytest-xdist worker ID ('gw0', 'gw1', etc.) or 'master'."""
return xdist.get_xdist_worker_id(request)
def get_worker_db_name(worker_id: str) -> str:
"""Generate a unique database name for the xdist worker.
Args:
worker_id: The xdist worker ID ('gw0', 'gw1', 'master')
Returns:
Database name like 'security_test_gw0' or 'security_test' for master
"""
if worker_id == "master":
return "security_test"
return f"security_test_{worker_id}"
Each worker creates its own database:
gw0->security_test_gw0gw1->security_test_gw1gw2->security_test_gw2- etc.
Running Parallel Tests¶
# Parallel with 8 workers (recommended)
uv run pytest backend/tests/integration/ -n8 --dist=worksteal
# Serial execution (legacy, slower)
uv run pytest backend/tests/integration/ -n0
# Performance comparison
# Serial: ~169s for 1575 tests
# 8 workers: ~33s (5.1x speedup)
Fixtures¶
Integration-Specific Fixtures (backend/tests/integration/conftest.py)¶
PostgreSQL Container¶
From backend/tests/integration/conftest.py:313-349:
@pytest.fixture(scope="session")
def postgres_container() -> Generator[PostgresContainer | LocalPostgresService]:
"""Provide a session-scoped PostgreSQL service.
Uses local PostgreSQL if available (development with Podman),
otherwise starts a testcontainer for full isolation.
"""
# Check for explicit environment variable override
if os.environ.get("TEST_DATABASE_URL"):
yield LocalPostgresService()
return
# Check for local PostgreSQL (development environment)
if _check_local_postgres():
yield LocalPostgresService()
return
# Fall back to testcontainer
from testcontainers.postgres import PostgresContainer
container = PostgresContainer(
"postgres:16-alpine",
username="postgres",
password="postgres", # pragma: allowlist secret
dbname="security_test",
driver="asyncpg",
)
container.start()
yield container
container.stop()
Worker Database URL¶
From backend/tests/integration/conftest.py:562-593:
@pytest.fixture(scope="session")
def worker_db_url(
request: pytest.FixtureRequest,
postgres_container: PostgresContainer | LocalPostgresService,
) -> Generator[str]:
"""Create and provide a worker-specific database URL.
Each pytest-xdist worker gets its own database:
- gw0 -> security_test_gw0
- gw1 -> security_test_gw1
- master (serial) -> security_test
"""
worker_id = get_worker_id(request)
db_name = get_worker_db_name(worker_id)
base_url = _get_postgres_url(postgres_container)
# Create the worker database
worker_url = _create_worker_database(base_url, db_name)
try:
yield worker_url
finally:
# Clean up the worker database at session end
_drop_worker_database(base_url, db_name)
Integration Environment¶
From backend/tests/integration/conftest.py:627-696:
@pytest.fixture
def integration_env(
worker_db_url: str,
worker_redis_url: str,
) -> Generator[str]:
"""Set DATABASE_URL/REDIS_URL for integration tests.
Configures environment variables pointing to worker-isolated
PostgreSQL and Redis databases.
"""
original_db_url = os.environ.get("DATABASE_URL")
original_redis_url = os.environ.get("REDIS_URL")
os.environ["DATABASE_URL"] = worker_db_url
os.environ["REDIS_URL"] = worker_redis_url
# Configure pool sizes for integration tests
if os.environ.get("CI"):
os.environ["DATABASE_POOL_SIZE"] = "10"
os.environ["DATABASE_POOL_OVERFLOW"] = "5"
else:
os.environ["DATABASE_POOL_SIZE"] = "5"
os.environ["DATABASE_POOL_OVERFLOW"] = "2"
get_settings.cache_clear()
yield worker_db_url
# Restore original environment...
Database Session¶
From backend/tests/integration/conftest.py:907-924:
@pytest.fixture
async def db_session(integration_db: str):
"""Yield a live AsyncSession bound to the integration test database.
Note: The session uses autocommit=False, so you must call
`await session.commit()` to persist changes.
"""
from backend.core.database import get_session
async with get_session() as session:
yield session
HTTP Test Client¶
From backend/tests/integration/conftest.py:1117-1225:
@pytest.fixture
async def client(integration_db: str, mock_redis: AsyncMock):
"""Async HTTP client bound to the FastAPI app.
Notes:
- DB is pre-initialized by `integration_db`
- All background services are mocked
- Test data is cleaned up before/after each test
"""
await _cleanup_test_data()
from httpx import ASGITransport, AsyncClient
from backend.main import app
# Mock all background services
with (
patch("backend.main.init_db", AsyncMock(return_value=None)),
patch("backend.main.close_db", AsyncMock(return_value=None)),
patch("backend.main.init_redis", AsyncMock(return_value=mock_redis)),
# ... more patches
):
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as ac:
yield ac
await _cleanup_test_data()
Test Patterns¶
API Endpoint Testing¶
@pytest.mark.asyncio
async def test_get_cameras_returns_list(client, db_session):
"""Test GET /api/cameras returns camera list."""
# Arrange: Create test data
from backend.models.camera import Camera
camera = Camera(
id="test_camera",
name="Test Camera",
folder_path="/export/foscam/test",
status="online",
)
db_session.add(camera)
await db_session.commit()
# Act: Make API request
response = await client.get("/api/cameras")
# Assert: Verify response
assert response.status_code == 200
data = response.json()
assert len(data["cameras"]) == 1
assert data["cameras"][0]["id"] == "test_camera"
Database Transaction Testing¶
@pytest.mark.asyncio
async def test_event_creation_transaction(db_session):
"""Test event creation uses proper transaction handling."""
from backend.models.event import Event
from backend.tests.factories import CameraFactory
# Create camera first (foreign key)
camera = CameraFactory.build()
db_session.add(camera)
await db_session.commit()
# Create event
event = Event(
batch_id="test_batch",
camera_id=camera.id,
started_at=datetime.now(UTC),
risk_score=50,
risk_level="medium",
summary="Test event",
)
db_session.add(event)
await db_session.commit()
# Verify persistence
await db_session.refresh(event)
assert event.id is not None
Isolated Session Testing¶
From backend/tests/integration/conftest.py:927-966:
@pytest.fixture
async def isolated_db_session(integration_db: str):
"""Yield an isolated AsyncSession with transaction rollback.
Uses PostgreSQL savepoints:
1. Create savepoint before test
2. Yield session to test
3. Rollback to savepoint after test
"""
from sqlalchemy import text
from backend.core.database import get_session_factory
factory = get_session_factory()
session = factory()
try:
await session.execute(text("SAVEPOINT test_savepoint"))
yield session
finally:
await session.execute(text("ROLLBACK TO SAVEPOINT test_savepoint"))
await session.close()
Usage:
@pytest.mark.asyncio
async def test_with_rollback(isolated_db_session):
"""Test data is automatically rolled back after test."""
camera = Camera(id="temp", name="Temporary", ...)
isolated_db_session.add(camera)
await isolated_db_session.commit()
# Camera exists during test
assert camera.id is not None
# After test: Camera is rolled back (not persisted)
WebSocket Testing¶
@pytest.mark.asyncio
async def test_websocket_connection(client):
"""Test WebSocket connection and message handling."""
async with client.websocket_connect("/ws/events") as ws:
# Send subscription message
await ws.send_json({"type": "subscribe", "channel": "events"})
# Receive confirmation
response = await ws.receive_json()
assert response["type"] == "subscribed"
Data Cleanup¶
Table Deletion Order¶
Integration tests use FK-safe deletion order computed from schema reflection:
From backend/tests/integration/conftest.py:60-86:
HARDCODED_TABLE_DELETION_ORDER = [
# First: Delete tables with foreign key references (leaf tables)
"alerts",
"event_audits",
"detections",
"activity_baselines",
"class_baselines",
"events",
"scene_changes",
"camera_notification_settings",
"zones",
# Second: Delete tables without FK references
"alert_rules",
"audit_logs",
"gpu_stats",
# ...
# Last: Delete parent tables
"cameras",
]
Cleanup Fixture¶
From backend/tests/integration/conftest.py:863-904:
@pytest.fixture
async def clean_tables(integration_db: str) -> AsyncGenerator[None]:
"""Delete all data from tables before and after test.
Uses DELETE instead of TRUNCATE to avoid AccessExclusiveLock deadlocks.
"""
async def delete_all() -> None:
engine = get_engine()
deletion_order = get_table_deletion_order(engine)
async with get_session() as session:
for table_name in deletion_order:
try:
await session.execute(text(f"DELETE FROM {table_name}"))
except Exception as e:
logger.debug(f"Skipping table {table_name}: {e}")
await session.commit()
await delete_all()
yield
await delete_all()
Error Handling Tests¶
Transaction Rollback¶
@pytest.mark.asyncio
async def test_failed_transaction_rolls_back(db_session):
"""Test that failed transactions are properly rolled back."""
from backend.models.camera import Camera
camera = Camera(id="test", name="Test", ...)
db_session.add(camera)
# Simulate error
try:
await db_session.commit()
raise ValueError("Simulated error")
except ValueError:
await db_session.rollback()
# Verify rollback
result = await db_session.execute(
select(Camera).where(Camera.id == "test")
)
assert result.scalar_one_or_none() is None
API Error Responses¶
@pytest.mark.asyncio
async def test_not_found_returns_404(client):
"""Test that missing resources return 404."""
response = await client.get("/api/cameras/nonexistent")
assert response.status_code == 404
data = response.json()
assert "detail" in data
assert "not found" in data["detail"].lower()
Redis Testing¶
Mock Redis¶
From backend/tests/integration/conftest.py:985-1008:
@pytest.fixture
async def mock_redis() -> AsyncGenerator[AsyncMock]:
"""Mock Redis operations for tests that don't need real Redis."""
mock_redis_client = AsyncMock()
mock_redis_client.health_check.return_value = {
"status": "healthy",
"connected": True,
}
mock_redis_client._client = None
with (
patch("backend.core.redis._redis_client", mock_redis_client),
patch("backend.core.redis.init_redis", return_value=mock_redis_client),
patch("backend.core.redis.close_redis", return_value=None),
):
yield mock_redis_client
Real Redis¶
From backend/tests/integration/conftest.py:1011-1038:
@pytest.fixture
async def real_redis(worker_redis_url: str) -> AsyncGenerator[RedisClient]:
"""Provide a real Redis client for integration tests.
Each xdist worker uses a different Redis database number for isolation.
"""
from backend.core.redis import RedisClient
client = RedisClient(redis_url=worker_redis_url)
await client.connect()
try:
yield client
finally:
await client.disconnect()
Best Practices¶
1. Use Unique IDs¶
From backend/tests/integration/conftest.py:1232-1246:
def unique_id(prefix: str = "test") -> str:
"""Generate a unique ID for test objects to prevent conflicts."""
import uuid
return f"{prefix}_{uuid.uuid4().hex[:8]}"
Usage:
async def test_camera_creation(db_session):
camera_id = unique_id("camera")
camera = Camera(id=camera_id, name=f"Camera {camera_id}", ...)
# No conflicts with parallel tests
2. Clean Up Test Data¶
Always clean up test data to prevent state leakage:
@pytest.fixture
async def test_camera(db_session):
"""Create a camera and clean up after test."""
camera = Camera(id=unique_id("cam"), ...)
db_session.add(camera)
await db_session.commit()
yield camera
# Cleanup
await db_session.delete(camera)
await db_session.commit()
3. Avoid Shared State¶
Don't rely on test execution order:
# Bad: Depends on another test's data
async def test_get_camera():
response = await client.get("/api/cameras/front_door")
# May fail if other test didn't create this camera
# Good: Create own test data
async def test_get_camera(db_session, client):
camera = Camera(id=unique_id("cam"), ...)
db_session.add(camera)
await db_session.commit()
response = await client.get(f"/api/cameras/{camera.id}")
assert response.status_code == 200
Running Integration Tests¶
# Parallel (recommended)
uv run pytest backend/tests/integration/ -n8 --dist=worksteal
# Serial (for debugging)
uv run pytest backend/tests/integration/ -n0
# Specific test file
uv run pytest backend/tests/integration/test_cameras_api.py -v
# With verbose output
uv run pytest backend/tests/integration/ -n8 -v --tb=long
Related Documentation¶
- Unit Testing - Isolated component testing
- Test Fixtures - Factory patterns
- Coverage Requirements - Coverage gates