alpinebits_python/src/alpine_bits_python/migrations.py
"""DEPRECATED: Legacy database migrations for AlpineBits.
⚠️ WARNING: This module is deprecated and no longer used. ⚠️
SCHEMA MIGRATIONS are now handled by Alembic (see alembic/versions/).
STARTUP TASKS (data backfills) are now in db_setup.py.
Migration History:
- migrate_add_room_types: Schema migration (should be in Alembic)
- migrate_add_advertising_account_ids: Schema + backfill (split into Alembic + db_setup.py)
- migrate_add_username_to_acked_requests: Schema + backfill (split into Alembic + db_setup.py)
- migrate_normalize_conversions: Schema migration (should be in Alembic)
Current Status:
- All schema changes are now managed via Alembic migrations
- All data backfills are now in db_setup.py as startup tasks
- This file is kept for reference but is no longer executed
Do not add new migrations here. Instead:
1. For schema changes: Create Alembic migration with `uv run alembic revision --autogenerate -m "description"`
2. For data backfills: Add to db_setup.py as a startup task
"""
from typing import Any
from sqlalchemy import inspect, text
from sqlalchemy.ext.asyncio import AsyncEngine
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT
from .logging_config import get_logger
_LOGGER = get_logger(__name__)


async def check_column_exists(
engine: AsyncEngine, table_name: str, column_name: str
) -> bool:
"""Check if a column exists in a table.
Args:
engine: SQLAlchemy async engine
table_name: Name of the table to check
column_name: Name of the column to check
Returns:
True if column exists, False otherwise
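
    Example (illustrative; the engine URL is a placeholder):
        engine = create_async_engine("sqlite+aiosqlite:///example.db")
        if await check_column_exists(engine, "reservations", "room_type"):
            ...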
"""
async with engine.connect() as conn:
def _check(connection):
inspector = inspect(connection)
columns = [col["name"] for col in inspector.get_columns(table_name)]
return column_name in columns
        return await conn.run_sync(_check)


async def add_column_if_not_exists(
engine: AsyncEngine, table_name: str, column_name: str, column_type: str = "VARCHAR"
) -> bool:
"""Add a column to a table if it doesn't already exist.
Args:
engine: SQLAlchemy async engine
table_name: Name of the table
column_name: Name of the column to add
column_type: SQL type of the column (default: VARCHAR)
Returns:
True if column was added, False if it already existed
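
    Example (illustrative; the column name and type are placeholders):
        added = await add_column_if_not_exists(
            engine, "reservations", "loyalty_tier", "VARCHAR"
        )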
"""
exists = await check_column_exists(engine, table_name, column_name)
if exists:
_LOGGER.debug("Column %s.%s already exists, skipping", table_name, column_name)
return False
_LOGGER.info("Adding column %s.%s (%s)", table_name, column_name, column_type)
async with engine.begin() as conn:
sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}"
await conn.execute(text(sql))
_LOGGER.info("Successfully added column %s.%s", table_name, column_name)
return True


async def migrate_add_room_types(engine: AsyncEngine) -> None:
"""Migration: Add RoomTypes fields to reservations table.
This migration adds three optional fields:
- room_type_code: String (max 8 chars)
- room_classification_code: String (numeric pattern)
- room_type: String (enum: 1-5)
Safe to run multiple times - will skip if columns already exist.
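
    A rough Alembic equivalent of this migration (illustrative sketch; the
    actual revision in alembic/versions/ may differ):
        def upgrade() -> None:
            op.add_column("reservations", sa.Column("room_type_code", sa.String()))
            op.add_column("reservations", sa.Column("room_classification_code", sa.String()))
            op.add_column("reservations", sa.Column("room_type", sa.String()))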
"""
_LOGGER.info("Running migration: add_room_types")
added_count = 0
# Add each column if it doesn't exist
if await add_column_if_not_exists(
engine, "reservations", "room_type_code", "VARCHAR"
):
added_count += 1
if await add_column_if_not_exists(
engine, "reservations", "room_classification_code", "VARCHAR"
):
added_count += 1
if await add_column_if_not_exists(engine, "reservations", "room_type", "VARCHAR"):
added_count += 1
if added_count > 0:
_LOGGER.info("Migration add_room_types: Added %d columns", added_count)
else:
_LOGGER.info("Migration add_room_types: No changes needed (already applied)")


async def migrate_add_advertising_account_ids(
engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
"""Migration: Add advertising account ID fields to reservations table.
This migration adds two optional fields:
- meta_account_id: String (Meta/Facebook advertising account ID)
- google_account_id: String (Google advertising account ID)
These fields are populated conditionally based on fbclid/gclid presence.
    For existing reservations, account IDs are backfilled from the config based on hotel_code and the presence of fbclid/gclid.
Safe to run multiple times - will skip if columns already exist.
Args:
engine: SQLAlchemy async engine
config: Application configuration dict containing hotel account IDs
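
    Example config shape (values are placeholders; key names come from the
    CONF_* constants in const.py):
        config = {
            "alpine_bits_auth": [
                {
                    CONF_HOTEL_ID: "123",
                    CONF_META_ACCOUNT: "act_0000000000",
                    CONF_GOOGLE_ACCOUNT: "000-000-0000",
                }
            ]
        }
        await migrate_add_advertising_account_ids(engine, config)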
"""
_LOGGER.info("Running migration: add_advertising_account_ids")
added_count = 0
# Add each column if it doesn't exist
if await add_column_if_not_exists(
engine, "reservations", "meta_account_id", "VARCHAR"
):
added_count += 1
if await add_column_if_not_exists(
engine, "reservations", "google_account_id", "VARCHAR"
):
added_count += 1
if added_count > 0:
_LOGGER.info(
"Migration add_advertising_account_ids: Added %d columns", added_count
)
else:
_LOGGER.info("Migration add_advertising_account_ids: Columns already exist")
# Backfill existing reservations with account IDs based on config and fbclid/gclid presence
if config:
await _backfill_advertising_account_ids(engine, config)
else:
_LOGGER.warning(
"No config provided, skipping backfill of advertising account IDs"
)


async def _backfill_advertising_account_ids(
engine: AsyncEngine, config: dict[str, Any]
) -> None:
"""Backfill advertising account IDs for existing reservations.
Updates existing reservations to populate meta_account_id and google_account_id
based on the conditional logic:
- If fbclid is present, set meta_account_id from hotel config
- If gclid is present, set google_account_id from hotel config
Args:
engine: SQLAlchemy async engine
config: Application configuration dict
"""
_LOGGER.info("Backfilling advertising account IDs for existing reservations...")
# Build a mapping of hotel_id -> account IDs from config
hotel_accounts = {}
alpine_bits_auth = config.get("alpine_bits_auth", [])
for hotel in alpine_bits_auth:
hotel_id = hotel.get(CONF_HOTEL_ID)
meta_account = hotel.get(CONF_META_ACCOUNT)
google_account = hotel.get(CONF_GOOGLE_ACCOUNT)
if hotel_id:
hotel_accounts[hotel_id] = {
"meta_account": meta_account,
"google_account": google_account,
}
if not hotel_accounts:
_LOGGER.info("No hotel accounts found in config, skipping backfill")
return
_LOGGER.info("Found %d hotel(s) with account configurations", len(hotel_accounts))
# Update reservations with meta_account_id where fbclid is present
meta_updated = 0
for hotel_id, accounts in hotel_accounts.items():
if accounts["meta_account"]:
async with engine.begin() as conn:
sql = text(
"UPDATE reservations "
"SET meta_account_id = :meta_account "
"WHERE hotel_code = :hotel_id "
"AND fbclid IS NOT NULL "
"AND fbclid != '' "
"AND (meta_account_id IS NULL OR meta_account_id = '')"
)
result = await conn.execute(
sql,
{"meta_account": accounts["meta_account"], "hotel_id": hotel_id},
)
count = result.rowcount
if count > 0:
_LOGGER.info(
"Updated %d reservations with meta_account_id for hotel %s",
count,
hotel_id,
)
meta_updated += count
# Update reservations with google_account_id where gclid is present
google_updated = 0
for hotel_id, accounts in hotel_accounts.items():
if accounts["google_account"]:
async with engine.begin() as conn:
sql = text(
"UPDATE reservations "
"SET google_account_id = :google_account "
"WHERE hotel_code = :hotel_id "
"AND gclid IS NOT NULL "
"AND gclid != '' "
"AND (google_account_id IS NULL OR google_account_id = '')"
)
result = await conn.execute(
sql,
{
"google_account": accounts["google_account"],
"hotel_id": hotel_id,
},
)
count = result.rowcount
if count > 0:
_LOGGER.info(
"Updated %d reservations with google_account_id for hotel %s",
count,
hotel_id,
)
google_updated += count
_LOGGER.info(
"Backfill complete: %d reservations updated with meta_account_id, %d with google_account_id",
meta_updated,
google_updated,
)


async def migrate_add_username_to_acked_requests(
engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
"""Migration: Add username column to acked_requests table and backfill with hotel usernames.
This migration adds a username column to acked_requests to track acknowledgements by username
instead of just client_id. This improves consistency since client_ids can change but usernames are stable.
For existing acknowledgements, this migration queries reservations to determine the hotel_code,
then looks up the corresponding username from the config and populates the new column.
Safe to run multiple times - will skip if column already exists.
Args:
engine: SQLAlchemy async engine
config: Application configuration dict containing hotel usernames
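
    Example config shape (values are placeholders):
        config = {
            "alpine_bits_auth": [
                {CONF_HOTEL_ID: "123", "username": "hotel_user"}
            ]
        }
        await migrate_add_username_to_acked_requests(engine, config)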
"""
_LOGGER.info("Running migration: add_username_to_acked_requests")
# Add the username column if it doesn't exist
if await add_column_if_not_exists(engine, "acked_requests", "username", "VARCHAR"):
_LOGGER.info("Added username column to acked_requests table")
else:
_LOGGER.info("Username column already exists in acked_requests, skipping")
return
# Backfill existing acknowledgements with username from config
if config:
await _backfill_acked_requests_username(engine, config)
else:
_LOGGER.warning(
"No config provided, skipping backfill of acked_requests usernames"
)


async def _backfill_acked_requests_username(
engine: AsyncEngine, config: dict[str, Any]
) -> None:
"""Backfill username for existing acked_requests records.
For each acknowledgement, find the corresponding reservation to determine its hotel_code,
then look up the username for that hotel in the config and update the acked_request record.
Args:
engine: SQLAlchemy async engine
config: Application configuration dict
"""
_LOGGER.info("Backfilling usernames for existing acked_requests...")
# Build a mapping of hotel_id -> username from config
hotel_usernames = {}
alpine_bits_auth = config.get("alpine_bits_auth", [])
for hotel in alpine_bits_auth:
hotel_id = hotel.get(CONF_HOTEL_ID)
username = hotel.get("username")
if hotel_id and username:
hotel_usernames[hotel_id] = username
if not hotel_usernames:
_LOGGER.info("No hotel usernames found in config, skipping backfill")
return
_LOGGER.info("Found %d hotel(s) with usernames in config", len(hotel_usernames))
# Update acked_requests with usernames by matching to reservations
total_updated = 0
async with engine.begin() as conn:
for hotel_id, username in hotel_usernames.items():
sql = text("""
UPDATE acked_requests
SET username = :username
WHERE unique_id IN (
SELECT md5_unique_id FROM reservations WHERE hotel_code = :hotel_id
)
AND username IS NULL
""")
result = await conn.execute(
sql, {"username": username, "hotel_id": hotel_id}
)
count = result.rowcount
if count > 0:
_LOGGER.info(
"Updated %d acknowledgements with username for hotel %s",
count,
hotel_id,
)
total_updated += count
_LOGGER.info(
"Backfill complete: %d acknowledgements updated with username", total_updated
)


async def table_exists(engine: AsyncEngine, table_name: str) -> bool:
"""Check if a table exists in the database.
Args:
engine: SQLAlchemy async engine
table_name: Name of the table to check
Returns:
True if table exists, False otherwise
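
    Example (illustrative):
        if await table_exists(engine, "conversions"):
            ...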
"""
async with engine.connect() as conn:
def _check(connection):
inspector = inspect(connection)
return table_name in inspector.get_table_names()
return await conn.run_sync(_check)


async def drop_table(engine: AsyncEngine, table_name: str) -> None:
"""Drop a table from the database.
Args:
engine: SQLAlchemy async engine
table_name: Name of the table to drop
"""
async with engine.begin() as conn:
await conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
_LOGGER.info("Dropped table: %s", table_name)


async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
"""Migration: Normalize conversions and room reservations structure.
This migration redesigns the conversion data structure:
- conversions: One row per PMS reservation (with guest/advertising metadata)
- conversion_rooms: One row per room reservation (linked to conversion)
- daily_sales: JSON array of daily sales within each room reservation
- total_revenue: Extracted sum of all daily sales for efficiency
Old structure: One row per daily sale (denormalized, lots of duplication)
New structure: One row per room reservation, daily sales as JSON with extracted total
This allows:
- Upserts on room reservations (same room doesn't get duplicated)
- Better tracking of room data separate from daily sales data
- Efficient querying via extracted total_revenue field
- All daily sales details preserved in JSON for analysis
The new tables are created via Base.metadata.create_all() at startup.
This migration handles cleanup of old schema versions.
Safe to run multiple times - idempotent.
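
    Hypothetical daily_sales payload for one room reservation (the exact JSON
    shape is defined by the models in db.py; this is only an assumption for
    illustration):
        daily_sales = [
            {"date": "2025-01-01", "revenue": 120.0},
            {"date": "2025-01-02", "revenue": 135.5},
        ]
        total_revenue = sum(day["revenue"] for day in daily_sales)  # 255.5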
"""
_LOGGER.info("Running migration: normalize_conversions")
# Check if the old conversions table exists with the old schema
# If the table exists but doesn't match our current schema definition, drop it
old_conversions_exists = await table_exists(engine, "conversions")
if old_conversions_exists:
        # Check if this is the old-style table by looking for missing expected columns
        # The old table would not have the new structure we've defined
async with engine.connect() as conn:
def _get_columns(connection):
inspector = inspect(connection)
return [col["name"] for col in inspector.get_columns("conversions")]
old_columns = await conn.run_sync(_get_columns)
# Expected columns in the new schema (defined in db.py)
# If the table is missing key columns from our schema, it's the old version
expected_columns = {
"id",
"reservation_id",
"customer_id",
"hashed_customer_id",
"hotel_id",
"pms_reservation_id",
"reservation_number",
"reservation_date",
"creation_time",
"reservation_type",
"booking_channel",
"guest_first_name",
"guest_last_name",
"guest_email",
"guest_country_code",
"advertising_medium",
"advertising_partner",
"advertising_campagne",
"created_at",
"updated_at",
}
old_columns_set = set(old_columns)
# If we're missing critical new columns, this is the old schema
if not expected_columns.issubset(old_columns_set):
_LOGGER.info(
"Found old conversions table with incompatible schema. "
"Old columns: %s. Expected new columns: %s",
old_columns,
expected_columns,
)
await drop_table(engine, "conversions")
_LOGGER.info(
"Dropped old conversions table to allow creation of new schema"
)
else:
_LOGGER.info(
"Conversions table exists with compatible schema, no migration needed"
)
# Check for the old conversion_rooms table (which should not exist in the new schema)
old_conversion_rooms_exists = await table_exists(engine, "conversion_rooms")
if old_conversion_rooms_exists:
await drop_table(engine, "conversion_rooms")
_LOGGER.info("Dropped old conversion_rooms table")
_LOGGER.info(
"Migration normalize_conversions: Conversion data structure normalized. "
"New tables (conversions + conversion_rooms) will be created/updated via "
"Base.metadata.create_all()"
)


async def run_all_migrations(
engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
"""Run all pending migrations.
This function should be called at app startup, after Base.metadata.create_all.
Each migration function should be idempotent (safe to run multiple times).
Args:
engine: SQLAlchemy async engine
config: Application configuration dict (optional, but required for some migrations)
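
    Example of the legacy startup wiring (no longer used; the engine URL is a
    placeholder):
        engine = create_async_engine("postgresql+asyncpg://localhost/app")
        # Base.metadata.create_all() runs first, then:
        await run_all_migrations(engine, config)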
"""
_LOGGER.info("Starting database migrations...")
try:
# Add new migrations here in chronological order
await migrate_add_room_types(engine)
await migrate_add_advertising_account_ids(engine, config)
await migrate_add_username_to_acked_requests(engine, config)
await migrate_normalize_conversions(engine)
_LOGGER.info("Database migrations completed successfully")
    except Exception:
        _LOGGER.exception("Migration failed")
raise