diff --git a/alembic/env.py b/alembic/env.py index e1d6275..0c68b47 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -4,18 +4,13 @@ import asyncio from logging.config import fileConfig from alembic import context -from sqlalchemy import pool +from sqlalchemy import pool, text from sqlalchemy.engine import Connection from sqlalchemy.ext.asyncio import async_engine_from_config # Import your models' Base to enable autogenerate from alpine_bits_python.config_loader import load_config -from alpine_bits_python.db import ( - Base, - configure_schema, - get_database_schema, - get_database_url, -) +from alpine_bits_python.db import Base, get_database_schema, get_database_url # this is the Alembic Config object, which provides # access to the values within the .ini file in use. @@ -39,14 +34,16 @@ if db_url: config.set_main_option("sqlalchemy.url", db_url) # Get schema name from application config -schema_name = get_database_schema(app_config) -if schema_name: - # Configure schema for all tables before migrations - configure_schema(schema_name) +SCHEMA = get_database_schema(app_config) # add your model's MetaData object here for 'autogenerate' support target_metadata = Base.metadata +# Configure metadata to resolve unqualified table names in the schema +# This is needed so ForeignKey("customers.id") can find "alpinebits.customers" +if SCHEMA: + target_metadata.schema = SCHEMA + def run_migrations_offline() -> None: """Run migrations in 'offline' mode. @@ -65,11 +62,13 @@ def run_migrations_offline() -> None: target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, - version_table_schema=schema_name, # Store alembic_version in our schema - include_schemas=True, ) with context.begin_transaction(): + # Set search path for offline mode if schema is configured + if SCHEMA: + print(f"Setting search_path to {SCHEMA}, public") + context.execute(f"SET search_path TO {SCHEMA}, public") context.run_migrations() @@ -78,37 +77,38 @@ def do_run_migrations(connection: Connection) -> None: context.configure( connection=connection, target_metadata=target_metadata, - version_table_schema=schema_name, # Store alembic_version in our schema - include_schemas=True, # Allow Alembic to work with non-default schemas ) with context.begin_transaction(): + # Create schema if it doesn't exist + if SCHEMA: + connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}")) + # Set search path to our schema + print(f"setting search path to schema {SCHEMA}, ") + connection.execute(text(f"SET search_path TO {SCHEMA}")) context.run_migrations() async def run_async_migrations() -> None: - """Run migrations in 'online' mode using async engine. - - In this scenario we need to create an Engine + """In this scenario we need to create an Engine and associate a connection with the context. + """ - # Get the config section for sqlalchemy settings - configuration = config.get_section(config.config_ini_section, {}) - - # Add connect_args for PostgreSQL schema support if needed - if schema_name and "postgresql" in configuration.get("sqlalchemy.url", ""): - configuration["connect_args"] = { - "server_settings": {"search_path": f"{schema_name},public"} - } - - # Create async engine connectable = async_engine_from_config( - configuration, + config.get_section(config.config_ini_section, {}), prefix="sqlalchemy.", poolclass=pool.NullPool, ) async with connectable.connect() as connection: + if connection.dialect.name == "postgresql": + # set search path on the connection, which ensures that + # PostgreSQL will emit all CREATE / ALTER / DROP statements + # in terms of this schema by default + + connection.execute(text(f"SET search_path TO {SCHEMA}")) + # in SQLAlchemy v2+ the search path change needs to be committed + connection.commit() await connection.run_sync(do_run_migrations) await connectable.dispose() diff --git a/alembic/versions/2025_11_18_1046-94134e512a12_baseline_existing_database.py b/alembic/versions/2025_11_18_1046-94134e512a12_baseline_existing_database.py deleted file mode 100644 index 341f0c7..0000000 --- a/alembic/versions/2025_11_18_1046-94134e512a12_baseline_existing_database.py +++ /dev/null @@ -1,320 +0,0 @@ -"""Baseline existing database. - -This migration handles the transition from the old manual migration system -to Alembic. It: -1. Detects if the old conversions table schema exists and recreates it with the new schema -2. Acts as a no-op for all other tables (assumes they already exist) - -This allows existing databases to migrate to Alembic without data loss. - -Revision ID: 94134e512a12 -Revises: -Create Date: 2025-11-18 10:46:12.322570 -""" - -from collections.abc import Sequence - -import sqlalchemy as sa -from alembic import op -from sqlalchemy import inspect - -# revision identifiers, used by Alembic. -revision: str = "94134e512a12" -down_revision: str | None = None -branch_labels: str | Sequence[str] | None = None -depends_on: str | Sequence[str] | None = None - - -def upgrade() -> None: - """Migrate existing database to Alembic management. - - This migration: - - Drops and recreates the conversions/conversion_rooms tables with new schema - - Assumes all other tables already exist (no-op for them) - """ - conn = op.get_bind() - inspector = inspect(conn) - - # Get schema from alembic context (set in env.py from config) - from alpine_bits_python.config_loader import load_config - from alpine_bits_python.db import get_database_schema - - try: - app_config = load_config() - schema = get_database_schema(app_config) - except Exception: - schema = None - - print(f"Using schema: {schema or 'public (default)'}") - - # Get tables from the correct schema - existing_tables = set(inspector.get_table_names(schema=schema)) - - print(f"Found existing tables in schema '{schema}': {existing_tables}") - - # Handle conversions table migration - if "conversions" in existing_tables: - columns = [ - col["name"] for col in inspector.get_columns("conversions", schema=schema) - ] - - print(f"Columns in conversions table: {columns}") - columns_set = set(columns) - - print(f"DEBUG: Found columns in conversions table: {sorted(columns_set)}") - - # Old schema indicators: these columns should NOT be in conversions anymore - old_schema_columns = { - "arrival_date", - "departure_date", - "room_status", - "room_number", - "sale_date", - "revenue_total", - "revenue_logis", - "revenue_board", - } - - intersection = old_schema_columns & columns_set - print(f"DEBUG: Old schema columns found: {intersection}") - - # If ANY of the old denormalized columns exist, this is the old schema - if intersection: - # Old schema detected, drop and recreate - print( - f"Detected old conversions schema with denormalized room data: {old_schema_columns & columns_set}" - ) - - # Drop conversion_rooms FIRST if it exists (due to foreign key constraint) - if "conversion_rooms" in existing_tables: - print("Dropping old conversion_rooms table...") - op.execute( - f"DROP TABLE IF EXISTS {schema}.conversion_rooms CASCADE" - if schema - else "DROP TABLE IF EXISTS conversion_rooms CASCADE" - ) - - print("Dropping old conversions table...") - op.execute( - f"DROP TABLE IF EXISTS {schema}.conversions CASCADE" - if schema - else "DROP TABLE IF EXISTS conversions CASCADE" - ) - - # Drop any orphaned indexes that may have survived the table drop - print("Dropping any orphaned indexes...") - index_names = [ - "ix_conversions_advertising_campagne", - "ix_conversions_advertising_medium", - "ix_conversions_advertising_partner", - "ix_conversions_customer_id", - "ix_conversions_guest_email", - "ix_conversions_guest_first_name", - "ix_conversions_guest_last_name", - "ix_conversions_hashed_customer_id", - "ix_conversions_hotel_id", - "ix_conversions_pms_reservation_id", - "ix_conversions_reservation_id", - "ix_conversion_rooms_arrival_date", - "ix_conversion_rooms_conversion_id", - "ix_conversion_rooms_departure_date", - "ix_conversion_rooms_pms_hotel_reservation_id", - "ix_conversion_rooms_room_number", - ] - for idx_name in index_names: - op.execute( - f"DROP INDEX IF EXISTS {schema}.{idx_name}" if schema else f"DROP INDEX IF EXISTS {idx_name}" - ) - - print("Creating new conversions table with normalized schema...") - create_conversions_table(schema) - create_conversion_rooms_table(schema) - else: - print("Conversions table already has new schema, skipping migration") - else: - # No conversions table exists, create it - print("No conversions table found, creating new schema...") - create_conversions_table(schema) - create_conversion_rooms_table(schema) - - print("Baseline migration complete!") - - -def create_conversions_table(schema=None): - """Create the conversions table with the new normalized schema.""" - op.create_table( - "conversions", - sa.Column("id", sa.Integer(), nullable=False), - sa.Column("reservation_id", sa.Integer(), nullable=True), - sa.Column("customer_id", sa.Integer(), nullable=True), - sa.Column("hashed_customer_id", sa.Integer(), nullable=True), - sa.Column("hotel_id", sa.String(), nullable=True), - sa.Column("pms_reservation_id", sa.String(), nullable=True), - sa.Column("reservation_number", sa.String(), nullable=True), - sa.Column("reservation_date", sa.Date(), nullable=True), - sa.Column("creation_time", sa.DateTime(timezone=True), nullable=True), - sa.Column("reservation_type", sa.String(), nullable=True), - sa.Column("booking_channel", sa.String(), nullable=True), - sa.Column("guest_first_name", sa.String(), nullable=True), - sa.Column("guest_last_name", sa.String(), nullable=True), - sa.Column("guest_email", sa.String(), nullable=True), - sa.Column("guest_country_code", sa.String(), nullable=True), - sa.Column("advertising_medium", sa.String(), nullable=True), - sa.Column("advertising_partner", sa.String(), nullable=True), - sa.Column("advertising_campagne", sa.String(), nullable=True), - sa.Column("created_at", sa.DateTime(timezone=True), nullable=True), - sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True), - sa.ForeignKeyConstraint( - ["customer_id"], - ["customers.id"], - ), - sa.ForeignKeyConstraint( - ["hashed_customer_id"], - ["hashed_customers.id"], - ), - sa.ForeignKeyConstraint( - ["reservation_id"], - ["reservations.id"], - ), - sa.PrimaryKeyConstraint("id"), - schema=schema, - ) - - # Create indexes - op.create_index( - op.f("ix_conversions_advertising_campagne"), - "conversions", - ["advertising_campagne"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversions_advertising_medium"), - "conversions", - ["advertising_medium"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversions_advertising_partner"), - "conversions", - ["advertising_partner"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversions_customer_id"), "conversions", ["customer_id"], unique=False - ) - op.create_index( - op.f("ix_conversions_guest_email"), "conversions", ["guest_email"], unique=False - ) - op.create_index( - op.f("ix_conversions_guest_first_name"), - "conversions", - ["guest_first_name"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversions_guest_last_name"), - "conversions", - ["guest_last_name"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversions_hashed_customer_id"), - "conversions", - ["hashed_customer_id"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversions_hotel_id"), "conversions", ["hotel_id"], unique=False - ) - op.create_index( - op.f("ix_conversions_pms_reservation_id"), - "conversions", - ["pms_reservation_id"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversions_reservation_id"), - "conversions", - ["reservation_id"], - unique=False, - schema=schema, - ) - - -def create_conversion_rooms_table(schema=None): - """Create the conversion_rooms table with the new normalized schema.""" - op.create_table( - "conversion_rooms", - sa.Column("id", sa.Integer(), nullable=False), - sa.Column("conversion_id", sa.Integer(), nullable=False), - sa.Column("pms_hotel_reservation_id", sa.String(), nullable=True), - sa.Column("arrival_date", sa.Date(), nullable=True), - sa.Column("departure_date", sa.Date(), nullable=True), - sa.Column("room_status", sa.String(), nullable=True), - sa.Column("room_type", sa.String(), nullable=True), - sa.Column("room_number", sa.String(), nullable=True), - sa.Column("num_adults", sa.Integer(), nullable=True), - sa.Column("rate_plan_code", sa.String(), nullable=True), - sa.Column("connected_room_type", sa.String(), nullable=True), - sa.Column("daily_sales", sa.JSON(), nullable=True), - sa.Column("total_revenue", sa.String(), nullable=True), - sa.Column("created_at", sa.DateTime(timezone=True), nullable=True), - sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True), - sa.ForeignKeyConstraint( - ["conversion_id"], ["conversions.id"], ondelete="CASCADE" - ), - sa.PrimaryKeyConstraint("id"), - schema=schema, - ) - - # Create indexes - op.create_index( - op.f("ix_conversion_rooms_arrival_date"), - "conversion_rooms", - ["arrival_date"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversion_rooms_conversion_id"), - "conversion_rooms", - ["conversion_id"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversion_rooms_departure_date"), - "conversion_rooms", - ["departure_date"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversion_rooms_pms_hotel_reservation_id"), - "conversion_rooms", - ["pms_hotel_reservation_id"], - unique=False, - schema=schema, - ) - op.create_index( - op.f("ix_conversion_rooms_room_number"), - "conversion_rooms", - ["room_number"], - unique=False, - schema=schema, - ) - - -def downgrade() -> None: - """Downgrade not supported. - - This baseline migration drops data (old conversions schema) that can be - recreated from PMS XML imports. Reverting would require re-importing. - """ diff --git a/alembic/versions/2025_11_18_1319-630b0c367dcb_initial_migration.py b/alembic/versions/2025_11_18_1319-630b0c367dcb_initial_migration.py new file mode 100644 index 0000000..3c82f28 --- /dev/null +++ b/alembic/versions/2025_11_18_1319-630b0c367dcb_initial_migration.py @@ -0,0 +1,108 @@ +"""Initial migration + +Revision ID: 630b0c367dcb +Revises: +Create Date: 2025-11-18 13:19:37.183397 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '630b0c367dcb' +down_revision: Union[str, Sequence[str], None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('conversion_rooms', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('conversion_id', sa.Integer(), nullable=False), + sa.Column('pms_hotel_reservation_id', sa.String(), nullable=True), + sa.Column('arrival_date', sa.Date(), nullable=True), + sa.Column('departure_date', sa.Date(), nullable=True), + sa.Column('room_status', sa.String(), nullable=True), + sa.Column('room_type', sa.String(), nullable=True), + sa.Column('room_number', sa.String(), nullable=True), + sa.Column('num_adults', sa.Integer(), nullable=True), + sa.Column('rate_plan_code', sa.String(), nullable=True), + sa.Column('connected_room_type', sa.String(), nullable=True), + sa.Column('daily_sales', sa.JSON(), nullable=True), + sa.Column('total_revenue', sa.String(), nullable=True), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=True), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True), + sa.ForeignKeyConstraint(['conversion_id'], ['alpinebits.conversions.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_conversion_rooms_arrival_date'), 'conversion_rooms', ['arrival_date'], unique=False) + op.create_index(op.f('ix_conversion_rooms_conversion_id'), 'conversion_rooms', ['conversion_id'], unique=False) + op.create_index(op.f('ix_conversion_rooms_departure_date'), 'conversion_rooms', ['departure_date'], unique=False) + op.create_index(op.f('ix_conversion_rooms_pms_hotel_reservation_id'), 'conversion_rooms', ['pms_hotel_reservation_id'], unique=False) + op.create_index(op.f('ix_conversion_rooms_room_number'), 'conversion_rooms', ['room_number'], unique=False) + op.create_index(op.f('ix_acked_requests_username'), 'acked_requests', ['username'], unique=False) + op.add_column('conversions', sa.Column('guest_first_name', sa.String(), nullable=True)) + op.add_column('conversions', sa.Column('guest_last_name', sa.String(), nullable=True)) + op.add_column('conversions', sa.Column('guest_email', sa.String(), nullable=True)) + op.add_column('conversions', sa.Column('guest_country_code', sa.String(), nullable=True)) + op.add_column('conversions', sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True)) + op.drop_index(op.f('ix_conversions_sale_date'), table_name='conversions') + op.create_index(op.f('ix_conversions_guest_email'), 'conversions', ['guest_email'], unique=False) + op.create_index(op.f('ix_conversions_guest_first_name'), 'conversions', ['guest_first_name'], unique=False) + op.create_index(op.f('ix_conversions_guest_last_name'), 'conversions', ['guest_last_name'], unique=False) + op.drop_column('conversions', 'rate_plan_code') + op.drop_column('conversions', 'revenue_total') + op.drop_column('conversions', 'revenue_other') + op.drop_column('conversions', 'sale_date') + op.drop_column('conversions', 'room_status') + op.drop_column('conversions', 'revenue_board') + op.drop_column('conversions', 'departure_date') + op.drop_column('conversions', 'revenue_spa') + op.drop_column('conversions', 'num_adults') + op.drop_column('conversions', 'room_type') + op.drop_column('conversions', 'revenue_logis') + op.drop_column('conversions', 'room_number') + op.drop_column('conversions', 'arrival_date') + op.drop_column('conversions', 'revenue_fb') + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('conversions', sa.Column('revenue_fb', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('arrival_date', sa.DATE(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('room_number', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('revenue_logis', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('room_type', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('num_adults', sa.INTEGER(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('revenue_spa', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('departure_date', sa.DATE(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('revenue_board', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('room_status', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('sale_date', sa.DATE(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('revenue_other', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('revenue_total', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.add_column('conversions', sa.Column('rate_plan_code', sa.VARCHAR(), autoincrement=False, nullable=True)) + op.drop_index(op.f('ix_conversions_guest_last_name'), table_name='conversions') + op.drop_index(op.f('ix_conversions_guest_first_name'), table_name='conversions') + op.drop_index(op.f('ix_conversions_guest_email'), table_name='conversions') + op.create_index(op.f('ix_conversions_sale_date'), 'conversions', ['sale_date'], unique=False) + op.drop_column('conversions', 'updated_at') + op.drop_column('conversions', 'guest_country_code') + op.drop_column('conversions', 'guest_email') + op.drop_column('conversions', 'guest_last_name') + op.drop_column('conversions', 'guest_first_name') + op.drop_index(op.f('ix_acked_requests_username'), table_name='acked_requests') + op.drop_index(op.f('ix_conversion_rooms_room_number'), table_name='conversion_rooms') + op.drop_index(op.f('ix_conversion_rooms_pms_hotel_reservation_id'), table_name='conversion_rooms') + op.drop_index(op.f('ix_conversion_rooms_departure_date'), table_name='conversion_rooms') + op.drop_index(op.f('ix_conversion_rooms_conversion_id'), table_name='conversion_rooms') + op.drop_index(op.f('ix_conversion_rooms_arrival_date'), table_name='conversion_rooms') + op.drop_table('conversion_rooms') + # ### end Alembic commands ### diff --git a/src/alpine_bits_python/db.py b/src/alpine_bits_python/db.py index 1bca0ce..4348e52 100644 --- a/src/alpine_bits_python/db.py +++ b/src/alpine_bits_python/db.py @@ -27,7 +27,31 @@ from .logging_config import get_logger _LOGGER = get_logger(__name__) -Base = declarative_base() + +# Load schema from config at module level +# This happens once when the module is imported +try: + from .config_loader import load_config + + _app_config = load_config() + _SCHEMA = _app_config.get("database", {}).get("schema") +except (FileNotFoundError, KeyError, ValueError, ImportError): + _SCHEMA = None + +# If schema isn't in config, try environment variable +if not _SCHEMA: + _SCHEMA = os.environ.get("DATABASE_SCHEMA") + + +class Base: + """Base class that applies schema to all tables.""" + + # # Set schema on all tables if configured + # if _SCHEMA: + # __table_args__ = {"schema": _SCHEMA} + + +Base = declarative_base(cls=Base) # Type variable for async functions T = TypeVar("T") @@ -60,26 +84,30 @@ def get_database_schema(config=None): Schema name string, or None if not configured """ + # Check environment variable first (takes precedence) + schema = os.environ.get("DATABASE_SCHEMA") + if schema: + return schema + # Fall back to config file if config and "database" in config and "schema" in config["database"]: return config["database"]["schema"] - return os.environ.get("DATABASE_SCHEMA") + return None -def configure_schema(schema_name=None): +def configure_schema(schema_name): """Configure the database schema for all models. - This should be called before creating tables or running migrations. - For PostgreSQL, this sets the schema for all tables. - For other databases, this is a no-op. + IMPORTANT: This must be called BEFORE any models are imported/defined. + It modifies the Base class to apply schema to all tables. Args: schema_name: Name of the schema to use (e.g., "alpinebits") """ if schema_name: - # Update the schema for all tables in Base metadata - for table in Base.metadata.tables.values(): - table.schema = schema_name + # Set __table_args__ on the Base class to apply schema to all tables + + Base.__table_args__ = {"schema": _SCHEMA} def create_database_engine(config=None, echo=False) -> AsyncEngine: @@ -102,7 +130,7 @@ def create_database_engine(config=None, echo=False) -> AsyncEngine: database_url = get_database_url(config) schema_name = get_database_schema(config) - # Configure schema for all models if specified + # # Configure schema for all models if specified if schema_name: configure_schema(schema_name) _LOGGER.info("Configured database schema: %s", schema_name)