"""Database setup and initialization. This module handles all database setup tasks that should run once at startup, before the application starts accepting requests. It includes: - Schema migrations via Alembic - One-time data cleanup/backfill tasks (e.g., hashing existing customers) """ import asyncio from datetime import UTC, datetime from typing import Any from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker from sqlalchemy.orm import selectinload from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, WebhookStatus from .customer_service import CustomerService from .db import WebhookEndpoint, WebhookRequest, create_database_engine from .logging_config import get_logger from .webhook_processor import webhook_registry _LOGGER = get_logger(__name__) async def setup_database(config: dict[str, Any] | None = None) -> tuple[AsyncEngine, async_sessionmaker]: """Set up the database and prepare for application use. This function should be called once at application startup, after migrations have been run but before the app starts accepting requests. It: 1. Creates the async engine 2. Creates the sessionmaker 3. Performs one-time startup tasks (e.g., hashing existing customers) NOTE: Database migrations should be run BEFORE calling this function, typically using `uv run alembic upgrade head` or via run_migrations.py. Args: config: Application configuration dictionary Returns: Tuple of (engine, async_sessionmaker) for use in the application Raises: Any database-related exceptions that occur during setup """ _LOGGER.info("Starting database setup...") # Create database engine engine = create_database_engine(config=config, echo=False) try: # Create sessionmaker for the application to use AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False) # Perform startup tasks (NOT migrations) _LOGGER.info("Running startup tasks...") await run_startup_tasks(AsyncSessionLocal, config) _LOGGER.info("Startup tasks completed successfully") _LOGGER.info("Database setup completed successfully") return engine, AsyncSessionLocal except Exception as e: _LOGGER.exception("Database setup failed: %s", e) await engine.dispose() raise async def backfill_advertising_account_ids( engine: AsyncEngine, config: dict[str, Any] ) -> None: """Backfill advertising account IDs for existing reservations. Updates existing reservations to populate meta_account_id and google_account_id based on the conditional logic: - If fbclid is present, set meta_account_id from hotel config - If gclid is present, set google_account_id from hotel config This is a startup task that runs after schema migrations to ensure existing data is consistent with config. 
async def backfill_advertising_account_ids(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill advertising account IDs for existing reservations.

    Updates existing reservations to populate meta_account_id and
    google_account_id based on the conditional logic:

    - If fbclid is present, set meta_account_id from hotel config
    - If gclid is present, set google_account_id from hotel config

    This is a startup task that runs after schema migrations to ensure
    existing data is consistent with config.

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict
    """
    _LOGGER.info("Backfilling advertising account IDs for existing reservations...")

    # Build a mapping of hotel_id -> account IDs from config
    hotel_accounts: dict[str, dict[str, Any]] = {}
    alpine_bits_auth = config.get("alpine_bits_auth", [])
    for hotel in alpine_bits_auth:
        hotel_id = hotel.get(CONF_HOTEL_ID)
        meta_account = hotel.get(CONF_META_ACCOUNT)
        google_account = hotel.get(CONF_GOOGLE_ACCOUNT)
        if hotel_id:
            hotel_accounts[hotel_id] = {
                "meta_account": meta_account,
                "google_account": google_account,
            }

    if not hotel_accounts:
        _LOGGER.debug("No hotel accounts found in config, skipping backfill")
        return

    _LOGGER.info("Found %d hotel(s) with account configurations", len(hotel_accounts))

    # Update reservations with meta_account_id where fbclid is present
    meta_updated = 0
    for hotel_id, accounts in hotel_accounts.items():
        if accounts["meta_account"]:
            async with engine.begin() as conn:
                sql = text(
                    "UPDATE reservations "
                    "SET meta_account_id = :meta_account "
                    "WHERE hotel_id = :hotel_id "
                    "AND fbclid IS NOT NULL "
                    "AND fbclid != '' "
                    "AND (meta_account_id IS NULL OR meta_account_id = '')"
                )
                result = await conn.execute(
                    sql,
                    {"meta_account": accounts["meta_account"], "hotel_id": hotel_id},
                )
                count = result.rowcount
                if count > 0:
                    _LOGGER.info(
                        "Updated %d reservations with meta_account_id for hotel %s",
                        count,
                        hotel_id,
                    )
                    meta_updated += count

    # Update reservations with google_account_id where gclid is present
    google_updated = 0
    for hotel_id, accounts in hotel_accounts.items():
        if accounts["google_account"]:
            async with engine.begin() as conn:
                sql = text(
                    "UPDATE reservations "
                    "SET google_account_id = :google_account "
                    "WHERE hotel_id = :hotel_id "
                    "AND gclid IS NOT NULL "
                    "AND gclid != '' "
                    "AND (google_account_id IS NULL OR google_account_id = '')"
                )
                result = await conn.execute(
                    sql,
                    {
                        "google_account": accounts["google_account"],
                        "hotel_id": hotel_id,
                    },
                )
                count = result.rowcount
                if count > 0:
                    _LOGGER.info(
                        "Updated %d reservations with google_account_id for hotel %s",
                        count,
                        hotel_id,
                    )
                    google_updated += count

    if meta_updated > 0 or google_updated > 0:
        _LOGGER.info(
            "Backfill complete: %d reservations updated with meta_account_id, "
            "%d with google_account_id",
            meta_updated,
            google_updated,
        )

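# Illustrative shape of the config consumed by the backfill tasks above and
# below. The key names assume CONF_HOTEL_ID, CONF_META_ACCOUNT and
# CONF_GOOGLE_ACCOUNT resolve to "hotel_id", "meta_account" and
# "google_account"; all values are made-up placeholders.
#
#     config = {
#         "alpine_bits_auth": [
#             {
#                 "hotel_id": "123",
#                 "username": "hotel-123",
#                 "meta_account": "act_0000000000",
#                 "google_account": "000-000-0000",
#             },
#         ],
#     }
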
async def backfill_acked_requests_username(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill username for existing acked_requests records.

    For each acknowledgement, find the corresponding reservation to determine
    its hotel_id, then look up the username for that hotel in the config and
    update the acked_request record.

    This is a startup task that runs after schema migrations to ensure
    existing data is consistent with config.

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict
    """
    _LOGGER.info("Backfilling usernames for existing acked_requests...")

    # Build a mapping of hotel_id -> username from config
    hotel_usernames: dict[str, str] = {}
    alpine_bits_auth = config.get("alpine_bits_auth", [])
    for hotel in alpine_bits_auth:
        hotel_id = hotel.get(CONF_HOTEL_ID)
        username = hotel.get("username")
        if hotel_id and username:
            hotel_usernames[hotel_id] = username

    if not hotel_usernames:
        _LOGGER.debug("No hotel usernames found in config, skipping backfill")
        return

    _LOGGER.info("Found %d hotel(s) with usernames in config", len(hotel_usernames))

    # Update acked_requests with usernames by matching to reservations
    total_updated = 0
    async with engine.begin() as conn:
        for hotel_id, username in hotel_usernames.items():
            sql = text(
                """
                UPDATE acked_requests
                SET username = :username
                WHERE unique_id IN (
                    SELECT md5_unique_id
                    FROM reservations
                    WHERE hotel_id = :hotel_id
                )
                AND username IS NULL
                """
            )
            result = await conn.execute(
                sql, {"username": username, "hotel_id": hotel_id}
            )
            count = result.rowcount
            if count > 0:
                _LOGGER.info(
                    "Updated %d acknowledgements with username for hotel %s",
                    count,
                    hotel_id,
                )
                total_updated += count

    if total_updated > 0:
        _LOGGER.info(
            "Backfill complete: %d acknowledgements updated with username",
            total_updated,
        )

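# Worked example of the username backfill above (placeholder values): if the
# config maps hotel "123" to username "hotel-123" and a reservation row has
# hotel_id="123" with md5_unique_id="abc...", then the acked_requests row whose
# unique_id="abc..." gets username="hotel-123", but only while its username is
# still NULL, so previously set values are never overwritten.
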
async def reprocess_stuck_webhooks(
    sessionmaker: async_sessionmaker,
    config: dict[str, Any] | None = None,
) -> None:
    """Reprocess webhooks that were stuck in 'processing' state.

    Finds webhooks with status='processing' and reprocesses them. These are
    webhooks that were not fully processed in the previous run, likely due to
    a crash or unexpected shutdown.

    This function is designed to NEVER block application startup. All errors
    are caught and logged, but the app will start regardless.

    Args:
        sessionmaker: SQLAlchemy async sessionmaker
        config: Application configuration dictionary
    """
    try:
        _LOGGER.info("Checking for stuck webhooks to reprocess...")

        async with sessionmaker() as session:
            # Find all webhooks stuck in 'processing' state
            result = await session.execute(
                select(WebhookRequest)
                .where(WebhookRequest.status == WebhookStatus.PROCESSING)
                .options(
                    selectinload(WebhookRequest.webhook_endpoint).selectinload(
                        WebhookEndpoint.hotel
                    )
                )
            )
            stuck_webhooks: list[WebhookRequest] = result.scalars().all()

            if not stuck_webhooks:
                _LOGGER.info("No stuck webhooks found")
                return

            _LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))

            reprocessed_count = 0
            failed_count = 0

            for webhook_request in stuck_webhooks:
                webhook_id = webhook_request.id
                webhook_endpoint = webhook_request.webhook_endpoint

                if not webhook_endpoint:
                    _LOGGER.error(
                        "Webhook request %d has no webhook_endpoint, skipping",
                        webhook_id,
                    )
                    webhook_request.status = WebhookStatus.FAILED
                    webhook_request.last_error = (
                        "No webhook endpoint found during startup reprocessing"
                    )
                    webhook_request.processing_completed_at = datetime.now(UTC)
                    failed_count += 1
                    continue

                if not webhook_request.payload_json:
                    _LOGGER.error(
                        "Webhook request %d has no payload (purged?), marking as failed",
                        webhook_id,
                    )
                    webhook_request.status = WebhookStatus.FAILED
                    webhook_request.last_error = (
                        "No payload available for reprocessing (purged)"
                    )
                    webhook_request.processing_completed_at = datetime.now(UTC)
                    failed_count += 1
                    continue

                try:
                    _LOGGER.info(
                        "Reprocessing webhook %d (hotel=%s, type=%s)",
                        webhook_id,
                        webhook_endpoint.hotel_id,
                        webhook_endpoint.webhook_type,
                    )

                    # Get processor for webhook_type
                    processor = webhook_registry.get_processor(
                        webhook_endpoint.webhook_type
                    )
                    if not processor:
                        raise ValueError(
                            f"No processor for type: {webhook_endpoint.webhook_type}"
                        )

                    # Reprocess webhook with simplified interface
                    result = await processor.process(
                        webhook_request=webhook_request,
                        db_session=session,
                        config=config,
                    )

                    # Check result status
                    result_status = (
                        result.get("status") if isinstance(result, dict) else "success"
                    )

                    if result_status == "duplicate":
                        # Duplicate is not an error - mark as completed and continue
                        webhook_request.status = WebhookStatus.COMPLETED
                        webhook_request.processing_completed_at = datetime.now(UTC)
                        reprocessed_count += 1
                        _LOGGER.info(
                            "Webhook %d was a duplicate (already processed), "
                            "marked as completed",
                            webhook_id,
                        )
                    elif result_status in ("success", "completed"):
                        # Update status to completed
                        webhook_request.status = WebhookStatus.COMPLETED
                        webhook_request.processing_completed_at = datetime.now(UTC)
                        reprocessed_count += 1
                        _LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
                    else:
                        # Unexpected status - treat as failure
                        _LOGGER.warning(
                            "Webhook %d returned unexpected status: %s",
                            webhook_id,
                            result_status,
                        )
                        webhook_request.status = WebhookStatus.FAILED
                        webhook_request.last_error = (
                            f"Unexpected status: {result_status}"
                        )
                        webhook_request.processing_completed_at = datetime.now(UTC)
                        failed_count += 1

                except Exception as e:
                    _LOGGER.exception(
                        "Failed to reprocess webhook %d: %s", webhook_id, e
                    )
                    webhook_request.status = WebhookStatus.FAILED
                    webhook_request.last_error = (
                        f"Reprocessing failed during startup: {str(e)[:1950]}"
                    )
                    webhook_request.processing_completed_at = datetime.now(UTC)
                    failed_count += 1

            # Commit all changes
            await session.commit()

            _LOGGER.info(
                "Webhook reprocessing complete: %d successful, %d failed",
                reprocessed_count,
                failed_count,
            )

    except Exception as e:
        # CRITICAL: Never let reprocessing block application startup
        _LOGGER.exception(
            "CRITICAL ERROR during webhook reprocessing, but allowing app to start: %s",
            e,
        )

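# Result handling contract assumed by the loop above: processor.process() may
# return a dict whose "status" key is inspected; any non-dict result is treated
# as success. Sketch of the mapping applied to each stuck webhook:
#
#     {"status": "duplicate"}               -> COMPLETED (already processed earlier)
#     {"status": "success" | "completed"}   -> COMPLETED
#     {"status": <anything else>}           -> FAILED ("Unexpected status: ...")
#     raised exception                      -> FAILED (truncated error message)
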
async def run_startup_tasks(
    sessionmaker: async_sessionmaker,
    config: dict[str, Any] | None = None,
    engine: AsyncEngine | None = None,
) -> None:
    """Run one-time startup tasks.

    These are tasks that need to run at startup but are NOT schema migrations.
    Examples: data backfills, hashing existing records, etc.

    Args:
        sessionmaker: SQLAlchemy async sessionmaker
        config: Application configuration dictionary
        engine: SQLAlchemy async engine (optional, for backfill tasks)
    """
    # Sync config to database (hotels and webhook endpoints)
    if config:
        from .hotel_service import sync_config_to_database

        async with sessionmaker() as session:
            stats = await sync_config_to_database(session, config)
            _LOGGER.info(
                "Config sync: %d hotels created, %d updated, %d endpoints created",
                stats["hotels_created"],
                stats["hotels_updated"],
                stats["endpoints_created"],
            )

    # Hash any existing customers that don't have hashed data
    async with sessionmaker() as session:
        customer_service = CustomerService(session)
        hashed_count = await customer_service.hash_existing_customers()
        if hashed_count > 0:
            _LOGGER.info(
                "Backfilled hashed data for %d existing customers", hashed_count
            )
        else:
            _LOGGER.debug("All existing customers already have hashed data")

    # Backfill advertising account IDs and usernames based on config
    # This ensures existing data is consistent with current configuration
    if config and engine:
        await backfill_advertising_account_ids(engine, config)
        await backfill_acked_requests_username(engine, config)
    elif config and not engine:
        _LOGGER.warning(
            "No engine provided to run_startup_tasks, "
            "skipping config-based backfill tasks"
        )

    # Reprocess stuck webhooks (those stuck in 'processing' state)
    await reprocess_stuck_webhooks(sessionmaker, config)
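
# Ordering of the startup tasks performed above, for reference:
#   1. sync_config_to_database            - hotels and webhook endpoints from config
#   2. hash_existing_customers            - backfill hashed data for existing customers
#   3. backfill_advertising_account_ids / backfill_acked_requests_username
#      (only when both config and engine are available)
#   4. reprocess_stuck_webhooks           - designed to never block startup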