"""DEPRECATED: Legacy database migrations for AlpineBits. ⚠️ WARNING: This module is deprecated and no longer used. ⚠️ SCHEMA MIGRATIONS are now handled by Alembic (see alembic/versions/). STARTUP TASKS (data backfills) are now in db_setup.py. Migration History: - migrate_add_room_types: Schema migration (should be in Alembic) - migrate_add_advertising_account_ids: Schema + backfill (split into Alembic + db_setup.py) - migrate_add_username_to_acked_requests: Schema + backfill (split into Alembic + db_setup.py) - migrate_normalize_conversions: Schema migration (should be in Alembic) Current Status: - All schema changes are now managed via Alembic migrations - All data backfills are now in db_setup.py as startup tasks - This file is kept for reference but is no longer executed Do not add new migrations here. Instead: 1. For schema changes: Create Alembic migration with `uv run alembic revision --autogenerate -m "description"` 2. For data backfills: Add to db_setup.py as a startup task """ from typing import Any from sqlalchemy import inspect, text from sqlalchemy.ext.asyncio import AsyncEngine from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT from .logging_config import get_logger _LOGGER = get_logger(__name__) async def check_column_exists( engine: AsyncEngine, table_name: str, column_name: str ) -> bool: """Check if a column exists in a table. Args: engine: SQLAlchemy async engine table_name: Name of the table to check column_name: Name of the column to check Returns: True if column exists, False otherwise """ async with engine.connect() as conn: def _check(connection): inspector = inspect(connection) columns = [col["name"] for col in inspector.get_columns(table_name)] return column_name in columns result = await conn.run_sync(_check) return result async def add_column_if_not_exists( engine: AsyncEngine, table_name: str, column_name: str, column_type: str = "VARCHAR" ) -> bool: """Add a column to a table if it doesn't already exist. Args: engine: SQLAlchemy async engine table_name: Name of the table column_name: Name of the column to add column_type: SQL type of the column (default: VARCHAR) Returns: True if column was added, False if it already existed """ exists = await check_column_exists(engine, table_name, column_name) if exists: _LOGGER.debug("Column %s.%s already exists, skipping", table_name, column_name) return False _LOGGER.info("Adding column %s.%s (%s)", table_name, column_name, column_type) async with engine.begin() as conn: sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}" await conn.execute(text(sql)) _LOGGER.info("Successfully added column %s.%s", table_name, column_name) return True async def migrate_add_room_types(engine: AsyncEngine) -> None: """Migration: Add RoomTypes fields to reservations table. This migration adds three optional fields: - room_type_code: String (max 8 chars) - room_classification_code: String (numeric pattern) - room_type: String (enum: 1-5) Safe to run multiple times - will skip if columns already exist. 
""" _LOGGER.info("Running migration: add_room_types") added_count = 0 # Add each column if it doesn't exist if await add_column_if_not_exists( engine, "reservations", "room_type_code", "VARCHAR" ): added_count += 1 if await add_column_if_not_exists( engine, "reservations", "room_classification_code", "VARCHAR" ): added_count += 1 if await add_column_if_not_exists(engine, "reservations", "room_type", "VARCHAR"): added_count += 1 if added_count > 0: _LOGGER.info("Migration add_room_types: Added %d columns", added_count) else: _LOGGER.info("Migration add_room_types: No changes needed (already applied)") async def migrate_add_advertising_account_ids( engine: AsyncEngine, config: dict[str, Any] | None = None ) -> None: """Migration: Add advertising account ID fields to reservations table. This migration adds two optional fields: - meta_account_id: String (Meta/Facebook advertising account ID) - google_account_id: String (Google advertising account ID) These fields are populated conditionally based on fbclid/gclid presence. For existing reservations, backfills account IDs from config based on hotel_code and fbclid/gclid. Safe to run multiple times - will skip if columns already exist. Args: engine: SQLAlchemy async engine config: Application configuration dict containing hotel account IDs """ _LOGGER.info("Running migration: add_advertising_account_ids") added_count = 0 # Add each column if it doesn't exist if await add_column_if_not_exists( engine, "reservations", "meta_account_id", "VARCHAR" ): added_count += 1 if await add_column_if_not_exists( engine, "reservations", "google_account_id", "VARCHAR" ): added_count += 1 if added_count > 0: _LOGGER.info( "Migration add_advertising_account_ids: Added %d columns", added_count ) else: _LOGGER.info("Migration add_advertising_account_ids: Columns already exist") # Backfill existing reservations with account IDs based on config and fbclid/gclid presence if config: await _backfill_advertising_account_ids(engine, config) else: _LOGGER.warning( "No config provided, skipping backfill of advertising account IDs" ) async def _backfill_advertising_account_ids( engine: AsyncEngine, config: dict[str, Any] ) -> None: """Backfill advertising account IDs for existing reservations. 

async def migrate_add_advertising_account_ids(
    engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
    """Migration: Add advertising account ID fields to reservations table.

    This migration adds two optional fields:
    - meta_account_id: String (Meta/Facebook advertising account ID)
    - google_account_id: String (Google advertising account ID)

    These fields are populated conditionally based on fbclid/gclid presence.
    For existing reservations, backfills account IDs from config based on
    hotel_code and fbclid/gclid.

    Safe to run multiple times - will skip if columns already exist.

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict containing hotel account IDs
    """
    _LOGGER.info("Running migration: add_advertising_account_ids")

    added_count = 0

    # Add each column if it doesn't exist
    if await add_column_if_not_exists(
        engine, "reservations", "meta_account_id", "VARCHAR"
    ):
        added_count += 1

    if await add_column_if_not_exists(
        engine, "reservations", "google_account_id", "VARCHAR"
    ):
        added_count += 1

    if added_count > 0:
        _LOGGER.info(
            "Migration add_advertising_account_ids: Added %d columns", added_count
        )
    else:
        _LOGGER.info("Migration add_advertising_account_ids: Columns already exist")

    # Backfill existing reservations with account IDs based on config and
    # fbclid/gclid presence
    if config:
        await _backfill_advertising_account_ids(engine, config)
    else:
        _LOGGER.warning(
            "No config provided, skipping backfill of advertising account IDs"
        )


async def _backfill_advertising_account_ids(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill advertising account IDs for existing reservations.

    Updates existing reservations to populate meta_account_id and
    google_account_id based on the conditional logic:
    - If fbclid is present, set meta_account_id from hotel config
    - If gclid is present, set google_account_id from hotel config

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict
    """
    _LOGGER.info("Backfilling advertising account IDs for existing reservations...")

    # Build a mapping of hotel_id -> account IDs from config
    hotel_accounts = {}
    alpine_bits_auth = config.get("alpine_bits_auth", [])

    for hotel in alpine_bits_auth:
        hotel_id = hotel.get(CONF_HOTEL_ID)
        meta_account = hotel.get(CONF_META_ACCOUNT)
        google_account = hotel.get(CONF_GOOGLE_ACCOUNT)

        if hotel_id:
            hotel_accounts[hotel_id] = {
                "meta_account": meta_account,
                "google_account": google_account,
            }

    if not hotel_accounts:
        _LOGGER.info("No hotel accounts found in config, skipping backfill")
        return

    _LOGGER.info("Found %d hotel(s) with account configurations", len(hotel_accounts))

    # Update reservations with meta_account_id where fbclid is present
    meta_updated = 0
    for hotel_id, accounts in hotel_accounts.items():
        if accounts["meta_account"]:
            async with engine.begin() as conn:
                sql = text(
                    "UPDATE reservations "
                    "SET meta_account_id = :meta_account "
                    "WHERE hotel_code = :hotel_id "
                    "AND fbclid IS NOT NULL "
                    "AND fbclid != '' "
                    "AND (meta_account_id IS NULL OR meta_account_id = '')"
                )
                result = await conn.execute(
                    sql,
                    {"meta_account": accounts["meta_account"], "hotel_id": hotel_id},
                )
                count = result.rowcount
                if count > 0:
                    _LOGGER.info(
                        "Updated %d reservations with meta_account_id for hotel %s",
                        count,
                        hotel_id,
                    )
                    meta_updated += count

    # Update reservations with google_account_id where gclid is present
    google_updated = 0
    for hotel_id, accounts in hotel_accounts.items():
        if accounts["google_account"]:
            async with engine.begin() as conn:
                sql = text(
                    "UPDATE reservations "
                    "SET google_account_id = :google_account "
                    "WHERE hotel_code = :hotel_id "
                    "AND gclid IS NOT NULL "
                    "AND gclid != '' "
                    "AND (google_account_id IS NULL OR google_account_id = '')"
                )
                result = await conn.execute(
                    sql,
                    {
                        "google_account": accounts["google_account"],
                        "hotel_id": hotel_id,
                    },
                )
                count = result.rowcount
                if count > 0:
                    _LOGGER.info(
                        "Updated %d reservations with google_account_id for hotel %s",
                        count,
                        hotel_id,
                    )
                    google_updated += count

    _LOGGER.info(
        "Backfill complete: %d reservations updated with meta_account_id, "
        "%d with google_account_id",
        meta_updated,
        google_updated,
    )
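
# The backfill above (and the username backfill further down) reads hotel
# entries from config. A sketch of the expected shape; the key names come from
# .const, while every literal value is made up for illustration:
#
#     config = {
#         "alpine_bits_auth": [
#             {
#                 CONF_HOTEL_ID: "HOTEL123",
#                 CONF_META_ACCOUNT: "act_1234567890",
#                 CONF_GOOGLE_ACCOUNT: "123-456-7890",
#                 "username": "hotel123_user",
#             },
#         ],
#     }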

async def migrate_add_username_to_acked_requests(
    engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
    """Migration: Add username column to acked_requests and backfill it.

    This migration adds a username column to acked_requests so that
    acknowledgements are tracked by username instead of just client_id. This
    improves consistency, since client_ids can change but usernames are stable.

    For existing acknowledgements, this migration queries reservations to
    determine the hotel_code, then looks up the corresponding username from
    the config and populates the new column.

    Safe to run multiple times - will skip if the column already exists.

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict containing hotel usernames
    """
    _LOGGER.info("Running migration: add_username_to_acked_requests")

    # Add the username column if it doesn't exist; if it is already there,
    # the backfill has run before, so stop early.
    if await add_column_if_not_exists(engine, "acked_requests", "username", "VARCHAR"):
        _LOGGER.info("Added username column to acked_requests table")
    else:
        _LOGGER.info("Username column already exists in acked_requests, skipping")
        return

    # Backfill existing acknowledgements with username from config
    if config:
        await _backfill_acked_requests_username(engine, config)
    else:
        _LOGGER.warning(
            "No config provided, skipping backfill of acked_requests usernames"
        )


async def _backfill_acked_requests_username(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill username for existing acked_requests records.

    For each acknowledgement, find the corresponding reservation to determine
    its hotel_code, then look up the username for that hotel in the config and
    update the acked_request record.

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict
    """
    _LOGGER.info("Backfilling usernames for existing acked_requests...")

    # Build a mapping of hotel_id -> username from config
    hotel_usernames = {}
    alpine_bits_auth = config.get("alpine_bits_auth", [])

    for hotel in alpine_bits_auth:
        hotel_id = hotel.get(CONF_HOTEL_ID)
        username = hotel.get("username")

        if hotel_id and username:
            hotel_usernames[hotel_id] = username

    if not hotel_usernames:
        _LOGGER.info("No hotel usernames found in config, skipping backfill")
        return

    _LOGGER.info("Found %d hotel(s) with usernames in config", len(hotel_usernames))

    # Update acked_requests with usernames by matching to reservations
    total_updated = 0
    async with engine.begin() as conn:
        for hotel_id, username in hotel_usernames.items():
            sql = text("""
                UPDATE acked_requests
                SET username = :username
                WHERE unique_id IN (
                    SELECT md5_unique_id
                    FROM reservations
                    WHERE hotel_code = :hotel_id
                )
                AND username IS NULL
            """)
            result = await conn.execute(
                sql, {"username": username, "hotel_id": hotel_id}
            )
            count = result.rowcount
            if count > 0:
                _LOGGER.info(
                    "Updated %d acknowledgements with username for hotel %s",
                    count,
                    hotel_id,
                )
                total_updated += count

    _LOGGER.info(
        "Backfill complete: %d acknowledgements updated with username", total_updated
    )


async def table_exists(engine: AsyncEngine, table_name: str) -> bool:
    """Check if a table exists in the database.

    Args:
        engine: SQLAlchemy async engine
        table_name: Name of the table to check

    Returns:
        True if the table exists, False otherwise
    """
    async with engine.connect() as conn:

        def _check(connection):
            inspector = inspect(connection)
            return table_name in inspector.get_table_names()

        return await conn.run_sync(_check)


async def drop_table(engine: AsyncEngine, table_name: str) -> None:
    """Drop a table from the database.

    Args:
        engine: SQLAlchemy async engine
        table_name: Name of the table to drop
    """
    async with engine.begin() as conn:
        await conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
        _LOGGER.info("Dropped table: %s", table_name)
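
# The migration below reads a table's columns on an async engine via the same
# run_sync/inspect pattern used by the helpers above. A standalone sketch (the
# helper name and table are illustrative):
#
#     async def get_columns(engine: AsyncEngine, table: str) -> list[str]:
#         async with engine.connect() as conn:
#             return await conn.run_sync(
#                 lambda sync_conn: [
#                     col["name"] for col in inspect(sync_conn).get_columns(table)
#                 ]
#             )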

async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
    """Migration: Normalize conversions and room reservations structure.

    This migration redesigns the conversion data structure:
    - conversions: One row per PMS reservation (with guest/advertising metadata)
    - conversion_rooms: One row per room reservation (linked to a conversion)
    - daily_sales: JSON array of daily sales within each room reservation
    - total_revenue: Extracted sum of all daily sales, for efficiency

    Old structure: one row per daily sale (denormalized, lots of duplication).
    New structure: one row per room reservation, with daily sales as JSON and
    an extracted total.

    This allows:
    - Upserts on room reservations (the same room doesn't get duplicated)
    - Better tracking of room data separate from daily sales data
    - Efficient querying via the extracted total_revenue field
    - All daily sales details preserved in JSON for analysis

    The new tables are created via Base.metadata.create_all() at startup.
    This migration handles cleanup of old schema versions.

    Safe to run multiple times - idempotent.
    """
    _LOGGER.info("Running migration: normalize_conversions")

    # Check if the old conversions table exists with the old schema.
    # If the table exists but doesn't match our current schema definition, drop it.
    old_conversions_exists = await table_exists(engine, "conversions")

    if old_conversions_exists:
        # Check if this is the old-style table by looking for missing expected
        # columns: the old table would not have the new structure we've defined.
        async with engine.connect() as conn:

            def _get_columns(connection):
                inspector = inspect(connection)
                return [col["name"] for col in inspector.get_columns("conversions")]

            old_columns = await conn.run_sync(_get_columns)

        # Expected columns in the new schema (defined in db.py).
        # If the table is missing key columns from our schema, it's the old version.
        expected_columns = {
            "id",
            "reservation_id",
            "customer_id",
            "hashed_customer_id",
            "hotel_id",
            "pms_reservation_id",
            "reservation_number",
            "reservation_date",
            "creation_time",
            "reservation_type",
            "booking_channel",
            "guest_first_name",
            "guest_last_name",
            "guest_email",
            "guest_country_code",
            "advertising_medium",
            "advertising_partner",
            "advertising_campagne",
            "created_at",
            "updated_at",
        }

        old_columns_set = set(old_columns)

        # If we're missing critical new columns, this is the old schema
        if not expected_columns.issubset(old_columns_set):
            _LOGGER.info(
                "Found old conversions table with incompatible schema. "
                "Old columns: %s. Expected new columns: %s",
                old_columns,
                expected_columns,
            )
            await drop_table(engine, "conversions")
            _LOGGER.info(
                "Dropped old conversions table to allow creation of new schema"
            )
        else:
            _LOGGER.info(
                "Conversions table exists with compatible schema, no migration needed"
            )

    # Drop any leftover old-format conversion_rooms table; it is recreated
    # with the current schema by Base.metadata.create_all()
    old_conversion_rooms_exists = await table_exists(engine, "conversion_rooms")
    if old_conversion_rooms_exists:
        await drop_table(engine, "conversion_rooms")
        _LOGGER.info("Dropped old conversion_rooms table")

    _LOGGER.info(
        "Migration normalize_conversions: Conversion data structure normalized. "
        "New tables (conversions + conversion_rooms) will be created/updated via "
        "Base.metadata.create_all()"
    )
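
# Why total_revenue is extracted: per-hotel revenue can be summed in plain SQL
# without unpacking the daily_sales JSON. A sketch; the join column between
# the two tables is an assumption:
#
#     SELECT c.hotel_id, SUM(r.total_revenue) AS revenue
#     FROM conversions c
#     JOIN conversion_rooms r ON r.conversion_id = c.id
#     GROUP BY c.hotel_id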

async def run_all_migrations(
    engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
    """Run all pending migrations.

    This function should be called at app startup, after
    Base.metadata.create_all. Each migration function should be idempotent
    (safe to run multiple times).

    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict (optional, but required for
            some migrations)
    """
    _LOGGER.info("Starting database migrations...")

    try:
        # Migrations run in chronological order. Do not add new ones here;
        # see the module docstring.
        await migrate_add_room_types(engine)
        await migrate_add_advertising_account_ids(engine, config)
        await migrate_add_username_to_acked_requests(engine, config)
        await migrate_normalize_conversions(engine)

        _LOGGER.info("Database migrations completed successfully")
    except Exception:
        # _LOGGER.exception already records the traceback, so the exception
        # itself doesn't need to be interpolated into the message.
        _LOGGER.exception("Migration failed")
        raise
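
# Historical usage sketch (this module is no longer wired into startup; the
# Base import location and the engine/config setup are illustrative):
#
#     from sqlalchemy.ext.asyncio import create_async_engine
#
#     from .db import Base  # hypothetical location of the declarative Base
#
#     async def startup(config: dict[str, Any]) -> None:
#         engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
#         async with engine.begin() as conn:
#             await conn.run_sync(Base.metadata.create_all)
#         await run_all_migrations(engine, config)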