FileWatcher Service¶
The FileWatcher monitors camera upload directories for new image and video files, validates them, and queues them for AI processing. It uses the watchdog library for filesystem event monitoring with debounce logic to handle incomplete uploads.
Source File: backend/services/file_watcher.py
Overview¶
The FileWatcher service:
- Monitors camera directories for new media uploads (FTP)
- Validates image/video integrity before processing
- Debounces file events to wait for upload completion
- Deduplicates files using content hashes (SHA256)
- Auto-creates camera records for new upload directories
- Queues valid files to Redis for detection processing
Configuration¶
| Environment Variable | Default | Description |
|---|---|---|
| FOSCAM_BASE_PATH | /export/foscam | Root directory for camera uploads |
| FILE_WATCHER_POLLING | False | Use polling observer (for Docker) |
| FILE_WATCHER_POLLING_INTERVAL | 1.0 | Polling interval in seconds |
| FILE_WATCHER_MAX_CONCURRENT_QUEUE | 10 | Max concurrent Redis queue operations |
| FILE_WATCHER_QUEUE_DELAY_MS | 50 | Delay between queue operations (ms) |
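As an illustration of how these variables and defaults might be read, here is a minimal sketch. The `FileWatcherSettings` class and the `_env_bool` helper are hypothetical names introduced for this example and are not the project's actual settings code; only the variable names and defaults come from the table above.

```python
import os
from dataclasses import dataclass


def _env_bool(name: str, default: bool) -> bool:
    """Parse a boolean variable; accepts 1/true/yes/on (case-insensitive)."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class FileWatcherSettings:
    """Hypothetical container for the documented variables."""

    foscam_base_path: str = "/export/foscam"
    polling: bool = False
    polling_interval: float = 1.0
    max_concurrent_queue: int = 10
    queue_delay_ms: int = 50

    @classmethod
    def from_env(cls) -> "FileWatcherSettings":
        # Each lookup falls back to the default listed in the table.
        return cls(
            foscam_base_path=os.environ.get("FOSCAM_BASE_PATH", "/export/foscam"),
            polling=_env_bool("FILE_WATCHER_POLLING", False),
            polling_interval=float(os.environ.get("FILE_WATCHER_POLLING_INTERVAL", "1.0")),
            max_concurrent_queue=int(os.environ.get("FILE_WATCHER_MAX_CONCURRENT_QUEUE", "10")),
            queue_delay_ms=int(os.environ.get("FILE_WATCHER_QUEUE_DELAY_MS", "50")),
        )
```

Calling `FileWatcherSettings.from_env()` with none of the variables set yields the defaults shown in the table.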
Supported File Types¶
# backend/services/file_watcher.py:67-70
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png"}
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov"}
SUPPORTED_EXTENSIONS = IMAGE_EXTENSIONS | VIDEO_EXTENSIONS
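A minimal sketch of how a path could be classified against these sets. The `get_media_type` helper is a hypothetical name, and the case-insensitive comparison is an assumption; the source does not show how the real check treats upper-case extensions.

```python
from __future__ import annotations

from pathlib import Path

IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png"}
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov"}
SUPPORTED_EXTENSIONS = IMAGE_EXTENSIONS | VIDEO_EXTENSIONS


def get_media_type(file_path: str) -> str | None:
    """Return "image", "video", or None for unsupported files (hypothetical helper)."""
    # Assumption: extensions are compared case-insensitively.
    suffix = Path(file_path).suffix.lower()
    if suffix in IMAGE_EXTENSIONS:
        return "image"
    if suffix in VIDEO_EXTENSIONS:
        return "video"
    return None
```

Under this sketch, `get_media_type("/export/foscam/Front Door/IMG_001.JPG")` returns `"image"` and a `.txt` file returns `None`.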
Initialization¶
The FileWatcher is initialized with configurable parameters (backend/services/file_watcher.py:351-454):
def __init__(
    self,
    camera_root: str | None = None,  # Root directory containing camera folders
    redis_client: Any | None = None,  # Redis client for queueing
    debounce_delay: float = 0.5,  # Delay after last file modification
    queue_name: str = DETECTION_QUEUE,  # Redis queue name
    dedupe_service: DedupeService | None = None,  # File deduplication
    auto_create_cameras: bool = True,  # Auto-create camera records
    camera_creator: Callable | None = None,  # Callback to create cameras
    use_polling: bool | None = None,  # Use polling observer
    polling_interval: float | None = None,  # Polling interval
    stability_time: float = 2.0,  # File stability wait time
):
Observer Selection¶
The FileWatcher selects the best observer for the platform (backend/services/file_watcher.py:414-420):
%%{init: {'theme': 'dark'}}%%
flowchart TB
Check{use_polling set?}
Polling["Polling Observer"]
Native["Native Observer"]
Inotify["inotify<br/>(Linux)"]
FSEvents["FSEvents<br/>(macOS)"]
Windows["Windows API"]
Check -->|Yes| Polling
Check -->|No| Native
Native --> Inotify
Native --> FSEvents
Native --> Windows
- Native Observer: Uses inotify (Linux), FSEvents (macOS), or Windows API
- Polling Observer: For Docker Desktop/macOS mounts or network filesystems
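The selection above can be sketched with watchdog's two observer classes. This is an illustration under assumptions, not the service's actual code: `resolve_use_polling` and `build_observer` are hypothetical names, and the rule that an unset `use_polling` falls back to the configured `FILE_WATCHER_POLLING` value is inferred from the constructor signature.

```python
from __future__ import annotations


def resolve_use_polling(explicit: bool | None, configured_default: bool) -> bool:
    """An explicit constructor argument wins; None falls back to configuration (assumed rule)."""
    return configured_default if explicit is None else explicit


def build_observer(use_polling: bool, polling_interval: float = 1.0):
    """Return a watchdog observer instance.

    Imports are deferred so that resolve_use_polling stays dependency-free.
    """
    if use_polling:
        from watchdog.observers.polling import PollingObserver

        # PollingObserver re-scans the watched tree every `timeout` seconds.
        return PollingObserver(timeout=polling_interval)

    from watchdog.observers import Observer

    # Observer resolves to the native backend for the current platform.
    return Observer()
```

Polling trades latency and CPU for portability: it works on mounts where native filesystem events are not delivered, at the cost of a periodic directory scan.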
Startup and Shutdown¶
Startup¶
The FileWatcher is started during application lifespan (backend/main.py:579-584):
file_watcher = FileWatcher(
    redis_client=redis_client,
    camera_creator=create_camera_callback,
)
await file_watcher.start()
The start() method (backend/services/file_watcher.py:875-972):
- Validates running in async context (requires event loop)
- Captures event loop reference for thread-safe task scheduling
- Initializes rate limiting semaphore
- Creates camera root directory if missing
- Schedules recursive directory watch
- Starts watchdog observer
async def start(self) -> None:
    if self.running:
        logger.warning("FileWatcher already running")
        return
    # Capture event loop for thread-safe scheduling
    self._loop = asyncio.get_running_loop()
    # Initialize rate limiting
    self._queue_semaphore = asyncio.Semaphore(self._max_concurrent_queue)
    # Create camera root directory if missing
    camera_root_path = Path(self.camera_root)
    camera_root_path.mkdir(parents=True, exist_ok=True)
    # Watch camera root recursively
    self.observer.schedule(
        self._event_handler,
        str(camera_root_path),
        recursive=True,
    )
    self.observer.start()
    self.running = True
Shutdown¶
The shutdown sequence (backend/services/file_watcher.py:974-1016):
- Cancels all pending debounce tasks
- Stops watchdog observer
- Joins observer thread (with timeout)
- Shuts down hash executor
async def stop(self) -> None:
    if not self.running:
        return
    # Cancel pending tasks
    for task in self._pending_tasks.values():
        task.cancel()
    await asyncio.gather(*self._pending_tasks.values(), return_exceptions=True)
    self._pending_tasks.clear()
    # Stop observer; join in an executor so the event loop is not blocked
    loop = asyncio.get_running_loop()
    self.observer.stop()
    await loop.run_in_executor(None, lambda: self.observer.join(timeout=5))
    # Shutdown hash executor
    await loop.run_in_executor(
        None, lambda: self._hash_executor.shutdown(wait=True, cancel_futures=False)
    )
    self._loop = None
    self.running = False
File Processing Flow¶
%%{init: {'theme': 'dark'}}%%
flowchart TB
WatchdogEvent["Watchdog Event<br/>(create/modify)"]
IsSupported{"is_supported_<br/>media_file?"}
ScheduleProc["_schedule_file_<br/>processing"]
Debounce["Debounce delay<br/>(0.5s default)"]
ProcessFile["_process_file"]
WaitStable["Wait stable"]
ValidateMedia["Validate media"]
ExtractCam["Extract camera_id"]
AutoCreate["Auto-create<br/>camera (if new)"]
QueueDetect["_queue_for_<br/>detection"]
DedupeCheck["Dedupe check"]
RedisQueue["Redis queue"]
WatchdogEvent --> IsSupported
IsSupported -->|Yes| ScheduleProc
ScheduleProc --> Debounce
Debounce --> ProcessFile
ProcessFile --> WaitStable
ProcessFile --> ValidateMedia
WaitStable --> ExtractCam
ValidateMedia --> ExtractCam
ExtractCam --> AutoCreate
AutoCreate --> QueueDetect
QueueDetect --> DedupeCheck
QueueDetect --> RedisQueue
Debounce Logic¶
The FileWatcher implements debouncing to handle file write events (backend/services/file_watcher.py:667-712):
Problem¶
Watchdog emits multiple events during file upload:
- created when file appears
- Multiple modified as data is written
- Final modified when upload completes
Solution¶
async def _schedule_file_processing(self, file_path: str) -> None:
    # Cancel existing pending task for this file
    if file_path in self._pending_tasks:
        self._pending_tasks[file_path].cancel()
    # Create new debounced task
    task = asyncio.create_task(self._debounced_process(file_path))
    self._pending_tasks[file_path] = task

    # Cleanup callback
    def cleanup_task(t: asyncio.Task) -> None:
        if self._pending_tasks.get(file_path) is t:
            self._pending_tasks.pop(file_path, None)

    task.add_done_callback(cleanup_task)
Timing Diagram¶
Time -->
Event:  create    modify    modify    modify                (silence)
          |         |         |         |                       |
          |         |         |         |                       |
Task:  [cancel]  [cancel]  [cancel]  [wait 0.5s + stability]
                                                |
                                                v
                                          Process file
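The cancel-and-reschedule pattern can be tried in isolation with the sketch below. The `Debouncer` class and `demo` coroutine are hypothetical names for this example; they mirror the pending-task dictionary and cleanup callback shown above but leave out the stability check and everything else the service does.

```python
import asyncio


class Debouncer:
    """Runs callback(key) once per key, `delay` seconds after the last trigger."""

    def __init__(self, callback, delay: float = 0.5) -> None:
        self._callback = callback
        self._delay = delay
        self._pending: dict[str, asyncio.Task] = {}

    def trigger(self, key: str) -> None:
        # A newer event supersedes any task still waiting for this key.
        existing = self._pending.get(key)
        if existing is not None:
            existing.cancel()
        task = asyncio.create_task(self._run(key))
        self._pending[key] = task

        def cleanup(t: asyncio.Task) -> None:
            # Remove the entry only if it still points at this task.
            if self._pending.get(key) is t:
                self._pending.pop(key, None)

        task.add_done_callback(cleanup)

    async def _run(self, key: str) -> None:
        # Cancelled here if another event for the same key arrives in time.
        await asyncio.sleep(self._delay)
        await self._callback(key)


async def demo() -> list[str]:
    processed: list[str] = []

    async def process(path: str) -> None:
        processed.append(path)

    debouncer = Debouncer(process, delay=0.05)
    for _ in range(4):  # one create plus three modify events
        debouncer.trigger("/export/foscam/front_door/image.jpg")
        await asyncio.sleep(0.01)
    await asyncio.sleep(0.2)  # silence: the last task runs to completion
    return processed
```

Running `asyncio.run(demo())` processes the path once, even though four events were delivered.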
File Stability Check¶
For FTP uploads, the FileWatcher waits for file size to stabilize (backend/services/file_watcher.py:560-621):
async def _wait_for_file_stability(
    self, file_path: str, stability_time: float | None = None
) -> bool:
    last_size: int = -1
    stable_since: float | None = None
    check_interval: float = 0.5  # Check every 0.5 seconds
    max_checks: int = 20  # Maximum ~10 seconds total wait
    for _ in range(max_checks):
        current_size = Path(file_path).stat().st_size
        if current_size == last_size:
            # Size unchanged - check if stable long enough
            if stable_since is not None and time.monotonic() - stable_since >= stability_time:
                return True
        else:
            # Size changed - reset stability timer
            last_size = current_size
            stable_since = time.monotonic()
        await asyncio.sleep(check_interval)
    return False  # Never stabilized
File Validation¶
Image Validation¶
Uses PIL to verify image integrity (backend/services/file_watcher.py:131-204):
def _validate_image_sync(file_path: str) -> bool:
    # Verify image header
    with Image.open(file_path) as img:
        img.verify()
    # Fully load image to catch truncation
    with Image.open(file_path) as img:
        img.load()  # Forces full decompression
    return True
Minimum file size check:
Video Validation¶
Basic existence and size check (backend/services/file_watcher.py:257-288):
def is_valid_video(file_path: str) -> bool:
    file_path_obj = Path(file_path)
    if not file_path_obj.exists():
        return False
    file_size = file_path_obj.stat().st_size
    if file_size < 1024:  # Minimum 1KB
        return False
    return True
Deduplication¶
File deduplication prevents duplicate processing (backend/services/file_watcher.py:809-818):
if self._dedupe_service:
    is_duplicate, file_hash = await self._dedupe_service.is_duplicate_and_mark(file_path)
    if is_duplicate:
        logger.info(
            f"Skipping duplicate file: {file_path} (hash={file_hash[:16]}...)"
        )
        return
The DedupeService uses SHA256 content hashes stored in Redis with TTL.
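To make the hash-plus-TTL idea concrete, here is a minimal sketch that swaps Redis for an in-memory dictionary. `sha256_of_file` and `InMemoryDedupe` are hypothetical names; the real `DedupeService` internals are not shown in this document, so only the return shape `(is_duplicate, file_hash)` is taken from the snippet above.

```python
import hashlib
import time


def sha256_of_file(file_path: str, chunk_size: int = 65536) -> str:
    """Hash file contents in chunks so large videos are not read into memory at once."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


class InMemoryDedupe:
    """Stand-in for a Redis-backed store: remembers hashes until their TTL expires."""

    def __init__(self, ttl_seconds: float = 300.0) -> None:
        self._ttl = ttl_seconds
        self._expiry: dict[str, float] = {}

    def is_duplicate_and_mark(self, file_path: str) -> tuple[bool, str]:
        file_hash = sha256_of_file(file_path)
        now = time.monotonic()
        expires_at = self._expiry.get(file_hash)
        if expires_at is not None and expires_at > now:
            return True, file_hash
        # First sighting (or expired entry): record it and report "not a duplicate".
        self._expiry[file_hash] = now + self._ttl
        return False, file_hash
```

Because the key is the content hash rather than the path, the same image uploaded under two file names is detected as a duplicate within the TTL window.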
Camera ID Contract¶
Camera IDs are derived from upload directory names (backend/services/file_watcher.py:524-558):
def _get_camera_id_from_path(self, file_path: str) -> tuple[str | None, str | None]:
    path = Path(file_path)
    camera_root_path = Path(self.camera_root)
    # Get relative path from camera root
    relative_path = path.relative_to(camera_root_path)
    # First component is the folder name
    folder_name = relative_path.parts[0]
    # Normalize to camera ID
    camera_id = normalize_camera_id(folder_name)
    return camera_id, folder_name
Example:
- Upload path: /export/foscam/Front Door/image.jpg
- folder_name: "Front Door"
- camera_id: "front_door" (normalized)
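The mapping in this example can be reproduced with the sketch below. The normalization rule (lowercase, runs of non-alphanumeric characters collapsed to a single underscore) is an assumption chosen to match the example; the real `normalize_camera_id` may differ, so the function is given a distinct, hypothetical name.

```python
import re
from pathlib import Path


def normalize_camera_id_sketch(folder_name: str) -> str:
    """Assumed rule: lowercase, collapse non-alphanumeric runs to '_', trim underscores."""
    return re.sub(r"[^a-z0-9]+", "_", folder_name.strip().lower()).strip("_")


def camera_id_from_path(file_path: str, camera_root: str) -> tuple[str, str]:
    """Return (camera_id, folder_name) for a file below camera_root."""
    # relative_to raises ValueError when file_path is not under camera_root.
    relative = Path(file_path).relative_to(Path(camera_root))
    folder_name = relative.parts[0]
    return normalize_camera_id_sketch(folder_name), folder_name
```

With these assumptions, `camera_id_from_path("/export/foscam/Front Door/image.jpg", "/export/foscam")` returns `("front_door", "Front Door")`.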
Auto-Camera Creation¶
When a new directory is detected, a camera record is auto-created (backend/services/file_watcher.py:623-665):
async def _ensure_camera_exists(self, camera_id: str, folder_name: str) -> None:
    # Skip if already processed
    if camera_id in self._known_cameras:
        return
    self._known_cameras.add(camera_id)
    if not self._camera_creator:
        return
    # Create camera instance
    folder_path = str(Path(self.camera_root) / folder_name)
    camera = Camera.from_folder_name(folder_name, folder_path)
    await self._camera_creator(camera)
Rate Limiting¶
Queue operations are rate-limited to prevent Redis exhaustion (backend/services/file_watcher.py:834-848):
# Use semaphore to limit concurrent queue operations
semaphore = self._queue_semaphore or asyncio.Semaphore(self._max_concurrent_queue)
async with semaphore:
    result = await self.redis_client.add_to_queue_safe(
        self.queue_name,
        detection_data,
        overflow_policy=QueueOverflowPolicy.DLQ,
    )
    # Add delay between queue operations
    if self._queue_delay_ms > 0:
        await asyncio.sleep(self._queue_delay_ms / 1000.0)
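The effect of the semaphore can be observed with a small standalone sketch. `enqueue_all` is a hypothetical helper, and a short sleep stands in for the Redis round trip. Holding the slot during the delay is a design choice made for this example and may not match where the service applies its delay.

```python
import asyncio


async def enqueue_all(items, max_concurrent: int = 10, delay_ms: int = 50) -> int:
    """Enqueue every item and return the peak number of concurrent operations."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def enqueue(item) -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for the Redis round trip
            in_flight -= 1
            # Keeping the slot during the delay spaces out successive operations.
            if delay_ms > 0:
                await asyncio.sleep(delay_ms / 1000.0)

    await asyncio.gather(*(enqueue(item) for item in items))
    return peak
```

However many files arrive at once, the returned peak never exceeds `max_concurrent`.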
Thread Safety¶
Watchdog runs in a separate thread. Task scheduling is thread-safe (backend/services/file_watcher.py:488-520):
def _schedule_async_task(self, file_path: str) -> None:
    if self.watcher._loop and self.watcher._loop.is_running():
        # Schedule task in event loop (thread-safe from watchdog thread)
        asyncio.run_coroutine_threadsafe(
            self.watcher._schedule_file_processing(file_path),
            self.watcher._loop,
        )
    else:
        logger.error(
            f"CRITICAL: Event loop not available for processing {file_path}"
        )
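The hand-off from a worker thread to the event loop can be demonstrated without watchdog. In this sketch a plain `threading.Thread` plays the role of the watchdog thread; `handle_event` and `main` are hypothetical names used only for the example.

```python
import asyncio
import threading


async def handle_event(path: str, seen: list[str]) -> None:
    # Runs on the event loop thread, so it may touch loop-owned state safely.
    seen.append(path)


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    seen: list[str] = []

    def watcher_thread() -> None:
        # Runs outside the event loop, like a watchdog callback.
        future = asyncio.run_coroutine_threadsafe(
            handle_event("/export/foscam/front_door/image.jpg", seen), loop
        )
        future.result(timeout=5)  # block this worker until the coroutine finishes

    thread = threading.Thread(target=watcher_thread)
    thread.start()
    # Join in an executor so the loop stays free to run the scheduled coroutine.
    await loop.run_in_executor(None, thread.join)
    return seen
```

Calling `asyncio.create_task` directly from the worker thread would fail because no loop is running there; `run_coroutine_threadsafe` is the supported way to submit a coroutine from another thread.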
Error Handling¶
File processing errors are logged but don't stop the watcher (backend/services/file_watcher.py:779-789):
except Exception as e:
    duration_ms = int((time.time() - start_time) * 1000)
    logger.error(
        f"Failed to queue {media_type} {file_path}: {e}",
        extra={
            "camera_id": camera_id,
            "file_path": file_path,
            "media_type": media_type,
            "duration_ms": duration_ms,
        },
    )
Related Documentation¶
- Batch Aggregator - Next stage in pipeline
- Pipeline Architecture - Full pipeline flow
- Testing Guide - Testing FileWatcher