email_notifications #7
@@ -1,6 +1,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import gzip
|
import gzip
|
||||||
import json
|
import json
|
||||||
|
import multiprocessing
|
||||||
import os
|
import os
|
||||||
import traceback
|
import traceback
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
@@ -181,6 +182,26 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
|
|||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
# Setup DB
|
# Setup DB
|
||||||
|
|
||||||
|
# Determine if this is the primary worker
|
||||||
|
# Only primary runs schedulers/background tasks
|
||||||
|
# In multi-worker setups, only one worker should run singleton services
|
||||||
|
worker_id = os.environ.get("APP_WORKER_ID", "0")
|
||||||
|
is_primary_worker = worker_id == "0"
|
||||||
|
|
||||||
|
# For uvicorn with --workers, detect if we're the main process
|
||||||
|
if not is_primary_worker:
|
||||||
|
# Check if running under uvicorn's supervisor
|
||||||
|
is_primary_worker = (
|
||||||
|
multiprocessing.current_process().name == "MainProcess"
|
||||||
|
)
|
||||||
|
|
||||||
|
_LOGGER.info(
|
||||||
|
"Worker startup: process=%s, pid=%d, primary=%s",
|
||||||
|
multiprocessing.current_process().name,
|
||||||
|
os.getpid(),
|
||||||
|
is_primary_worker,
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
config = load_config()
|
config = load_config()
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -194,8 +215,11 @@ async def lifespan(app: FastAPI):
|
|||||||
email_service = create_email_service(config)
|
email_service = create_email_service(config)
|
||||||
|
|
||||||
# Setup logging from config with email monitoring
|
# Setup logging from config with email monitoring
|
||||||
email_handler, report_scheduler = setup_logging(config, email_service, loop)
|
# Only primary worker should have the report scheduler running
|
||||||
_LOGGER.info("Application startup initiated")
|
email_handler, report_scheduler = setup_logging(
|
||||||
|
config, email_service, loop, enable_scheduler=is_primary_worker
|
||||||
|
)
|
||||||
|
_LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary_worker)
|
||||||
|
|
||||||
DATABASE_URL = get_database_url(config)
|
DATABASE_URL = get_database_url(config)
|
||||||
engine = create_async_engine(DATABASE_URL, echo=False)
|
engine = create_async_engine(DATABASE_URL, echo=False)
|
||||||
@@ -235,18 +259,22 @@ async def lifespan(app: FastAPI):
|
|||||||
await conn.run_sync(Base.metadata.create_all)
|
await conn.run_sync(Base.metadata.create_all)
|
||||||
_LOGGER.info("Database tables checked/created at startup.")
|
_LOGGER.info("Database tables checked/created at startup.")
|
||||||
|
|
||||||
# Hash any existing customers that don't have hashed versions yet
|
# Hash any existing customers (only in primary worker to avoid race conditions)
|
||||||
async with AsyncSessionLocal() as session:
|
if is_primary_worker:
|
||||||
customer_service = CustomerService(session)
|
async with AsyncSessionLocal() as session:
|
||||||
hashed_count = await customer_service.hash_existing_customers()
|
customer_service = CustomerService(session)
|
||||||
if hashed_count > 0:
|
hashed_count = await customer_service.hash_existing_customers()
|
||||||
_LOGGER.info(
|
if hashed_count > 0:
|
||||||
"Backfilled hashed data for %d existing customers", hashed_count
|
_LOGGER.info(
|
||||||
)
|
"Backfilled hashed data for %d existing customers", hashed_count
|
||||||
else:
|
)
|
||||||
_LOGGER.info("All existing customers already have hashed data")
|
else:
|
||||||
|
_LOGGER.info("All existing customers already have hashed data")
|
||||||
|
else:
|
||||||
|
_LOGGER.info("Skipping customer hashing (non-primary worker)")
|
||||||
|
|
||||||
# Initialize and hook up stats collector for daily reports
|
# Initialize and hook up stats collector for daily reports
|
||||||
|
# Note: report_scheduler will only exist on the primary worker
|
||||||
if report_scheduler:
|
if report_scheduler:
|
||||||
stats_collector = ReservationStatsCollector(
|
stats_collector = ReservationStatsCollector(
|
||||||
async_sessionmaker=AsyncSessionLocal,
|
async_sessionmaker=AsyncSessionLocal,
|
||||||
@@ -254,7 +282,9 @@ async def lifespan(app: FastAPI):
|
|||||||
)
|
)
|
||||||
# Hook up the stats collector to the report scheduler
|
# Hook up the stats collector to the report scheduler
|
||||||
report_scheduler.set_stats_collector(stats_collector.collect_stats)
|
report_scheduler.set_stats_collector(stats_collector.collect_stats)
|
||||||
_LOGGER.info("Stats collector initialized and hooked up to report scheduler")
|
_LOGGER.info(
|
||||||
|
"Stats collector initialized and hooked up to report scheduler"
|
||||||
|
)
|
||||||
|
|
||||||
# Start daily report scheduler
|
# Start daily report scheduler
|
||||||
report_scheduler.start()
|
report_scheduler.start()
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ def setup_logging(
|
|||||||
config: dict | None = None,
|
config: dict | None = None,
|
||||||
email_service: "EmailService | None" = None,
|
email_service: "EmailService | None" = None,
|
||||||
loop: asyncio.AbstractEventLoop | None = None,
|
loop: asyncio.AbstractEventLoop | None = None,
|
||||||
|
enable_scheduler: bool = True,
|
||||||
) -> tuple["EmailAlertHandler | None", "DailyReportScheduler | None"]:
|
) -> tuple["EmailAlertHandler | None", "DailyReportScheduler | None"]:
|
||||||
"""Configure logging based on application config.
|
"""Configure logging based on application config.
|
||||||
|
|
||||||
@@ -26,6 +27,8 @@ def setup_logging(
|
|||||||
config: Application configuration dict with optional 'logger' section
|
config: Application configuration dict with optional 'logger' section
|
||||||
email_service: Optional email service for email alerts
|
email_service: Optional email service for email alerts
|
||||||
loop: Optional asyncio event loop for email alerts
|
loop: Optional asyncio event loop for email alerts
|
||||||
|
enable_scheduler: Whether to enable the daily report scheduler
|
||||||
|
(should be False for non-primary workers)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple of (email_alert_handler, daily_report_scheduler) if email monitoring
|
Tuple of (email_alert_handler, daily_report_scheduler) if email monitoring
|
||||||
@@ -109,9 +112,9 @@ def setup_logging(
|
|||||||
except Exception:
|
except Exception:
|
||||||
root_logger.exception("Failed to setup email alert handler")
|
root_logger.exception("Failed to setup email alert handler")
|
||||||
|
|
||||||
# Setup daily report scheduler
|
# Setup daily report scheduler (only if enabled and this is primary worker)
|
||||||
daily_report_config = monitoring_config.get("daily_report", {})
|
daily_report_config = monitoring_config.get("daily_report", {})
|
||||||
if daily_report_config.get("enabled", False):
|
if daily_report_config.get("enabled", False) and enable_scheduler:
|
||||||
try:
|
try:
|
||||||
# Import here to avoid circular dependencies
|
# Import here to avoid circular dependencies
|
||||||
from alpine_bits_python.email_monitoring import DailyReportScheduler
|
from alpine_bits_python.email_monitoring import DailyReportScheduler
|
||||||
@@ -120,9 +123,13 @@ def setup_logging(
|
|||||||
email_service=email_service,
|
email_service=email_service,
|
||||||
config=daily_report_config,
|
config=daily_report_config,
|
||||||
)
|
)
|
||||||
root_logger.info("Daily report scheduler configured")
|
root_logger.info("Daily report scheduler configured (primary worker)")
|
||||||
except Exception:
|
except Exception:
|
||||||
root_logger.exception("Failed to setup daily report scheduler")
|
root_logger.exception("Failed to setup daily report scheduler")
|
||||||
|
elif daily_report_config.get("enabled", False) and not enable_scheduler:
|
||||||
|
root_logger.info(
|
||||||
|
"Daily report scheduler disabled (non-primary worker)"
|
||||||
|
)
|
||||||
|
|
||||||
return email_handler, report_scheduler
|
return email_handler, report_scheduler
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user