"""Database setup and initialization.
|
|
|
|
This module handles all database setup tasks that should run once at startup,
|
|
before the application starts accepting requests. It includes:
|
|
- Schema migrations via Alembic
|
|
- One-time data cleanup/backfill tasks (e.g., hashing existing customers)
|
|
"""

from datetime import UTC, datetime
from typing import Any

from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker
from sqlalchemy.orm import selectinload

from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, WebhookStatus
from .customer_service import CustomerService
from .db import WebhookEndpoint, WebhookRequest, create_database_engine
from .logging_config import get_logger
from .webhook_processor import webhook_registry

_LOGGER = get_logger(__name__)


async def setup_database(
    config: dict[str, Any] | None = None,
) -> tuple[AsyncEngine, async_sessionmaker]:
    """Set up the database and prepare for application use.

    This function should be called once at application startup, after
    migrations have been run but before the app starts accepting requests. It:

    1. Creates the async engine
    2. Creates the sessionmaker
    3. Performs one-time startup tasks (e.g., hashing existing customers)

    NOTE: Database migrations should be run BEFORE calling this function,
    typically using `uv run alembic upgrade head` or via run_migrations.py.

    Args:
        config: Application configuration dictionary

    Returns:
        Tuple of (engine, async_sessionmaker) for use in the application

    Raises:
        Any database-related exceptions that occur during setup
    """
    _LOGGER.info("Starting database setup...")

    # Create database engine
    engine = create_database_engine(config=config, echo=False)

    try:
        # Create sessionmaker for the application to use
        AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

        # Perform startup tasks (NOT migrations). Pass the engine so the
        # config-based backfill tasks can run as well.
        _LOGGER.info("Running startup tasks...")
        await run_startup_tasks(AsyncSessionLocal, config, engine=engine)
        _LOGGER.info("Startup tasks completed successfully")

        _LOGGER.info("Database setup completed successfully")
        return engine, AsyncSessionLocal

    except Exception as e:
        _LOGGER.exception("Database setup failed: %s", e)
        await engine.dispose()
        raise
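

# Example of wiring setup_database() into an application lifespan (an
# illustrative sketch, not part of this module's API): the FastAPI app and
# the load_config() helper shown here are assumptions for demonstration;
# any ASGI framework with startup/shutdown hooks works the same way.
#
#     from contextlib import asynccontextmanager
#     from fastapi import FastAPI
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         engine, sessionmaker = await setup_database(config=load_config())
#         app.state.engine = engine
#         app.state.sessionmaker = sessionmaker
#         try:
#             yield
#         finally:
#             await engine.dispose()
#
#     app = FastAPI(lifespan=lifespan)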


async def backfill_advertising_account_ids(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill advertising account IDs for existing reservations.

    Updates existing reservations to populate meta_account_id and
    google_account_id based on the conditional logic:
    - If fbclid is present, set meta_account_id from the hotel config
    - If gclid is present, set google_account_id from the hotel config

    This is a startup task that runs after schema migrations to ensure
    existing data is consistent with config.

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict
    """
    _LOGGER.info("Backfilling advertising account IDs for existing reservations...")

    # Build a mapping of hotel_id -> account IDs from config
    hotel_accounts = {}
    alpine_bits_auth = config.get("alpine_bits_auth", [])

    for hotel in alpine_bits_auth:
        hotel_id = hotel.get(CONF_HOTEL_ID)
        meta_account = hotel.get(CONF_META_ACCOUNT)
        google_account = hotel.get(CONF_GOOGLE_ACCOUNT)

        if hotel_id:
            hotel_accounts[hotel_id] = {
                "meta_account": meta_account,
                "google_account": google_account,
            }

    if not hotel_accounts:
        _LOGGER.debug("No hotel accounts found in config, skipping backfill")
        return

    _LOGGER.info("Found %d hotel(s) with account configurations", len(hotel_accounts))

    # Update reservations with meta_account_id where fbclid is present
    meta_updated = 0
    for hotel_id, accounts in hotel_accounts.items():
        if accounts["meta_account"]:
            async with engine.begin() as conn:
                sql = text(
                    "UPDATE reservations "
                    "SET meta_account_id = :meta_account "
                    "WHERE hotel_code = :hotel_id "
                    "AND fbclid IS NOT NULL "
                    "AND fbclid != '' "
                    "AND (meta_account_id IS NULL OR meta_account_id = '')"
                )
                result = await conn.execute(
                    sql,
                    {"meta_account": accounts["meta_account"], "hotel_id": hotel_id},
                )
                count = result.rowcount
                if count > 0:
                    _LOGGER.info(
                        "Updated %d reservations with meta_account_id for hotel %s",
                        count,
                        hotel_id,
                    )
                    meta_updated += count

    # Update reservations with google_account_id where gclid is present
    google_updated = 0
    for hotel_id, accounts in hotel_accounts.items():
        if accounts["google_account"]:
            async with engine.begin() as conn:
                sql = text(
                    "UPDATE reservations "
                    "SET google_account_id = :google_account "
                    "WHERE hotel_code = :hotel_id "
                    "AND gclid IS NOT NULL "
                    "AND gclid != '' "
                    "AND (google_account_id IS NULL OR google_account_id = '')"
                )
                result = await conn.execute(
                    sql,
                    {
                        "google_account": accounts["google_account"],
                        "hotel_id": hotel_id,
                    },
                )
                count = result.rowcount
                if count > 0:
                    _LOGGER.info(
                        "Updated %d reservations with google_account_id for hotel %s",
                        count,
                        hotel_id,
                    )
                    google_updated += count

    if meta_updated > 0 or google_updated > 0:
        _LOGGER.info(
            "Backfill complete: %d reservations updated with meta_account_id, "
            "%d with google_account_id",
            meta_updated,
            google_updated,
        )
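

# Expected shape of the config consumed by the backfill tasks in this module
# (an illustrative sketch built from the keys read above; the CONF_* constants
# are used in place of their literal strings, whose real values live in .const):
#
#     config = {
#         "alpine_bits_auth": [
#             {
#                 CONF_HOTEL_ID: "HOTEL123",
#                 "username": "hotel123_user",
#                 CONF_META_ACCOUNT: "act_1234567890",
#                 CONF_GOOGLE_ACCOUNT: "123-456-7890",
#             },
#         ],
#     }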


async def backfill_acked_requests_username(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill username for existing acked_requests records.

    For each acknowledgement, find the corresponding reservation to determine
    its hotel_code, then look up the username for that hotel in the config
    and update the acked_request record.

    This is a startup task that runs after schema migrations to ensure
    existing data is consistent with config.

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict
    """
    _LOGGER.info("Backfilling usernames for existing acked_requests...")

    # Build a mapping of hotel_id -> username from config
    hotel_usernames = {}
    alpine_bits_auth = config.get("alpine_bits_auth", [])

    for hotel in alpine_bits_auth:
        hotel_id = hotel.get(CONF_HOTEL_ID)
        username = hotel.get("username")

        if hotel_id and username:
            hotel_usernames[hotel_id] = username

    if not hotel_usernames:
        _LOGGER.debug("No hotel usernames found in config, skipping backfill")
        return

    _LOGGER.info("Found %d hotel(s) with usernames in config", len(hotel_usernames))

    # Update acked_requests with usernames by matching to reservations
    total_updated = 0
    async with engine.begin() as conn:
        for hotel_id, username in hotel_usernames.items():
            sql = text(
                """
                UPDATE acked_requests
                SET username = :username
                WHERE unique_id IN (
                    SELECT md5_unique_id FROM reservations WHERE hotel_code = :hotel_id
                )
                AND username IS NULL
                """
            )
            result = await conn.execute(
                sql, {"username": username, "hotel_id": hotel_id}
            )
            count = result.rowcount
            if count > 0:
                _LOGGER.info(
                    "Updated %d acknowledgements with username for hotel %s",
                    count,
                    hotel_id,
                )
                total_updated += count

    if total_updated > 0:
        _LOGGER.info(
            "Backfill complete: %d acknowledgements updated with username",
            total_updated,
        )
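

# A quick sanity check for the two advertising backfills above (illustrative
# SQL, using only the columns referenced in the UPDATE statements; expect 0
# rows for every hotel that has the corresponding account configured):
#
#     SELECT count(*) FROM reservations
#     WHERE fbclid IS NOT NULL AND fbclid != '' AND meta_account_id IS NULL;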


async def reprocess_stuck_webhooks(
    sessionmaker: async_sessionmaker,
    config: dict[str, Any] | None = None,
) -> None:
    """Reprocess webhooks that were stuck in 'processing' state.

    Finds webhooks with status='processing' and reprocesses them.
    These are webhooks that were not fully processed in the previous run,
    likely due to a crash or unexpected shutdown.

    Args:
        sessionmaker: SQLAlchemy async sessionmaker
        config: Application configuration dictionary
    """
    _LOGGER.info("Checking for stuck webhooks to reprocess...")

    async with sessionmaker() as session:
        # Find all webhooks stuck in 'processing' state
        result = await session.execute(
            select(WebhookRequest)
            .where(WebhookRequest.status == WebhookStatus.PROCESSING)
            .options(
                selectinload(WebhookRequest.webhook_endpoint).selectinload(
                    WebhookEndpoint.hotel
                )
            )
        )
        stuck_webhooks = result.scalars().all()

        if not stuck_webhooks:
            _LOGGER.info("No stuck webhooks found")
            return

        _LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))

        reprocessed_count = 0
        failed_count = 0

        for webhook_request in stuck_webhooks:
            webhook_id = webhook_request.id
            webhook_endpoint = webhook_request.webhook_endpoint

            if not webhook_endpoint:
                _LOGGER.error(
                    "Webhook request %d has no webhook_endpoint, skipping", webhook_id
                )
                webhook_request.status = WebhookStatus.FAILED
                webhook_request.last_error = (
                    "No webhook endpoint found during startup reprocessing"
                )
                webhook_request.processing_completed_at = datetime.now(UTC)
                failed_count += 1
                continue

            if not webhook_request.payload_json:
                _LOGGER.error(
                    "Webhook request %d has no payload (purged?), marking as failed",
                    webhook_id,
                )
                webhook_request.status = WebhookStatus.FAILED
                webhook_request.last_error = (
                    "No payload available for reprocessing (purged)"
                )
                webhook_request.processing_completed_at = datetime.now(UTC)
                failed_count += 1
                continue

            try:
                _LOGGER.info(
                    "Reprocessing webhook %d (hotel=%s, type=%s)",
                    webhook_id,
                    webhook_endpoint.hotel_id,
                    webhook_endpoint.webhook_type,
                )

                # Get processor for webhook_type
                processor = webhook_registry.get_processor(
                    webhook_endpoint.webhook_type
                )
                if not processor:
                    raise ValueError(
                        f"No processor for type: {webhook_endpoint.webhook_type}"
                    )

                # Reprocess webhook with simplified interface
                await processor.process(
                    webhook_request=webhook_request,
                    db_session=session,
                    config=config,
                )

                # Update status to completed
                webhook_request.status = WebhookStatus.COMPLETED
                webhook_request.processing_completed_at = datetime.now(UTC)
                reprocessed_count += 1

                _LOGGER.info("Successfully reprocessed webhook %d", webhook_id)

            except Exception as e:
                _LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
                webhook_request.status = WebhookStatus.FAILED
                webhook_request.last_error = (
                    f"Reprocessing failed during startup: {str(e)[:1950]}"
                )
                webhook_request.processing_completed_at = datetime.now(UTC)
                failed_count += 1

        # Commit all changes
        await session.commit()

        _LOGGER.info(
            "Webhook reprocessing complete: %d successful, %d failed",
            reprocessed_count,
            failed_count,
        )
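

# The processor contract assumed by reprocess_stuck_webhooks(), reconstructed
# from the call site above as a sketch (the authoritative definition lives in
# .webhook_processor; the Protocol below is illustrative only):
#
#     from typing import Protocol
#     from sqlalchemy.ext.asyncio import AsyncSession
#
#     class WebhookProcessor(Protocol):
#         async def process(
#             self,
#             *,
#             webhook_request: WebhookRequest,
#             db_session: AsyncSession,
#             config: dict[str, Any] | None,
#         ) -> None: ...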


async def run_startup_tasks(
    sessionmaker: async_sessionmaker,
    config: dict[str, Any] | None = None,
    engine: AsyncEngine | None = None,
) -> None:
    """Run one-time startup tasks.

    These are tasks that need to run at startup but are NOT schema migrations.
    Examples: data backfills, hashing existing records, etc.

    Args:
        sessionmaker: SQLAlchemy async sessionmaker
        config: Application configuration dictionary
        engine: SQLAlchemy async engine (optional, for backfill tasks)
    """
    # Sync config to database (hotels and webhook endpoints)
    if config:
        from .hotel_service import sync_config_to_database

        async with sessionmaker() as session:
            stats = await sync_config_to_database(session, config)
            _LOGGER.info(
                "Config sync: %d hotels created, %d updated, %d endpoints created",
                stats["hotels_created"],
                stats["hotels_updated"],
                stats["endpoints_created"],
            )

    # Hash any existing customers that don't have hashed data
    async with sessionmaker() as session:
        customer_service = CustomerService(session)
        hashed_count = await customer_service.hash_existing_customers()
        if hashed_count > 0:
            _LOGGER.info(
                "Backfilled hashed data for %d existing customers", hashed_count
            )
        else:
            _LOGGER.debug("All existing customers already have hashed data")

    # Backfill advertising account IDs and usernames based on config.
    # This ensures existing data is consistent with current configuration.
    if config and engine:
        await backfill_advertising_account_ids(engine, config)
        await backfill_acked_requests_username(engine, config)
    elif config and not engine:
        _LOGGER.warning(
            "No engine provided to run_startup_tasks, "
            "skipping config-based backfill tasks"
        )

    # Reprocess stuck webhooks (those stuck in 'processing' state)
    await reprocess_stuck_webhooks(sessionmaker, config)