"""Worker coordination utilities for multi-worker FastAPI deployments.

This module provides utilities to ensure singleton services (schedulers,
background tasks) run on only one worker when using uvicorn --workers N.
"""

import fcntl
import os
from pathlib import Path
from typing import ContextManager

from .logging_config import get_logger

_LOGGER = get_logger(__name__)

# Default lock locations in order of preference. /var/run is tried first
# (more persistent); /tmp is the fallback and the last-resort default.
_DEFAULT_LOCK_CANDIDATES = (
    "/var/run/alpinebits_primary_worker.lock",
    "/tmp/alpinebits_primary_worker.lock",
)

# How often acquire() retries when the file it locked was unlinked or
# replaced by a releasing worker before the lock was obtained.
_MAX_ACQUIRE_ATTEMPTS = 3


class WorkerLock:
    """File-based lock to coordinate worker processes.

    Only one worker can hold the lock at a time. This ensures singleton
    services like schedulers only run on one worker.

    The lock is an exclusive ``fcntl.flock`` on ``lock_file``. The kernel
    drops such a lock when the holding process exits, so a lock file left
    behind by a crashed worker never blocks a new worker.

    Attributes are plain instance state:
        lock_file: ``Path`` of the lock file.
        lock_fd: Open file object while the lock is held, otherwise ``None``.
        is_primary: ``True`` while this instance holds the lock.
    """

    def __init__(self, lock_file: str | None = None):
        """Initialize the worker lock.

        Args:
            lock_file: Path to the lock file. If None, will try /var/run
                first, falling back to /tmp if /var/run is not writable.
        """
        if lock_file is None:
            lock_file = self._default_lock_file()

        self.lock_file = Path(lock_file)
        self.lock_fd = None
        self.is_primary = False

    @staticmethod
    def _default_lock_file() -> str:
        """Return the first default lock path whose directory is writable."""
        for candidate in _DEFAULT_LOCK_CANDIDATES:
            directory = Path(candidate).parent
            probe = directory / ".alpinebits_test"
            try:
                directory.mkdir(parents=True, exist_ok=True)
                # Test if we can write to this location.
                probe.touch()
                # All workers probe with the same file name at the same
                # time. A sibling may already have removed the probe; that
                # must not be mistaken for "directory not writable", or
                # workers would end up locking different files.
                probe.unlink(missing_ok=True)
            except OSError:
                continue
            return candidate

        # If all fail, default to /tmp.
        return _DEFAULT_LOCK_CANDIDATES[-1]

    def _is_current_lock_file(self, lock_fd) -> bool:
        """Return True if ``lock_fd`` is the file currently at ``lock_file``.

        Returns False when the path no longer exists or now refers to a
        different file (device/inode mismatch).
        """
        try:
            on_disk = os.stat(self.lock_file)
        except OSError:
            return False
        return os.path.samestat(on_disk, os.fstat(lock_fd.fileno()))

    def _abandon(self, lock_fd) -> None:
        """Close ``lock_fd`` (if any) and mark this instance as non-primary."""
        if lock_fd is not None:
            try:
                lock_fd.close()
            except OSError:
                pass
        self.lock_fd = None
        self.is_primary = False

    def acquire(self) -> bool:
        """Try to acquire the primary worker lock.

        Never blocks and never raises ``OSError``; failures are logged.

        Returns:
            True if lock was acquired (this is the primary worker)
            False if lock is held by another worker, or if the lock file
            could not be created, locked or written.
        """
        if self.is_primary and self.lock_fd is not None:
            # Already holding the lock; nothing to do.
            return True

        for _ in range(_MAX_ACQUIRE_ATTEMPTS):
            lock_fd = None
            try:
                # Create lock file directory if it doesn't exist.
                self.lock_file.parent.mkdir(parents=True, exist_ok=True)

                # Append mode creates the file without truncating it, so a
                # worker that loses the race does not wipe the PID written
                # by the current primary.
                lock_fd = open(self.lock_file, "a", encoding="utf-8")

                # Try to acquire exclusive lock (non-blocking).
                fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)

                # A releasing primary unlinks the file before unlocking it.
                # If that happened between open() and flock(), we locked a
                # file nobody else can see any more: start over.
                if not self._is_current_lock_file(lock_fd):
                    lock_fd.close()
                    continue

                # Write PID to lock file for debugging, replacing any
                # previous content now that we own the lock.
                lock_fd.truncate(0)
                lock_fd.write(f"{os.getpid()}\n")
                lock_fd.flush()
            except BlockingIOError:
                # Lock is held by another process.
                self._abandon(lock_fd)
                _LOGGER.info(
                    "Could not acquire primary worker lock - another worker is primary (pid=%d)",
                    os.getpid(),
                )
                return False
            except OSError as e:
                # Not contention: permissions, missing directory, full disk...
                self._abandon(lock_fd)
                _LOGGER.warning(
                    "Could not acquire primary worker lock due to an OS error "
                    "(pid=%d, lock_file=%s): %s",
                    os.getpid(),
                    self.lock_file,
                    e,
                )
                return False

            self.lock_fd = lock_fd
            self.is_primary = True
            _LOGGER.info(
                "Acquired primary worker lock (pid=%d, lock_file=%s)",
                os.getpid(),
                self.lock_file,
            )
            return True

        self._abandon(None)
        _LOGGER.warning(
            "Could not acquire primary worker lock - lock file kept changing "
            "(pid=%d, lock_file=%s)",
            os.getpid(),
            self.lock_file,
        )
        return False

    def release(self) -> None:
        """Release the primary worker lock.

        Does nothing unless this instance currently holds the lock. Errors
        are logged, not raised; the instance always ends up non-primary.
        """
        if not (self.lock_fd and self.is_primary):
            return

        lock_fd = self.lock_fd
        try:
            # Remove the lock file (best effort) while still holding the
            # lock. Unlinking after unlocking would let one worker lock the
            # old file while another creates and locks a new one.
            try:
                if self._is_current_lock_file(lock_fd):
                    self.lock_file.unlink()
            except OSError:
                _LOGGER.debug(
                    "Could not remove lock file %s", self.lock_file, exc_info=True
                )

            fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
            lock_fd.close()
            _LOGGER.info("Released primary worker lock (pid=%d)", os.getpid())
        except Exception:
            _LOGGER.exception("Error releasing primary worker lock")
        finally:
            # Closing the descriptor also drops the lock if LOCK_UN failed.
            if not lock_fd.closed:
                try:
                    lock_fd.close()
                except OSError:
                    pass
            self.lock_fd = None
            self.is_primary = False

    def __enter__(self) -> "WorkerLock":
        """Context manager entry.

        Attempts to acquire the lock and returns ``self`` either way; check
        ``is_primary`` to see whether the lock was obtained.
        """
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit. Releases the lock if it is held."""
        self.release()


def _log_existing_lock_owner(lock_file: Path) -> None:
    """Log whether the PID recorded in ``lock_file`` belongs to a live process.

    Purely diagnostic: the file is never modified or removed here.
    """
    try:
        recorded = lock_file.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        # No lock file yet; nothing to report.
        return
    except (OSError, ValueError) as e:
        _LOGGER.warning("Error checking lock file: %s", e)
        return

    if not recorded:
        return

    try:
        old_pid = int(recorded)
        if old_pid <= 0:
            # Non-positive values address process groups in kill(2).
            raise ValueError(f"invalid pid in lock file: {old_pid}")
        os.kill(old_pid, 0)  # Signal 0 just checks existence
    except ProcessLookupError:
        _LOGGER.warning(
            "Found lock file left by dead process pid=%d; "
            "its lock is already released and the file will be reused",
            old_pid,
        )
    except PermissionError:
        # The process exists but belongs to another user.
        _LOGGER.debug("Lock held by active process pid=%d", old_pid)
    except (ValueError, OverflowError) as e:
        _LOGGER.warning("Error checking lock file: %s", e)
    else:
        _LOGGER.debug("Lock held by active process pid=%d", old_pid)


def is_primary_worker() -> tuple[bool, WorkerLock | None]:
    """Determine if this worker should run singleton services.

    Uses file-based locking to coordinate between workers. A lock file left
    behind by a dead process is detected and logged, but not deleted: the
    kernel has already released that process's lock, so acquiring the lock
    on the existing file succeeds and overwrites the old PID. Deleting the
    file here could remove the file a live primary has just locked and let
    a second worker become primary.

    Returns:
        Tuple of (is_primary, lock_object)
        - is_primary: True if this is the primary worker
        - lock_object: WorkerLock instance (must be kept alive)
    """
    lock = WorkerLock()
    _log_existing_lock_owner(lock.lock_file)

    is_primary = lock.acquire()
    return is_primary, lock