diff --git a/src/alpine_bits_python/api.py b/src/alpine_bits_python/api.py index 1679d94..ba533c5 100644 --- a/src/alpine_bits_python/api.py +++ b/src/alpine_bits_python/api.py @@ -1,6 +1,7 @@ import asyncio import gzip import json +import multiprocessing import os import traceback import urllib.parse @@ -181,6 +182,26 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel) async def lifespan(app: FastAPI): # Setup DB + # Determine if this is the primary worker + # Only primary runs schedulers/background tasks + # In multi-worker setups, only one worker should run singleton services + worker_id = os.environ.get("APP_WORKER_ID", "0") + is_primary_worker = worker_id == "0" + + # For uvicorn with --workers, detect if we're the main process + if not is_primary_worker: + # Check if running under uvicorn's supervisor + is_primary_worker = ( + multiprocessing.current_process().name == "MainProcess" + ) + + _LOGGER.info( + "Worker startup: process=%s, pid=%d, primary=%s", + multiprocessing.current_process().name, + os.getpid(), + is_primary_worker, + ) + try: config = load_config() except Exception: @@ -194,8 +215,11 @@ async def lifespan(app: FastAPI): email_service = create_email_service(config) # Setup logging from config with email monitoring - email_handler, report_scheduler = setup_logging(config, email_service, loop) - _LOGGER.info("Application startup initiated") + # Only primary worker should have the report scheduler running + email_handler, report_scheduler = setup_logging( + config, email_service, loop, enable_scheduler=is_primary_worker + ) + _LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary_worker) DATABASE_URL = get_database_url(config) engine = create_async_engine(DATABASE_URL, echo=False) @@ -235,18 +259,22 @@ async def lifespan(app: FastAPI): await conn.run_sync(Base.metadata.create_all) _LOGGER.info("Database tables checked/created at startup.") - # Hash any existing customers that don't have hashed versions yet - async with AsyncSessionLocal() as session: - customer_service = CustomerService(session) - hashed_count = await customer_service.hash_existing_customers() - if hashed_count > 0: - _LOGGER.info( - "Backfilled hashed data for %d existing customers", hashed_count - ) - else: - _LOGGER.info("All existing customers already have hashed data") + # Hash any existing customers (only in primary worker to avoid race conditions) + if is_primary_worker: + async with AsyncSessionLocal() as session: + customer_service = CustomerService(session) + hashed_count = await customer_service.hash_existing_customers() + if hashed_count > 0: + _LOGGER.info( + "Backfilled hashed data for %d existing customers", hashed_count + ) + else: + _LOGGER.info("All existing customers already have hashed data") + else: + _LOGGER.info("Skipping customer hashing (non-primary worker)") # Initialize and hook up stats collector for daily reports + # Note: report_scheduler will only exist on the primary worker if report_scheduler: stats_collector = ReservationStatsCollector( async_sessionmaker=AsyncSessionLocal, @@ -254,7 +282,9 @@ async def lifespan(app: FastAPI): ) # Hook up the stats collector to the report scheduler report_scheduler.set_stats_collector(stats_collector.collect_stats) - _LOGGER.info("Stats collector initialized and hooked up to report scheduler") + _LOGGER.info( + "Stats collector initialized and hooked up to report scheduler" + ) # Start daily report scheduler report_scheduler.start() diff --git a/src/alpine_bits_python/logging_config.py b/src/alpine_bits_python/logging_config.py index 252970c..712aae8 100644 --- a/src/alpine_bits_python/logging_config.py +++ b/src/alpine_bits_python/logging_config.py @@ -19,6 +19,7 @@ def setup_logging( config: dict | None = None, email_service: "EmailService | None" = None, loop: asyncio.AbstractEventLoop | None = None, + enable_scheduler: bool = True, ) -> tuple["EmailAlertHandler | None", "DailyReportScheduler | None"]: """Configure logging based on application config. @@ -26,6 +27,8 @@ def setup_logging( config: Application configuration dict with optional 'logger' section email_service: Optional email service for email alerts loop: Optional asyncio event loop for email alerts + enable_scheduler: Whether to enable the daily report scheduler + (should be False for non-primary workers) Returns: Tuple of (email_alert_handler, daily_report_scheduler) if email monitoring @@ -109,9 +112,9 @@ def setup_logging( except Exception: root_logger.exception("Failed to setup email alert handler") - # Setup daily report scheduler + # Setup daily report scheduler (only if enabled and this is primary worker) daily_report_config = monitoring_config.get("daily_report", {}) - if daily_report_config.get("enabled", False): + if daily_report_config.get("enabled", False) and enable_scheduler: try: # Import here to avoid circular dependencies from alpine_bits_python.email_monitoring import DailyReportScheduler @@ -120,9 +123,13 @@ def setup_logging( email_service=email_service, config=daily_report_config, ) - root_logger.info("Daily report scheduler configured") + root_logger.info("Daily report scheduler configured (primary worker)") except Exception: root_logger.exception("Failed to setup daily report scheduler") + elif daily_report_config.get("enabled", False) and not enable_scheduler: + root_logger.info( + "Daily report scheduler disabled (non-primary worker)" + ) return email_handler, report_scheduler