Files
alpinebits_python/src/alpine_bits_python/db_setup.py
Jonas Linter ccdc66fb9b Looking good
2025-11-18 14:49:44 +01:00

275 lines
9.7 KiB
Python

"""Database setup and initialization.
This module handles all database setup tasks that should run once at startup,
before the application starts accepting requests. It includes:
- Schema migrations via Alembic
- One-time data cleanup/backfill tasks (e.g., hashing existing customers)
"""
import asyncio
from typing import Any
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT
from .customer_service import CustomerService
from .db import create_database_engine
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
async def setup_database(
    config: dict[str, Any] | None = None,
) -> tuple[AsyncEngine, async_sessionmaker]:
    """Set up the database and prepare for application use.

    This function should be called once at application startup, after
    migrations have been run but before the app starts accepting requests. It:

    1. Creates the async engine
    2. Creates the sessionmaker
    3. Performs one-time startup tasks (e.g., hashing existing customers)

    NOTE: Database migrations should be run BEFORE calling this function,
    typically using `uv run alembic upgrade head` or via run_migrations.py.

    Args:
        config: Application configuration dictionary

    Returns:
        Tuple of (engine, async_sessionmaker) for use in the application

    Raises:
        Any database-related exceptions that occur during setup
    """
    _LOGGER.info("Starting database setup...")

    # Create database engine
    engine = create_database_engine(config=config, echo=False)

    try:
        # Create sessionmaker for the application to use
        AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

        # Perform startup tasks (NOT migrations). The engine must be passed
        # explicitly: without it, run_startup_tasks logs a warning and skips
        # the config-based backfill tasks entirely.
        _LOGGER.info("Running startup tasks...")
        await run_startup_tasks(AsyncSessionLocal, config, engine=engine)
        _LOGGER.info("Startup tasks completed successfully")

        _LOGGER.info("Database setup completed successfully")
        return engine, AsyncSessionLocal
    except Exception as e:
        # Dispose of the engine on failure so connections are not leaked.
        _LOGGER.exception("Database setup failed: %s", e)
        await engine.dispose()
        raise
async def _backfill_account_column(
    engine: AsyncEngine,
    hotel_accounts: dict[str, dict[str, Any]],
    account_key: str,
    column: str,
    click_id_column: str,
) -> int:
    """Populate one advertising-account column for reservations with a click ID.

    For each hotel that has an account configured under ``account_key``, set
    ``column`` on that hotel's reservations where ``click_id_column`` is
    non-empty and ``column`` is still unset. Each hotel is updated in its own
    transaction, matching the previous per-hotel behavior.

    NOTE: ``column`` and ``click_id_column`` are interpolated into the SQL
    text. They come only from the hard-coded call sites in
    backfill_advertising_account_ids, never from user input.

    Args:
        engine: SQLAlchemy async engine
        hotel_accounts: Mapping of hotel_id -> configured account IDs
        account_key: Key in the per-hotel dict holding the account ID
        column: Reservations column to populate
        click_id_column: Reservations column holding the click ID

    Returns:
        Total number of reservation rows updated across all hotels.
    """
    total = 0
    for hotel_id, accounts in hotel_accounts.items():
        account_id = accounts[account_key]
        if not account_id:
            continue
        async with engine.begin() as conn:
            sql = text(
                f"UPDATE reservations "
                f"SET {column} = :account_id "
                f"WHERE hotel_code = :hotel_id "
                f"AND {click_id_column} IS NOT NULL "
                f"AND {click_id_column} != '' "
                f"AND ({column} IS NULL OR {column} = '')"
            )
            result = await conn.execute(
                sql, {"account_id": account_id, "hotel_id": hotel_id}
            )
            count = result.rowcount
            if count > 0:
                _LOGGER.info(
                    "Updated %d reservations with %s for hotel %s",
                    count,
                    column,
                    hotel_id,
                )
                total += count
    return total


