alpinebits_python/src/alpine_bits_python/db_setup.py

"""Database setup and initialization.
This module handles all database setup tasks that should run once at startup,
before the application starts accepting requests. It includes:
- Schema migrations via Alembic
- One-time data cleanup/backfill tasks (e.g., hashing existing customers)
"""
import asyncio
from datetime import UTC, datetime
from typing import Any

from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker
from sqlalchemy.orm import selectinload

from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, WebhookStatus
from .customer_service import CustomerService
from .db import WebhookEndpoint, WebhookRequest, create_database_engine
from .logging_config import get_logger
from .webhook_processor import webhook_registry

_LOGGER = get_logger(__name__)


async def setup_database(config: dict[str, Any] | None = None) -> tuple[AsyncEngine, async_sessionmaker]:
"""Set up the database and prepare for application use.
This function should be called once at application startup, after
migrations have been run but before the app starts accepting requests. It:
1. Creates the async engine
2. Creates the sessionmaker
3. Performs one-time startup tasks (e.g., hashing existing customers)
NOTE: Database migrations should be run BEFORE calling this function,
typically using `uv run alembic upgrade head` or via run_migrations.py.
Args:
config: Application configuration dictionary
Returns:
Tuple of (engine, async_sessionmaker) for use in the application
Raises:
Any database-related exceptions that occur during setup
"""
_LOGGER.info("Starting database setup...")
# Create database engine
engine = create_database_engine(config=config, echo=False)
try:
# Create sessionmaker for the application to use
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
# Perform startup tasks (NOT migrations)
_LOGGER.info("Running startup tasks...")
        await run_startup_tasks(AsyncSessionLocal, config, engine=engine)
_LOGGER.info("Startup tasks completed successfully")
_LOGGER.info("Database setup completed successfully")
return engine, AsyncSessionLocal
except Exception as e:
_LOGGER.exception("Database setup failed: %s", e)
await engine.dispose()
raise
async def backfill_advertising_account_ids(
engine: AsyncEngine, config: dict[str, Any]
) -> None:
"""Backfill advertising account IDs for existing reservations.
Updates existing reservations to populate meta_account_id and google_account_id
based on the conditional logic:
- If fbclid is present, set meta_account_id from hotel config
- If gclid is present, set google_account_id from hotel config
This is a startup task that runs after schema migrations to ensure
existing data is consistent with config.
Args:
engine: SQLAlchemy async engine
config: Application configuration dict
"""
_LOGGER.info("Backfilling advertising account IDs for existing reservations...")
# Build a mapping of hotel_id -> account IDs from config
hotel_accounts = {}
alpine_bits_auth = config.get("alpine_bits_auth", [])
for hotel in alpine_bits_auth:
hotel_id = hotel.get(CONF_HOTEL_ID)
meta_account = hotel.get(CONF_META_ACCOUNT)
google_account = hotel.get(CONF_GOOGLE_ACCOUNT)
if hotel_id:
hotel_accounts[hotel_id] = {
"meta_account": meta_account,
"google_account": google_account,
}
if not hotel_accounts:
_LOGGER.debug("No hotel accounts found in config, skipping backfill")
return
_LOGGER.info("Found %d hotel(s) with account configurations", len(hotel_accounts))
# Update reservations with meta_account_id where fbclid is present
meta_updated = 0
for hotel_id, accounts in hotel_accounts.items():
if accounts["meta_account"]:
async with engine.begin() as conn:
sql = text(
"UPDATE reservations "
"SET meta_account_id = :meta_account "
"WHERE hotel_id = :hotel_id "
"AND fbclid IS NOT NULL "
"AND fbclid != '' "
"AND (meta_account_id IS NULL OR meta_account_id = '')"
)
result = await conn.execute(
sql,
{"meta_account": accounts["meta_account"], "hotel_id": hotel_id},
)
count = result.rowcount
if count > 0:
_LOGGER.info(
"Updated %d reservations with meta_account_id for hotel %s",
count,
hotel_id,
)
meta_updated += count
# Update reservations with google_account_id where gclid is present
google_updated = 0
for hotel_id, accounts in hotel_accounts.items():
if accounts["google_account"]:
async with engine.begin() as conn:
sql = text(
"UPDATE reservations "
"SET google_account_id = :google_account "
"WHERE hotel_id = :hotel_id "
"AND gclid IS NOT NULL "
"AND gclid != '' "
"AND (google_account_id IS NULL OR google_account_id = '')"
)
result = await conn.execute(
sql,
{
"google_account": accounts["google_account"],
"hotel_id": hotel_id,
},
)
count = result.rowcount
if count > 0:
_LOGGER.info(
"Updated %d reservations with google_account_id for hotel %s",
count,
hotel_id,
)
google_updated += count
if meta_updated > 0 or google_updated > 0:
_LOGGER.info(
"Backfill complete: %d reservations updated with meta_account_id, "
"%d with google_account_id",
meta_updated,
google_updated,
)
async def backfill_acked_requests_username(
engine: AsyncEngine, config: dict[str, Any]
) -> None:
"""Backfill username for existing acked_requests records.
For each acknowledgement, find the corresponding reservation to determine
its hotel_code, then look up the username for that hotel in the config
and update the acked_request record.
This is a startup task that runs after schema migrations to ensure
existing data is consistent with config.
Args:
engine: SQLAlchemy async engine
config: Application configuration dict
"""
_LOGGER.info("Backfilling usernames for existing acked_requests...")
# Build a mapping of hotel_id -> username from config
hotel_usernames = {}
alpine_bits_auth = config.get("alpine_bits_auth", [])
for hotel in alpine_bits_auth:
hotel_id = hotel.get(CONF_HOTEL_ID)
username = hotel.get("username")
if hotel_id and username:
hotel_usernames[hotel_id] = username
if not hotel_usernames:
_LOGGER.debug("No hotel usernames found in config, skipping backfill")
return
_LOGGER.info("Found %d hotel(s) with usernames in config", len(hotel_usernames))
# Update acked_requests with usernames by matching to reservations
total_updated = 0
async with engine.begin() as conn:
for hotel_id, username in hotel_usernames.items():
sql = text(
"""
UPDATE acked_requests
SET username = :username
WHERE unique_id IN (
SELECT md5_unique_id FROM reservations WHERE hotel_id = :hotel_id
)
AND username IS NULL
"""
)
result = await conn.execute(
sql, {"username": username, "hotel_id": hotel_id}
)
count = result.rowcount
if count > 0:
_LOGGER.info(
"Updated %d acknowledgements with username for hotel %s",
count,
hotel_id,
)
total_updated += count
if total_updated > 0:
_LOGGER.info(
"Backfill complete: %d acknowledgements updated with username",
total_updated,
)
async def reprocess_stuck_webhooks(
sessionmaker: async_sessionmaker,
config: dict[str, Any] | None = None,
) -> None:
"""Reprocess webhooks that were stuck in 'processing' state.
Finds webhooks with status='processing' and reprocesses them.
These are webhooks that were not fully processed in the previous run,
likely due to a crash or unexpected shutdown.
This function is designed to NEVER block application startup.
All errors are caught and logged, but the app will start regardless.
Args:
sessionmaker: SQLAlchemy async sessionmaker
config: Application configuration dictionary
"""
try:
_LOGGER.info("Checking for stuck webhooks to reprocess...")
async with sessionmaker() as session:
# Find all webhooks stuck in 'processing' state
result = await session.execute(
select(WebhookRequest)
.where(WebhookRequest.status == WebhookStatus.PROCESSING)
.options(
selectinload(WebhookRequest.webhook_endpoint).selectinload(
WebhookEndpoint.hotel
)
)
)
            stuck_webhooks: list[WebhookRequest] = list(result.scalars().all())
if not stuck_webhooks:
_LOGGER.info("No stuck webhooks found")
return
_LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))
reprocessed_count = 0
failed_count = 0
for webhook_request in stuck_webhooks:
webhook_id = webhook_request.id
webhook_endpoint = webhook_request.webhook_endpoint
if not webhook_endpoint:
                    _LOGGER.error(
                        "Webhook request %d has no webhook_endpoint, marking as failed",
                        webhook_id,
                    )
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No webhook endpoint found during startup reprocessing"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
if not webhook_request.payload_json:
_LOGGER.error(
"Webhook request %d has no payload (purged?), marking as failed",
webhook_id,
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No payload available for reprocessing (purged)"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
try:
_LOGGER.info(
"Reprocessing webhook %d (hotel=%s, type=%s)",
webhook_id,
webhook_endpoint.hotel_id,
webhook_endpoint.webhook_type,
)
# Get processor for webhook_type
processor = webhook_registry.get_processor(
webhook_endpoint.webhook_type
)
if not processor:
raise ValueError(
f"No processor for type: {webhook_endpoint.webhook_type}"
)
# Reprocess webhook with simplified interface
result = await processor.process(
webhook_request=webhook_request,
db_session=session,
config=config,
)
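                    # Assumed processor contract (mirrors the handling below):
                    # process() returns a dict whose "status" is "success",
                    # "completed", or "duplicate"; a non-dict result is treated
                    # as success, and any other status is recorded as a failure.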
# Check result status
result_status = result.get("status") if isinstance(result, dict) else "success"
if result_status == "duplicate":
# Duplicate is not an error - mark as completed and continue
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
_LOGGER.info(
"Webhook %d was a duplicate (already processed), marked as completed",
webhook_id
)
elif result_status in ("success", "completed"):
# Update status to completed
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
else:
# Unexpected status - treat as failure
_LOGGER.warning(
"Webhook %d returned unexpected status: %s",
webhook_id,
result_status
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = f"Unexpected status: {result_status}"
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
except Exception as e:
_LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
f"Reprocessing failed during startup: {str(e)[:1950]}"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
# Commit all changes
await session.commit()
_LOGGER.info(
"Webhook reprocessing complete: %d successful, %d failed",
reprocessed_count,
failed_count,
)
except Exception as e:
# CRITICAL: Never let reprocessing block application startup
_LOGGER.exception(
"CRITICAL ERROR during webhook reprocessing, but allowing app to start: %s",
e
)
async def run_startup_tasks(
sessionmaker: async_sessionmaker,
config: dict[str, Any] | None = None,
engine: AsyncEngine | None = None,
) -> None:
"""Run one-time startup tasks.
These are tasks that need to run at startup but are NOT schema migrations.
Examples: data backfills, hashing existing records, etc.
Args:
sessionmaker: SQLAlchemy async sessionmaker
config: Application configuration dictionary
engine: SQLAlchemy async engine (optional, for backfill tasks)
"""
# Sync config to database (hotels and webhook endpoints)
if config:
from .hotel_service import sync_config_to_database
async with sessionmaker() as session:
stats = await sync_config_to_database(session, config)
_LOGGER.info(
"Config sync: %d hotels created, %d updated, %d endpoints created",
stats["hotels_created"],
stats["hotels_updated"],
stats["endpoints_created"]
)
# Hash any existing customers that don't have hashed data
async with sessionmaker() as session:
customer_service = CustomerService(session)
hashed_count = await customer_service.hash_existing_customers()
if hashed_count > 0:
_LOGGER.info(
"Backfilled hashed data for %d existing customers", hashed_count
)
else:
_LOGGER.debug("All existing customers already have hashed data")
# Backfill advertising account IDs and usernames based on config
# This ensures existing data is consistent with current configuration
if config and engine:
await backfill_advertising_account_ids(engine, config)
await backfill_acked_requests_username(engine, config)
elif config and not engine:
_LOGGER.warning(
"No engine provided to run_startup_tasks, "
"skipping config-based backfill tasks"
)
# Reprocess stuck webhooks (those stuck in 'processing' state)
await reprocess_stuck_webhooks(sessionmaker, config)