Alembic experiments
@@ -44,13 +44,13 @@ from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpSt
from .conversion_service import ConversionService
from .csv_import import CSVImporter
from .customer_service import CustomerService
from .db import Base, ResilientAsyncSession, SessionMaker, create_database_engine
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
from .logging_config import get_logger, setup_logging
from .migrations import run_all_migrations
from .pushover_service import create_pushover_service
from .rate_limit import (
    BURST_RATE_LIMIT,
@@ -331,31 +331,15 @@ async def lifespan(app: FastAPI):
elif hotel_id and not push_endpoint:
    _LOGGER.info("Hotel %s has no push_endpoint configured", hotel_id)

# Create tables first (all workers)
# This ensures tables exist before migrations try to alter them
async with engine.begin() as conn:
    await conn.run_sync(Base.metadata.create_all)
_LOGGER.info("Database tables checked/created at startup.")

# Run migrations after tables exist (only primary worker for race conditions)
# Run startup tasks (only in primary worker to avoid race conditions)
# NOTE: Database migrations should already have been run before the app started
# via run_migrations.py or `uv run alembic upgrade head`
if is_primary:
    await run_all_migrations(engine, config)
    _LOGGER.info("Running startup tasks (primary worker)...")
    await run_startup_tasks(AsyncSessionLocal, config)
    _LOGGER.info("Startup tasks completed")
else:
    _LOGGER.info("Skipping migrations (non-primary worker)")

# Hash any existing customers (only in primary worker to avoid race conditions)
if is_primary:
    async with AsyncSessionLocal() as session:
        customer_service = CustomerService(session)
        hashed_count = await customer_service.hash_existing_customers()
        if hashed_count > 0:
            _LOGGER.info(
                "Backfilled hashed data for %d existing customers", hashed_count
            )
        else:
            _LOGGER.info("All existing customers already have hashed data")
else:
    _LOGGER.info("Skipping customer hashing (non-primary worker)")
    _LOGGER.info("Skipping startup tasks (non-primary worker)")

# Initialize and hook up stats collector for daily reports
# Note: report_scheduler will only exist on the primary worker
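The is_primary flag above gates the one-time startup work to a single worker. A minimal sketch of one way such a flag could be derived, assuming an environment variable distinguishes the first uvicorn worker (the variable name is an assumption, not something this diff defines):

import os


def detect_primary_worker() -> bool:
    # Hypothetical: treat the worker with WORKER_ID=0 (or a missing variable,
    # e.g. a single-process run) as the primary one that owns startup tasks.
    worker_id = os.environ.get("WORKER_ID", "0")
    return worker_id == "0"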
@@ -1,7 +1,6 @@
"""Service for handling conversion data from hotel PMS XML files."""

import asyncio
import json
import xml.etree.ElementTree as ET
from datetime import datetime
from decimal import Decimal
@@ -11,7 +10,14 @@ from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from .db import Conversion, RoomReservation, Customer, HashedCustomer, Reservation, SessionMaker
from .db import (
    Conversion,
    ConversionRoom,
    Customer,
    HashedCustomer,
    Reservation,
    SessionMaker,
)
from .logging_config import get_logger

_LOGGER = get_logger(__name__)
@@ -45,17 +51,23 @@ class ConversionService:
# Cache for reservation and customer data within a single XML processing run
# Maps hotel_code -> list of (reservation, customer) tuples
# This significantly speeds up matching when processing large XML files
self._reservation_cache: dict[str | None, list[tuple[Reservation, Customer | None]]] = {}
self._reservation_cache: dict[
    str | None, list[tuple[Reservation, Customer | None]]
] = {}
self._cache_initialized = False

if isinstance(session, SessionMaker):
    self.session_maker = session
    self.supports_concurrent = True
    _LOGGER.info("ConversionService initialized in concurrent mode with SessionMaker")
    _LOGGER.info(
        "ConversionService initialized in concurrent mode with SessionMaker"
    )
elif isinstance(session, AsyncSession):
    self.session = session
    self.supports_concurrent = False
    _LOGGER.info("ConversionService initialized in sequential mode with single session")
    _LOGGER.info(
        "ConversionService initialized in sequential mode with single session"
    )
elif session is not None:
    raise TypeError(
        f"session must be AsyncSession or SessionMaker, got {type(session)}"
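The constructor above accepts either a single AsyncSession (sequential mode) or the project's SessionMaker (concurrent mode). A rough usage sketch for the sequential path; the import path and the actual processing call are assumptions, not shown in this diff:

from sqlalchemy.ext.asyncio import async_sessionmaker

from alpine_bits_python.conversion_service import ConversionService


async def sequential_example(sessionmaker: async_sessionmaker) -> None:
    # Sequential mode: one shared AsyncSession is reused for the whole XML run.
    async with sessionmaker() as session:
        service = ConversionService(session)
        # ... call the service's XML processing entry point here ...

# Concurrent mode would instead pass the project's SessionMaker wrapper so each
# reservation task can open its own session, as the constructor above shows.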
@@ -202,9 +214,7 @@ class ConversionService:
async with asyncio.TaskGroup() as tg:
    for reservation in reservations:
        tg.create_task(
            self._process_reservation_safe(
                reservation, semaphore, stats
            )
            self._process_reservation_safe(reservation, semaphore, stats)
        )

async def _process_reservations_concurrent(
@@ -227,9 +237,7 @@ class ConversionService:
async with asyncio.TaskGroup() as tg:
    for reservation in reservations:
        tg.create_task(
            self._process_reservation_safe(
                reservation, semaphore, stats
            )
            self._process_reservation_safe(reservation, semaphore, stats)
        )

async def _process_reservation_safe(
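Both hunks fan out one task per reservation under asyncio.TaskGroup while a semaphore bounds concurrency. A self-contained sketch of that pattern, using a hypothetical worker in place of _process_reservation_safe (whose body is not part of this hunk):

import asyncio


async def process_one(reservation: str, semaphore: asyncio.Semaphore, stats: dict) -> None:
    # Hypothetical stand-in for _process_reservation_safe: acquire the
    # semaphore so only a bounded number of reservations run at once.
    async with semaphore:
        await asyncio.sleep(0)  # real work (DB lookups, matching) goes here
        stats["processed"] = stats.get("processed", 0) + 1


async def process_all(reservations: list[str]) -> dict:
    semaphore = asyncio.Semaphore(10)  # cap concurrent DB sessions
    stats: dict = {}
    async with asyncio.TaskGroup() as tg:
        for reservation in reservations:
            tg.create_task(process_one(reservation, semaphore, stats))
    return stats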
@@ -247,6 +255,7 @@ class ConversionService:
    reservation_elem: XML element for the reservation
    semaphore: Semaphore to limit concurrent operations
    stats: Shared stats dictionary (thread-safe due to GIL)

"""
pms_reservation_id = reservation_elem.get("id")

@@ -295,18 +304,19 @@ class ConversionService:
if self.session_maker:
    await session.close()

async def _handle_deleted_reservation(self, pms_reservation_id: str, session: AsyncSession):
async def _handle_deleted_reservation(
    self, pms_reservation_id: str, session: AsyncSession
):
    """Handle deleted reservation by marking conversions as deleted or removing them.

    Args:
        pms_reservation_id: PMS reservation ID to delete
        session: AsyncSession to use for the operation

    """
    # For now, we'll just log it. You could add a 'deleted' flag to the Conversion table
    # or actually delete the conversion records
    _LOGGER.info(
        "Processing deleted reservation: PMS ID %s", pms_reservation_id
    )
    _LOGGER.info("Processing deleted reservation: PMS ID %s", pms_reservation_id)

    # Option 1: Delete conversion records
    result = await session.execute(
@@ -337,6 +347,7 @@ class ConversionService:
In concurrent mode, each task passes its own session.

Returns statistics about what was matched.

"""
if session is None:
    session = self.session
@@ -394,9 +405,7 @@ class ConversionService:
        creation_time_str.replace("Z", "+00:00")
    )
except ValueError:
    _LOGGER.warning(
        "Invalid creation time format: %s", creation_time_str
    )
    _LOGGER.warning("Invalid creation time format: %s", creation_time_str)

# Find matching reservation, customer, and hashed_customer using advertising data and guest details
matched_reservation = None
@@ -515,18 +524,15 @@ class ConversionService:
# Batch-load existing room reservations to avoid N+1 queries
room_numbers = [
    rm.get("roomNumber")
    for rm in room_reservations.findall("roomReservation")
    rm.get("roomNumber") for rm in room_reservations.findall("roomReservation")
]
pms_hotel_reservation_ids = [
    f"{pms_reservation_id}_{room_num}" for room_num in room_numbers
]

existing_rooms_result = await session.execute(
    select(RoomReservation).where(
        RoomReservation.pms_hotel_reservation_id.in_(
            pms_hotel_reservation_ids
        )
    select(ConversionRoom).where(
        ConversionRoom.pms_hotel_reservation_id.in_(pms_hotel_reservation_ids)
    )
)
existing_rooms = {
@@ -556,9 +562,7 @@ class ConversionService:
departure_date = None
if departure_str:
    try:
        departure_date = datetime.strptime(
            departure_str, "%Y-%m-%d"
        ).date()
        departure_date = datetime.strptime(departure_str, "%Y-%m-%d").date()
    except ValueError:
        _LOGGER.warning("Invalid departure date format: %s", departure_str)
@@ -576,7 +580,7 @@ class ConversionService:
# Process daily sales and extract total revenue
daily_sales_elem = room_reservation.find("dailySales")
daily_sales_list = []
total_revenue = Decimal("0")
total_revenue = Decimal(0)

if daily_sales_elem is not None:
    for daily_sale in daily_sales_elem.findall("dailySale"):
@@ -642,7 +646,7 @@ class ConversionService:
    )
else:
    # Create new room reservation
    room_reservation_record = RoomReservation(
    room_reservation_record = ConversionRoom(
        conversion_id=conversion.id,
        pms_hotel_reservation_id=pms_hotel_reservation_id,
        arrival_date=arrival_date,
@@ -734,7 +738,9 @@ class ConversionService:
)

# Strategy 2: If no advertising match, try email/name-based matching
if not result["reservation"] and (guest_email or guest_first_name or guest_last_name):
if not result["reservation"] and (
    guest_email or guest_first_name or guest_last_name
):
    matched_reservation = await self._match_by_guest_details(
        hotel_id, guest_first_name, guest_last_name, guest_email, session
    )
@@ -798,6 +804,7 @@ class ConversionService:

Returns:
    Matched Reservation or None

"""
if session is None:
    session = self.session
@@ -882,6 +889,7 @@ class ConversionService:

Returns:
    Matched Reservation or None

"""
if session is None:
    session = self.session
@@ -892,9 +900,7 @@ class ConversionService:

# Get reservations from cache for this hotel
if hotel_id and hotel_id in self._reservation_cache:
    all_reservations = [
        res for res, _ in self._reservation_cache[hotel_id]
    ]
    all_reservations = [res for res, _ in self._reservation_cache[hotel_id]]
elif not hotel_id:
    # If no hotel_id specified, use all cached reservations
    for reservations_list in self._reservation_cache.values():
@@ -947,6 +953,7 @@ class ConversionService:

Returns:
    Matched Reservation or None

"""
# Filter by guest details
candidates = []
@@ -1019,6 +1026,7 @@ class ConversionService:

Returns:
    Single best-match Reservation, or None if no good match found

"""
candidates = reservations
src/alpine_bits_python/db_setup.py (new file, 86 lines)
@@ -0,0 +1,86 @@
"""Database setup and initialization.

This module handles all database setup tasks that should run once at startup,
before the application starts accepting requests. It includes:
- Schema migrations via Alembic
- One-time data cleanup/backfill tasks (e.g., hashing existing customers)
"""

import asyncio
from typing import Any

from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker

from .customer_service import CustomerService
from .db import create_database_engine
from .logging_config import get_logger

_LOGGER = get_logger(__name__)


async def setup_database(config: dict[str, Any] | None = None) -> tuple[AsyncEngine, async_sessionmaker]:
    """Set up the database and prepare for application use.

    This function should be called once at application startup, after
    migrations have been run but before the app starts accepting requests. It:
    1. Creates the async engine
    2. Creates the sessionmaker
    3. Performs one-time startup tasks (e.g., hashing existing customers)

    NOTE: Database migrations should be run BEFORE calling this function,
    typically using `uv run alembic upgrade head` or via run_migrations.py.

    Args:
        config: Application configuration dictionary

    Returns:
        Tuple of (engine, async_sessionmaker) for use in the application

    Raises:
        Any database-related exceptions that occur during setup
    """
    _LOGGER.info("Starting database setup...")

    # Create database engine
    engine = create_database_engine(config=config, echo=False)

    try:
        # Create sessionmaker for the application to use
        AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

        # Perform startup tasks (NOT migrations)
        _LOGGER.info("Running startup tasks...")
        await run_startup_tasks(AsyncSessionLocal, config)
        _LOGGER.info("Startup tasks completed successfully")

        _LOGGER.info("Database setup completed successfully")
        return engine, AsyncSessionLocal

    except Exception as e:
        _LOGGER.exception("Database setup failed: %s", e)
        await engine.dispose()
        raise


async def run_startup_tasks(
    sessionmaker: async_sessionmaker, config: dict[str, Any] | None = None
) -> None:
    """Run one-time startup tasks.

    These are tasks that need to run at startup but are NOT schema migrations.
    Examples: data backfills, hashing existing records, etc.

    Args:
        sessionmaker: SQLAlchemy async sessionmaker
        config: Application configuration dictionary
    """
    # Hash any existing customers that don't have hashed data
    async with sessionmaker() as session:
        customer_service = CustomerService(session)
        hashed_count = await customer_service.hash_existing_customers()
        if hashed_count > 0:
            _LOGGER.info(
                "Backfilled hashed data for %d existing customers", hashed_count
            )
        else:
            _LOGGER.info("All existing customers already have hashed data")
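A rough sketch of how setup_database might be consumed from the application's entry point, assuming migrations were already applied via run_migrations.py (the app.state attribute names are illustrative, not taken from this diff):

from contextlib import asynccontextmanager

from fastapi import FastAPI

from alpine_bits_python.db_setup import setup_database


@asynccontextmanager
async def lifespan(app: FastAPI):
    # setup_database builds the engine and sessionmaker and runs the one-time
    # startup tasks; it does not run schema migrations.
    engine, session_factory = await setup_database(config=None)
    app.state.engine = engine
    app.state.session_factory = session_factory
    try:
        yield
    finally:
        await engine.dispose()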
@@ -11,12 +11,13 @@ from sqlalchemy.ext.asyncio import AsyncEngine
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT
from .logging_config import get_logger
from .db import Reservation

_LOGGER = get_logger(__name__)


async def check_column_exists(engine: AsyncEngine, table_name: str, column_name: str) -> bool:
async def check_column_exists(
    engine: AsyncEngine, table_name: str, column_name: str
) -> bool:
    """Check if a column exists in a table.

    Args:
@@ -26,11 +27,13 @@ async def check_column_exists(engine: AsyncEngine, table_name: str, column_name:

    Returns:
        True if column exists, False otherwise

    """
    async with engine.connect() as conn:

        def _check(connection):
            inspector = inspect(connection)
            columns = [col['name'] for col in inspector.get_columns(table_name)]
            columns = [col["name"] for col in inspector.get_columns(table_name)]
            return column_name in columns

        result = await conn.run_sync(_check)
@@ -38,10 +41,7 @@ async def check_column_exists(engine: AsyncEngine, table_name: str, column_name:
async def add_column_if_not_exists(
    engine: AsyncEngine,
    table_name: str,
    column_name: str,
    column_type: str = "VARCHAR"
    engine: AsyncEngine, table_name: str, column_name: str, column_type: str = "VARCHAR"
) -> bool:
    """Add a column to a table if it doesn't already exist.

@@ -53,6 +53,7 @@ async def add_column_if_not_exists(

    Returns:
        True if column was added, False if it already existed

    """
    exists = await check_column_exists(engine, table_name, column_name)
@@ -85,10 +86,14 @@ async def migrate_add_room_types(engine: AsyncEngine) -> None:
added_count = 0

# Add each column if it doesn't exist
if await add_column_if_not_exists(engine, "reservations", "room_type_code", "VARCHAR"):
if await add_column_if_not_exists(
    engine, "reservations", "room_type_code", "VARCHAR"
):
    added_count += 1

if await add_column_if_not_exists(engine, "reservations", "room_classification_code", "VARCHAR"):
if await add_column_if_not_exists(
    engine, "reservations", "room_classification_code", "VARCHAR"
):
    added_count += 1

if await add_column_if_not_exists(engine, "reservations", "room_type", "VARCHAR"):
@@ -100,7 +105,9 @@ async def migrate_add_room_types(engine: AsyncEngine) -> None:
    _LOGGER.info("Migration add_room_types: No changes needed (already applied)")


async def migrate_add_advertising_account_ids(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None:
async def migrate_add_advertising_account_ids(
    engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
    """Migration: Add advertising account ID fields to reservations table.

    This migration adds two optional fields:
@@ -114,20 +121,27 @@ async def migrate_add_advertising_account_ids(engine: AsyncEngine, config: dict[
    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict containing hotel account IDs

    """
    _LOGGER.info("Running migration: add_advertising_account_ids")

    added_count = 0

    # Add each column if it doesn't exist
    if await add_column_if_not_exists(engine, "reservations", "meta_account_id", "VARCHAR"):
    if await add_column_if_not_exists(
        engine, "reservations", "meta_account_id", "VARCHAR"
    ):
        added_count += 1

    if await add_column_if_not_exists(engine, "reservations", "google_account_id", "VARCHAR"):
    if await add_column_if_not_exists(
        engine, "reservations", "google_account_id", "VARCHAR"
    ):
        added_count += 1

    if added_count > 0:
        _LOGGER.info("Migration add_advertising_account_ids: Added %d columns", added_count)
        _LOGGER.info(
            "Migration add_advertising_account_ids: Added %d columns", added_count
        )
    else:
        _LOGGER.info("Migration add_advertising_account_ids: Columns already exist")
@@ -135,10 +149,14 @@ async def migrate_add_advertising_account_ids(engine: AsyncEngine, config: dict[
if config:
    await _backfill_advertising_account_ids(engine, config)
else:
    _LOGGER.warning("No config provided, skipping backfill of advertising account IDs")
    _LOGGER.warning(
        "No config provided, skipping backfill of advertising account IDs"
    )


async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[str, Any]) -> None:
async def _backfill_advertising_account_ids(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill advertising account IDs for existing reservations.

    Updates existing reservations to populate meta_account_id and google_account_id
@@ -149,6 +167,7 @@ async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[st
    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict

    """
    _LOGGER.info("Backfilling advertising account IDs for existing reservations...")
@@ -164,7 +183,7 @@ async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[st
if hotel_id:
    hotel_accounts[hotel_id] = {
        "meta_account": meta_account,
        "google_account": google_account
        "google_account": google_account,
    }

if not hotel_accounts:
@@ -188,11 +207,15 @@ async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[st
)
result = await conn.execute(
    sql,
    {"meta_account": accounts["meta_account"], "hotel_id": hotel_id}
    {"meta_account": accounts["meta_account"], "hotel_id": hotel_id},
)
count = result.rowcount
if count > 0:
    _LOGGER.info("Updated %d reservations with meta_account_id for hotel %s", count, hotel_id)
    _LOGGER.info(
        "Updated %d reservations with meta_account_id for hotel %s",
        count,
        hotel_id,
    )
    meta_updated += count

# Update reservations with google_account_id where gclid is present
@@ -210,21 +233,30 @@ async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[st
)
result = await conn.execute(
    sql,
    {"google_account": accounts["google_account"], "hotel_id": hotel_id}
    {
        "google_account": accounts["google_account"],
        "hotel_id": hotel_id,
    },
)
count = result.rowcount
if count > 0:
    _LOGGER.info("Updated %d reservations with google_account_id for hotel %s", count, hotel_id)
    _LOGGER.info(
        "Updated %d reservations with google_account_id for hotel %s",
        count,
        hotel_id,
    )
    google_updated += count

_LOGGER.info(
    "Backfill complete: %d reservations updated with meta_account_id, %d with google_account_id",
    meta_updated,
    google_updated
    google_updated,
)


async def migrate_add_username_to_acked_requests(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None:
async def migrate_add_username_to_acked_requests(
    engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
    """Migration: Add username column to acked_requests table and backfill with hotel usernames.

    This migration adds a username column to acked_requests to track acknowledgements by username
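The backfill hunks above bind values into a raw UPDATE through SQLAlchemy's text() construct. A minimal standalone sketch of that pattern, using the column names from this diff; the exact WHERE clause is an assumption:

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine


async def backfill_meta_account(engine: AsyncEngine, hotel_id: str, meta_account: str) -> int:
    # Parameterized UPDATE: values are bound, never interpolated into the SQL string.
    sql = text(
        """
        UPDATE reservations
        SET meta_account_id = :meta_account
        WHERE hotel_id = :hotel_id AND meta_account_id IS NULL
        """
    )
    async with engine.begin() as conn:
        result = await conn.execute(
            sql, {"meta_account": meta_account, "hotel_id": hotel_id}
        )
        return result.rowcount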
@@ -238,6 +270,7 @@ async def migrate_add_username_to_acked_requests(engine: AsyncEngine, config: di
    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict containing hotel usernames

    """
    _LOGGER.info("Running migration: add_username_to_acked_requests")

@@ -252,10 +285,14 @@ async def migrate_add_username_to_acked_requests(engine: AsyncEngine, config: di
    if config:
        await _backfill_acked_requests_username(engine, config)
    else:
        _LOGGER.warning("No config provided, skipping backfill of acked_requests usernames")
        _LOGGER.warning(
            "No config provided, skipping backfill of acked_requests usernames"
        )


async def _backfill_acked_requests_username(engine: AsyncEngine, config: dict[str, Any]) -> None:
async def _backfill_acked_requests_username(
    engine: AsyncEngine, config: dict[str, Any]
) -> None:
    """Backfill username for existing acked_requests records.

    For each acknowledgement, find the corresponding reservation to determine its hotel_code,
@@ -264,6 +301,7 @@ async def _backfill_acked_requests_username(engine: AsyncEngine, config: dict[st
    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict

    """
    _LOGGER.info("Backfilling usernames for existing acked_requests...")

@@ -297,15 +335,53 @@ async def _backfill_acked_requests_username(engine: AsyncEngine, config: dict[st
    AND username IS NULL
""")
result = await conn.execute(
    sql,
    {"username": username, "hotel_id": hotel_id}
    sql, {"username": username, "hotel_id": hotel_id}
)
count = result.rowcount
if count > 0:
    _LOGGER.info("Updated %d acknowledgements with username for hotel %s", count, hotel_id)
    _LOGGER.info(
        "Updated %d acknowledgements with username for hotel %s",
        count,
        hotel_id,
    )
    total_updated += count

_LOGGER.info("Backfill complete: %d acknowledgements updated with username", total_updated)
_LOGGER.info(
    "Backfill complete: %d acknowledgements updated with username", total_updated
)


async def table_exists(engine: AsyncEngine, table_name: str) -> bool:
    """Check if a table exists in the database.

    Args:
        engine: SQLAlchemy async engine
        table_name: Name of the table to check

    Returns:
        True if table exists, False otherwise

    """
    async with engine.connect() as conn:

        def _check(connection):
            inspector = inspect(connection)
            return table_name in inspector.get_table_names()

        return await conn.run_sync(_check)


async def drop_table(engine: AsyncEngine, table_name: str) -> None:
    """Drop a table from the database.

    Args:
        engine: SQLAlchemy async engine
        table_name: Name of the table to drop

    """
    async with engine.begin() as conn:
        await conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
        _LOGGER.info("Dropped table: %s", table_name)


async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
@@ -313,7 +389,7 @@ async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
    This migration redesigns the conversion data structure:
    - conversions: One row per PMS reservation (with guest/advertising metadata)
    - room_reservations: One row per room reservation (linked to conversion)
    - conversion_rooms: One row per room reservation (linked to conversion)
    - daily_sales: JSON array of daily sales within each room reservation
    - total_revenue: Extracted sum of all daily sales for efficiency
@@ -326,20 +402,88 @@ async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
    - Efficient querying via extracted total_revenue field
    - All daily sales details preserved in JSON for analysis

    The tables are created via Base.metadata.create_all() at startup.
    The new tables are created via Base.metadata.create_all() at startup.
    This migration handles cleanup of old schema versions.

    Safe to run multiple times - idempotent.
    """
    _LOGGER.info("Running migration: normalize_conversions")

    # Check if the old conversions table exists with the old schema
    # If the table exists but doesn't match our current schema definition, drop it
    old_conversions_exists = await table_exists(engine, "conversions")

    if old_conversions_exists:
        # Check if this is the old-style table (we'll look for unexpected columns)
        # The old table would not have the new structure we've defined
        async with engine.connect() as conn:

            def _get_columns(connection):
                inspector = inspect(connection)
                return [col["name"] for col in inspector.get_columns("conversions")]

            old_columns = await conn.run_sync(_get_columns)

        # Expected columns in the new schema (defined in db.py)
        # If the table is missing key columns from our schema, it's the old version
        expected_columns = {
            "id",
            "reservation_id",
            "customer_id",
            "hashed_customer_id",
            "hotel_id",
            "pms_reservation_id",
            "reservation_number",
            "reservation_date",
            "creation_time",
            "reservation_type",
            "booking_channel",
            "guest_first_name",
            "guest_last_name",
            "guest_email",
            "guest_country_code",
            "advertising_medium",
            "advertising_partner",
            "advertising_campagne",
            "created_at",
            "updated_at",
        }

        old_columns_set = set(old_columns)

        # If we're missing critical new columns, this is the old schema
        if not expected_columns.issubset(old_columns_set):
            _LOGGER.info(
                "Found old conversions table with incompatible schema. "
                "Old columns: %s. Expected new columns: %s",
                old_columns,
                expected_columns,
            )
            await drop_table(engine, "conversions")
            _LOGGER.info(
                "Dropped old conversions table to allow creation of new schema"
            )
        else:
            _LOGGER.info(
                "Conversions table exists with compatible schema, no migration needed"
            )

    # Check for the old conversion_rooms table (which should not exist in the new schema)
    old_conversion_rooms_exists = await table_exists(engine, "conversion_rooms")
    if old_conversion_rooms_exists:
        await drop_table(engine, "conversion_rooms")
        _LOGGER.info("Dropped old conversion_rooms table")

    _LOGGER.info(
        "Conversion data structure redesigned: "
        "conversions (1 per PMS reservation) + "
        "room_reservations (1 per room, daily_sales as JSON). "
        "Tables created/updated via Base.metadata.create_all()"
        "Migration normalize_conversions: Conversion data structure normalized. "
        "New tables (conversions + conversion_rooms) will be created/updated via "
        "Base.metadata.create_all()"
    )


async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None:
async def run_all_migrations(
    engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
    """Run all pending migrations.

    This function should be called at app startup, after Base.metadata.create_all.
@@ -348,6 +492,7 @@ async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None
    Args:
        engine: SQLAlchemy async engine
        config: Application configuration dict (optional, but required for some migrations)

    """
    _LOGGER.info("Starting database migrations...")
@@ -1,15 +1,34 @@
#!/usr/bin/env python3
"""Startup script for the Wix Form Handler API."""
"""Startup script for the Wix Form Handler API.

import os
This script:
1. Runs database migrations using Alembic
2. Starts the FastAPI application with uvicorn

Database migrations are run BEFORE starting the server to ensure the schema
is up to date. This approach works well with multiple workers since migrations
complete before any worker starts processing requests.
"""

import sys

import uvicorn

if __name__ == "__main__":
    # db_path = "alpinebits.db" # Adjust path if needed
    # if os.path.exists(db_path):
    #     os.remove(db_path)
from alpine_bits_python.run_migrations import run_migrations

if __name__ == "__main__":
    # Run database migrations before starting the server
    # This ensures the schema is up to date before any workers start
    print("Running database migrations...")
    try:
        run_migrations()
        print("Database migrations completed successfully")
    except Exception as e:
        print(f"Failed to run migrations: {e}", file=sys.stderr)
        sys.exit(1)

    # Start the API server
    print("Starting API server...")
    uvicorn.run(
        "alpine_bits_python.api:app",
        host="0.0.0.0",
src/alpine_bits_python/run_migrations.py (new file, 74 lines)
@@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""Run database migrations using Alembic.

This script should be run before starting the application to ensure
the database schema is up to date. It can be run standalone or called
from run_api.py before starting uvicorn.

Usage:
    uv run python -m alpine_bits_python.run_migrations
    or
    from alpine_bits_python.run_migrations import run_migrations
    run_migrations()
"""

import subprocess
import sys
from pathlib import Path

from .logging_config import get_logger

_LOGGER = get_logger(__name__)


def run_migrations() -> None:
    """Run Alembic migrations to upgrade database to latest schema.

    This function runs 'alembic upgrade head' to apply all pending migrations.
    It will exit the process if migrations fail.

    Raises:
        SystemExit: If migrations fail
    """
    _LOGGER.info("Running database migrations...")

    # Get the project root directory (where alembic.ini is located)
    # Assuming this file is in src/alpine_bits_python/
    project_root = Path(__file__).parent.parent.parent

    try:
        # Run alembic upgrade head
        result = subprocess.run(
            ["alembic", "upgrade", "head"],
            cwd=project_root,
            capture_output=True,
            text=True,
            check=True,
        )

        _LOGGER.info("Database migrations completed successfully")
        _LOGGER.debug("Migration output: %s", result.stdout)

    except subprocess.CalledProcessError as e:
        _LOGGER.error("Failed to run database migrations:")
        _LOGGER.error("Exit code: %d", e.returncode)
        _LOGGER.error("stdout: %s", e.stdout)
        _LOGGER.error("stderr: %s", e.stderr)
        sys.exit(1)
    except FileNotFoundError:
        _LOGGER.error(
            "Alembic not found. Please ensure it's installed: uv pip install alembic"
        )
        sys.exit(1)


if __name__ == "__main__":
    # Configure basic logging if run directly
    import logging

    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    run_migrations()
    print("Migrations completed successfully!")
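Shelling out to the alembic CLI is one option; Alembic also exposes a Python API that runs the upgrade in-process. A hedged alternative sketch, assuming alembic.ini lives at the same project root (this is an alternative, not what the script above does):

from pathlib import Path

from alembic import command
from alembic.config import Config


def run_migrations_in_process() -> None:
    # Equivalent of `alembic upgrade head`, executed inside the current process.
    project_root = Path(__file__).parent.parent.parent
    alembic_cfg = Config(str(project_root / "alembic.ini"))
    command.upgrade(alembic_cfg, "head")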