async def backfill_advertising_account_ids(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill advertising account IDs for existing reservations.

    Updates existing reservations to populate meta_account_id and
    google_account_id based on the conditional logic:

    - If fbclid is present, set meta_account_id from hotel config
    - If gclid is present, set google_account_id from hotel config

    This is a startup task that runs after schema migrations to ensure
    existing data is consistent with config.

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict
    """
    _LOGGER.info("Backfilling advertising account IDs for existing reservations...")

    # Build a mapping of hotel_id -> account IDs from config
    hotel_accounts = {}
    for hotel in config.get("alpine_bits_auth", []):
        hotel_id = hotel.get(CONF_HOTEL_ID)
        if hotel_id:
            hotel_accounts[hotel_id] = {
                "meta_account": hotel.get(CONF_META_ACCOUNT),
                "google_account": hotel.get(CONF_GOOGLE_ACCOUNT),
            }

    if not hotel_accounts:
        _LOGGER.debug("No hotel accounts found in config, skipping backfill")
        return

    _LOGGER.info("Found %d hotel(s) with account configurations", len(hotel_accounts))

    # The Meta (fbclid) and Google (gclid) passes share the same update shape;
    # a single helper keeps them consistent instead of two copy-pasted loops.
    meta_updated = await _backfill_account_column(
        engine, hotel_accounts, "meta_account", "meta_account_id", "fbclid"
    )
    google_updated = await _backfill_account_column(
        engine, hotel_accounts, "google_account", "google_account_id", "gclid"
    )

    if meta_updated > 0 or google_updated > 0:
        _LOGGER.info(
            "Backfill complete: %d reservations updated with meta_account_id, "
            "%d with google_account_id",
            meta_updated,
            google_updated,
        )
async def backfill_acked_requests_username(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill username for existing acked_requests records.

    Each acknowledgement is matched to its reservation (via unique_id ->
    md5_unique_id) to find the hotel_code, and the username configured for
    that hotel is written onto the acknowledgement. Only rows whose username
    is still NULL are touched.

    This is a startup task that runs after schema migrations to ensure
    existing data is consistent with config.

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict
    """
    _LOGGER.info("Backfilling usernames for existing acked_requests...")

    # Map hotel_id -> username for every auth entry that has both fields.
    hotel_usernames = {
        entry.get(CONF_HOTEL_ID): entry.get("username")
        for entry in config.get("alpine_bits_auth", [])
        if entry.get(CONF_HOTEL_ID) and entry.get("username")
    }

    if not hotel_usernames:
        _LOGGER.debug("No hotel usernames found in config, skipping backfill")
        return

    _LOGGER.info("Found %d hotel(s) with usernames in config", len(hotel_usernames))

    total_updated = 0
    # A single transaction covers the updates for every hotel.
    async with engine.begin() as conn:
        # The statement is identical for each hotel; build it once.
        update_stmt = text(
            """
            UPDATE acked_requests
            SET username = :username
            WHERE unique_id IN (
                SELECT md5_unique_id FROM reservations WHERE hotel_code = :hotel_id
            )
            AND username IS NULL
            """
        )
        for hotel_id, username in hotel_usernames.items():
            result = await conn.execute(
                update_stmt, {"username": username, "hotel_id": hotel_id}
            )
            updated = result.rowcount
            if updated > 0:
                _LOGGER.info(
                    "Updated %d acknowledgements with username for hotel %s",
                    updated,
                    hotel_id,
                )
                total_updated += updated

    if total_updated > 0:
        _LOGGER.info(
            "Backfill complete: %d acknowledgements updated with username",
            total_updated,
        )
async def run_startup_tasks(
    sessionmaker: async_sessionmaker,
    config: dict[str, Any] | None = None,
    engine: AsyncEngine | None = None,
) -> None:
    """Run one-time startup tasks.

    These are tasks that need to run at startup but are NOT schema
    migrations — e.g. data backfills and hashing of existing records.

    Args:
        sessionmaker: SQLAlchemy async sessionmaker
        config: Application configuration dictionary
        engine: SQLAlchemy async engine (optional, for backfill tasks)
    """
    # Backfill hashed data for any customers that predate hashing support.
    async with sessionmaker() as session:
        hashed = await CustomerService(session).hash_existing_customers()
        if hashed > 0:
            _LOGGER.info("Backfilled hashed data for %d existing customers", hashed)
        else:
            _LOGGER.debug("All existing customers already have hashed data")

    # The config-driven backfills need both a config and a raw engine.
    # Without a config there is nothing to backfill from, so exit silently;
    # with a config but no engine, warn that the backfills are skipped.
    if not config:
        return
    if not engine:
        _LOGGER.warning(
            "No engine provided to run_startup_tasks, "
            "skipping config-based backfill tasks"
        )
        return

    await backfill_advertising_account_ids(engine, config)
    await backfill_acked_requests_username(engine, config)