Worker coordination with file locks
This commit is contained in:
@@ -45,6 +45,7 @@ from .rate_limit import (
|
||||
webhook_limiter,
|
||||
)
|
||||
from .reservation_service import ReservationService
|
||||
from .worker_coordination import is_primary_worker
|
||||
|
||||
# Configure logging - will be reconfigured during lifespan with actual config
|
||||
_LOGGER = get_logger(__name__)
|
||||
@@ -182,24 +183,16 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
|
||||
async def lifespan(app: FastAPI):
|
||||
# Setup DB
|
||||
|
||||
# Determine if this is the primary worker
|
||||
# Determine if this is the primary worker using file-based locking
|
||||
# Only primary runs schedulers/background tasks
|
||||
# In multi-worker setups, only one worker should run singleton services
|
||||
worker_id = os.environ.get("APP_WORKER_ID", "0")
|
||||
is_primary_worker = worker_id == "0"
|
||||
|
||||
# For uvicorn with --workers, detect if we're the main process
|
||||
if not is_primary_worker:
|
||||
# Check if running under uvicorn's supervisor
|
||||
is_primary_worker = (
|
||||
multiprocessing.current_process().name == "MainProcess"
|
||||
)
|
||||
is_primary, worker_lock = is_primary_worker()
|
||||
|
||||
_LOGGER.info(
|
||||
"Worker startup: process=%s, pid=%d, primary=%s",
|
||||
multiprocessing.current_process().name,
|
||||
os.getpid(),
|
||||
is_primary_worker,
|
||||
is_primary,
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -217,9 +210,9 @@ async def lifespan(app: FastAPI):
|
||||
# Setup logging from config with email monitoring
|
||||
# Only primary worker should have the report scheduler running
|
||||
email_handler, report_scheduler = setup_logging(
|
||||
config, email_service, loop, enable_scheduler=is_primary_worker
|
||||
config, email_service, loop, enable_scheduler=is_primary
|
||||
)
|
||||
_LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary_worker)
|
||||
_LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary)
|
||||
|
||||
DATABASE_URL = get_database_url(config)
|
||||
engine = create_async_engine(DATABASE_URL, echo=False)
|
||||
@@ -260,7 +253,7 @@ async def lifespan(app: FastAPI):
|
||||
_LOGGER.info("Database tables checked/created at startup.")
|
||||
|
||||
# Hash any existing customers (only in primary worker to avoid race conditions)
|
||||
if is_primary_worker:
|
||||
if is_primary:
|
||||
async with AsyncSessionLocal() as session:
|
||||
customer_service = CustomerService(session)
|
||||
hashed_count = await customer_service.hash_existing_customers()
|
||||
@@ -311,6 +304,10 @@ async def lifespan(app: FastAPI):
|
||||
await engine.dispose()
|
||||
_LOGGER.info("Application shutdown complete")
|
||||
|
||||
# Release worker lock if this was the primary worker
|
||||
if worker_lock:
|
||||
worker_lock.release()
|
||||
|
||||
|
||||
async def get_async_session(request: Request):
|
||||
async_sessionmaker = request.app.state.async_sessionmaker
|
||||
|
||||
119
src/alpine_bits_python/worker_coordination.py
Normal file
119
src/alpine_bits_python/worker_coordination.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""Worker coordination utilities for multi-worker FastAPI deployments.
|
||||
|
||||
This module provides utilities to ensure singleton services (schedulers, background tasks)
|
||||
run on only one worker when using uvicorn --workers N.
|
||||
"""
|
||||
|
||||
import fcntl
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import ContextManager
|
||||
|
||||
from .logging_config import get_logger
|
||||
|
||||
_LOGGER = get_logger(__name__)
|
||||
|
||||
|
||||
class WorkerLock:
|
||||
"""File-based lock to coordinate worker processes.
|
||||
|
||||
Only one worker can hold the lock at a time. This ensures singleton
|
||||
services like schedulers only run on one worker.
|
||||
"""
|
||||
|
||||
def __init__(self, lock_file: str = "/tmp/alpinebits_primary_worker.lock"):
|
||||
"""Initialize the worker lock.
|
||||
|
||||
Args:
|
||||
lock_file: Path to the lock file
|
||||
"""
|
||||
self.lock_file = Path(lock_file)
|
||||
self.lock_fd = None
|
||||
self.is_primary = False
|
||||
|
||||
def acquire(self) -> bool:
|
||||
"""Try to acquire the primary worker lock.
|
||||
|
||||
Returns:
|
||||
True if lock was acquired (this is the primary worker)
|
||||
False if lock is held by another worker
|
||||
"""
|
||||
try:
|
||||
# Create lock file if it doesn't exist
|
||||
self.lock_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Open lock file
|
||||
self.lock_fd = open(self.lock_file, "w")
|
||||
|
||||
# Try to acquire exclusive lock (non-blocking)
|
||||
fcntl.flock(self.lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
||||
# Write PID to lock file for debugging
|
||||
self.lock_fd.write(f"{os.getpid()}\n")
|
||||
self.lock_fd.flush()
|
||||
|
||||
self.is_primary = True
|
||||
_LOGGER.info(
|
||||
"Acquired primary worker lock (pid=%d, lock_file=%s)",
|
||||
os.getpid(),
|
||||
self.lock_file,
|
||||
)
|
||||
return True
|
||||
|
||||
except (IOError, OSError) as e:
|
||||
# Lock is held by another process
|
||||
if self.lock_fd:
|
||||
self.lock_fd.close()
|
||||
self.lock_fd = None
|
||||
|
||||
self.is_primary = False
|
||||
_LOGGER.info(
|
||||
"Could not acquire primary worker lock - another worker is primary (pid=%d)",
|
||||
os.getpid(),
|
||||
)
|
||||
return False
|
||||
|
||||
def release(self) -> None:
|
||||
"""Release the primary worker lock."""
|
||||
if self.lock_fd and self.is_primary:
|
||||
try:
|
||||
fcntl.flock(self.lock_fd.fileno(), fcntl.LOCK_UN)
|
||||
self.lock_fd.close()
|
||||
|
||||
# Try to remove lock file (best effort)
|
||||
try:
|
||||
self.lock_file.unlink()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
_LOGGER.info("Released primary worker lock (pid=%d)", os.getpid())
|
||||
except Exception:
|
||||
_LOGGER.exception("Error releasing primary worker lock")
|
||||
finally:
|
||||
self.lock_fd = None
|
||||
self.is_primary = False
|
||||
|
||||
def __enter__(self) -> "WorkerLock":
|
||||
"""Context manager entry."""
|
||||
self.acquire()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
|
||||
"""Context manager exit."""
|
||||
self.release()
|
||||
|
||||
|
||||
def is_primary_worker() -> tuple[bool, WorkerLock | None]:
|
||||
"""Determine if this worker should run singleton services.
|
||||
|
||||
Uses file-based locking to coordinate between workers.
|
||||
|
||||
Returns:
|
||||
Tuple of (is_primary, lock_object)
|
||||
- is_primary: True if this is the primary worker
|
||||
- lock_object: WorkerLock instance (must be kept alive)
|
||||
"""
|
||||
lock = WorkerLock()
|
||||
is_primary = lock.acquire()
|
||||
|
||||
return is_primary, lock
|
||||
Reference in New Issue
Block a user