Getting closer

Jonas Linter
2025-11-18 13:32:29 +01:00
parent 5a660507d2
commit e7757c8c51
4 changed files with 175 additions and 359 deletions

View File: alembic/env.py

@@ -4,18 +4,13 @@ import asyncio
from logging.config import fileConfig
from alembic import context
-from sqlalchemy import pool
+from sqlalchemy import pool, text
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
# Import your models' Base to enable autogenerate
from alpine_bits_python.config_loader import load_config
-from alpine_bits_python.db import (
-    Base,
-    configure_schema,
-    get_database_schema,
-    get_database_url,
-)
+from alpine_bits_python.db import Base, get_database_schema, get_database_url
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
@@ -39,14 +34,16 @@ if db_url:
config.set_main_option("sqlalchemy.url", db_url)
# Get schema name from application config
-schema_name = get_database_schema(app_config)
-if schema_name:
-    # Configure schema for all tables before migrations
-    configure_schema(schema_name)
+SCHEMA = get_database_schema(app_config)
# add your model's MetaData object here for 'autogenerate' support
target_metadata = Base.metadata
+# Configure metadata to resolve unqualified table names in the schema
+# This is needed so ForeignKey("customers.id") can find "alpinebits.customers"
+if SCHEMA:
+    target_metadata.schema = SCHEMA
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
@@ -65,11 +62,13 @@ def run_migrations_offline() -> None:
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
-        version_table_schema=schema_name,  # Store alembic_version in our schema
+        include_schemas=True,
)
with context.begin_transaction():
+        # Set search path for offline mode if schema is configured
+        if SCHEMA:
+            print(f"Setting search_path to {SCHEMA}, public")
+            context.execute(f"SET search_path TO {SCHEMA}, public")
context.run_migrations()
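
In offline mode, context.execute() does not touch a database; it emits the statement into the generated SQL script, so the SET search_path above ends up at the top of the DDL output. Note that the print() also lands on stdout, where offline mode writes its script by default, so it can leak into redirected SQL output. A minimal sketch of producing that script programmatically, equivalent to `alembic upgrade head --sql` (the alembic.ini path is an assumption):

# Sketch: generate the offline SQL script, which will include the
# "SET search_path TO ..." emitted by context.execute() in env.py.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # assumed location of the project's ini file
command.upgrade(cfg, "head", sql=True)  # prints the SQL instead of executing it
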
@@ -78,37 +77,38 @@ def do_run_migrations(connection: Connection) -> None:
context.configure(
connection=connection,
target_metadata=target_metadata,
-        version_table_schema=schema_name,  # Store alembic_version in our schema
+        include_schemas=True,  # Allow Alembic to work with non-default schemas
)
with context.begin_transaction():
+        # Create schema if it doesn't exist
+        if SCHEMA:
+            connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}"))
+            # Set search path to our schema
+            print(f"Setting search_path to {SCHEMA}")
+            connection.execute(text(f"SET search_path TO {SCHEMA}"))
context.run_migrations()
async def run_async_migrations() -> None:
"""Run migrations in 'online' mode using async engine.
In this scenario we need to create an Engine
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
-    # Get the config section for sqlalchemy settings
-    configuration = config.get_section(config.config_ini_section, {})
-    # Add connect_args for PostgreSQL schema support if needed
-    if schema_name and "postgresql" in configuration.get("sqlalchemy.url", ""):
-        configuration["connect_args"] = {
-            "server_settings": {"search_path": f"{schema_name},public"}
-        }
# Create async engine
connectable = async_engine_from_config(
-        configuration,
+        config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
+        if connection.dialect.name == "postgresql" and SCHEMA:
+            # set search path on the connection, which ensures that
+            # PostgreSQL will emit all CREATE / ALTER / DROP statements
+            # in terms of this schema by default
+            await connection.execute(text(f"SET search_path TO {SCHEMA}"))
+            # in SQLAlchemy v2+ the search path change needs to be committed
+            await connection.commit()
await connection.run_sync(do_run_migrations)
await connectable.dispose()
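
The async path relies on SQLAlchemy 2.x semantics: AsyncConnection.execute() is a coroutine and the connection autobegins a transaction, so both the SET and the commit must be awaited. A standalone sketch of the same pattern, assuming an asyncpg URL in a DATABASE_URL environment variable (both names are illustrative):

import asyncio
import os

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


async def main() -> None:
    engine = create_async_engine(os.environ["DATABASE_URL"])  # assumed env var
    async with engine.connect() as conn:
        # Unqualified DDL/DML on this connection now targets the schema.
        await conn.execute(text("SET search_path TO alpinebits"))
        await conn.commit()  # the autobegun transaction must be committed
    await engine.dispose()


asyncio.run(main())
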

View File: Alembic revision 94134e512a12 (deleted baseline migration)

@@ -1,320 +0,0 @@
"""Baseline existing database.
This migration handles the transition from the old manual migration system
to Alembic. It:
1. Detects if the old conversions table schema exists and recreates it with the new schema
2. Acts as a no-op for all other tables (assumes they already exist)
This allows existing databases to migrate to Alembic without data loss.
Revision ID: 94134e512a12
Revises:
Create Date: 2025-11-18 10:46:12.322570
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
revision: str = "94134e512a12"
down_revision: str | None = None
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Migrate existing database to Alembic management.
This migration:
- Drops and recreates the conversions/conversion_rooms tables with new schema
- Assumes all other tables already exist (no-op for them)
"""
conn = op.get_bind()
inspector = inspect(conn)
# Get schema from alembic context (set in env.py from config)
from alpine_bits_python.config_loader import load_config
from alpine_bits_python.db import get_database_schema
try:
app_config = load_config()
schema = get_database_schema(app_config)
except Exception:
schema = None
print(f"Using schema: {schema or 'public (default)'}")
# Get tables from the correct schema
existing_tables = set(inspector.get_table_names(schema=schema))
print(f"Found existing tables in schema '{schema}': {existing_tables}")
# Handle conversions table migration
if "conversions" in existing_tables:
columns = [
col["name"] for col in inspector.get_columns("conversions", schema=schema)
]
print(f"Columns in conversions table: {columns}")
columns_set = set(columns)
print(f"DEBUG: Found columns in conversions table: {sorted(columns_set)}")
# Old schema indicators: these columns should NOT be in conversions anymore
old_schema_columns = {
"arrival_date",
"departure_date",
"room_status",
"room_number",
"sale_date",
"revenue_total",
"revenue_logis",
"revenue_board",
}
intersection = old_schema_columns & columns_set
print(f"DEBUG: Old schema columns found: {intersection}")
# If ANY of the old denormalized columns exist, this is the old schema
if intersection:
# Old schema detected, drop and recreate
print(
f"Detected old conversions schema with denormalized room data: {old_schema_columns & columns_set}"
)
# Drop conversion_rooms FIRST if it exists (due to foreign key constraint)
if "conversion_rooms" in existing_tables:
print("Dropping old conversion_rooms table...")
op.execute(
f"DROP TABLE IF EXISTS {schema}.conversion_rooms CASCADE"
if schema
else "DROP TABLE IF EXISTS conversion_rooms CASCADE"
)
print("Dropping old conversions table...")
op.execute(
f"DROP TABLE IF EXISTS {schema}.conversions CASCADE"
if schema
else "DROP TABLE IF EXISTS conversions CASCADE"
)
# Drop any orphaned indexes that may have survived the table drop
print("Dropping any orphaned indexes...")
index_names = [
"ix_conversions_advertising_campagne",
"ix_conversions_advertising_medium",
"ix_conversions_advertising_partner",
"ix_conversions_customer_id",
"ix_conversions_guest_email",
"ix_conversions_guest_first_name",
"ix_conversions_guest_last_name",
"ix_conversions_hashed_customer_id",
"ix_conversions_hotel_id",
"ix_conversions_pms_reservation_id",
"ix_conversions_reservation_id",
"ix_conversion_rooms_arrival_date",
"ix_conversion_rooms_conversion_id",
"ix_conversion_rooms_departure_date",
"ix_conversion_rooms_pms_hotel_reservation_id",
"ix_conversion_rooms_room_number",
]
for idx_name in index_names:
op.execute(
f"DROP INDEX IF EXISTS {schema}.{idx_name}" if schema else f"DROP INDEX IF EXISTS {idx_name}"
)
print("Creating new conversions table with normalized schema...")
create_conversions_table(schema)
create_conversion_rooms_table(schema)
else:
print("Conversions table already has new schema, skipping migration")
else:
# No conversions table exists, create it
print("No conversions table found, creating new schema...")
create_conversions_table(schema)
create_conversion_rooms_table(schema)
print("Baseline migration complete!")
def create_conversions_table(schema=None):
"""Create the conversions table with the new normalized schema."""
op.create_table(
"conversions",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("reservation_id", sa.Integer(), nullable=True),
sa.Column("customer_id", sa.Integer(), nullable=True),
sa.Column("hashed_customer_id", sa.Integer(), nullable=True),
sa.Column("hotel_id", sa.String(), nullable=True),
sa.Column("pms_reservation_id", sa.String(), nullable=True),
sa.Column("reservation_number", sa.String(), nullable=True),
sa.Column("reservation_date", sa.Date(), nullable=True),
sa.Column("creation_time", sa.DateTime(timezone=True), nullable=True),
sa.Column("reservation_type", sa.String(), nullable=True),
sa.Column("booking_channel", sa.String(), nullable=True),
sa.Column("guest_first_name", sa.String(), nullable=True),
sa.Column("guest_last_name", sa.String(), nullable=True),
sa.Column("guest_email", sa.String(), nullable=True),
sa.Column("guest_country_code", sa.String(), nullable=True),
sa.Column("advertising_medium", sa.String(), nullable=True),
sa.Column("advertising_partner", sa.String(), nullable=True),
sa.Column("advertising_campagne", sa.String(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(
["customer_id"],
["customers.id"],
),
sa.ForeignKeyConstraint(
["hashed_customer_id"],
["hashed_customers.id"],
),
sa.ForeignKeyConstraint(
["reservation_id"],
["reservations.id"],
),
sa.PrimaryKeyConstraint("id"),
schema=schema,
)
# Create indexes
op.create_index(
op.f("ix_conversions_advertising_campagne"),
"conversions",
["advertising_campagne"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversions_advertising_medium"),
"conversions",
["advertising_medium"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversions_advertising_partner"),
"conversions",
["advertising_partner"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversions_customer_id"), "conversions", ["customer_id"], unique=False
)
op.create_index(
op.f("ix_conversions_guest_email"), "conversions", ["guest_email"], unique=False
)
op.create_index(
op.f("ix_conversions_guest_first_name"),
"conversions",
["guest_first_name"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversions_guest_last_name"),
"conversions",
["guest_last_name"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversions_hashed_customer_id"),
"conversions",
["hashed_customer_id"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversions_hotel_id"), "conversions", ["hotel_id"], unique=False
)
op.create_index(
op.f("ix_conversions_pms_reservation_id"),
"conversions",
["pms_reservation_id"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversions_reservation_id"),
"conversions",
["reservation_id"],
unique=False,
schema=schema,
)
def create_conversion_rooms_table(schema=None):
"""Create the conversion_rooms table with the new normalized schema."""
op.create_table(
"conversion_rooms",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("conversion_id", sa.Integer(), nullable=False),
sa.Column("pms_hotel_reservation_id", sa.String(), nullable=True),
sa.Column("arrival_date", sa.Date(), nullable=True),
sa.Column("departure_date", sa.Date(), nullable=True),
sa.Column("room_status", sa.String(), nullable=True),
sa.Column("room_type", sa.String(), nullable=True),
sa.Column("room_number", sa.String(), nullable=True),
sa.Column("num_adults", sa.Integer(), nullable=True),
sa.Column("rate_plan_code", sa.String(), nullable=True),
sa.Column("connected_room_type", sa.String(), nullable=True),
sa.Column("daily_sales", sa.JSON(), nullable=True),
sa.Column("total_revenue", sa.String(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(
["conversion_id"], ["conversions.id"], ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id"),
schema=schema,
)
# Create indexes
op.create_index(
op.f("ix_conversion_rooms_arrival_date"),
"conversion_rooms",
["arrival_date"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversion_rooms_conversion_id"),
"conversion_rooms",
["conversion_id"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversion_rooms_departure_date"),
"conversion_rooms",
["departure_date"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversion_rooms_pms_hotel_reservation_id"),
"conversion_rooms",
["pms_hotel_reservation_id"],
unique=False,
schema=schema,
)
op.create_index(
op.f("ix_conversion_rooms_room_number"),
"conversion_rooms",
["room_number"],
unique=False,
schema=schema,
)
def downgrade() -> None:
"""Downgrade not supported.
This baseline migration drops data (old conversions schema) that can be
recreated from PMS XML imports. Reverting would require re-importing.
"""

View File: Alembic revision 630b0c367dcb (new initial migration)

@@ -0,0 +1,108 @@
"""Initial migration
Revision ID: 630b0c367dcb
Revises:
Create Date: 2025-11-18 13:19:37.183397
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '630b0c367dcb'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('conversion_rooms',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('conversion_id', sa.Integer(), nullable=False),
sa.Column('pms_hotel_reservation_id', sa.String(), nullable=True),
sa.Column('arrival_date', sa.Date(), nullable=True),
sa.Column('departure_date', sa.Date(), nullable=True),
sa.Column('room_status', sa.String(), nullable=True),
sa.Column('room_type', sa.String(), nullable=True),
sa.Column('room_number', sa.String(), nullable=True),
sa.Column('num_adults', sa.Integer(), nullable=True),
sa.Column('rate_plan_code', sa.String(), nullable=True),
sa.Column('connected_room_type', sa.String(), nullable=True),
sa.Column('daily_sales', sa.JSON(), nullable=True),
sa.Column('total_revenue', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['conversion_id'], ['alpinebits.conversions.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_conversion_rooms_arrival_date'), 'conversion_rooms', ['arrival_date'], unique=False)
op.create_index(op.f('ix_conversion_rooms_conversion_id'), 'conversion_rooms', ['conversion_id'], unique=False)
op.create_index(op.f('ix_conversion_rooms_departure_date'), 'conversion_rooms', ['departure_date'], unique=False)
op.create_index(op.f('ix_conversion_rooms_pms_hotel_reservation_id'), 'conversion_rooms', ['pms_hotel_reservation_id'], unique=False)
op.create_index(op.f('ix_conversion_rooms_room_number'), 'conversion_rooms', ['room_number'], unique=False)
op.create_index(op.f('ix_acked_requests_username'), 'acked_requests', ['username'], unique=False)
op.add_column('conversions', sa.Column('guest_first_name', sa.String(), nullable=True))
op.add_column('conversions', sa.Column('guest_last_name', sa.String(), nullable=True))
op.add_column('conversions', sa.Column('guest_email', sa.String(), nullable=True))
op.add_column('conversions', sa.Column('guest_country_code', sa.String(), nullable=True))
op.add_column('conversions', sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True))
op.drop_index(op.f('ix_conversions_sale_date'), table_name='conversions')
op.create_index(op.f('ix_conversions_guest_email'), 'conversions', ['guest_email'], unique=False)
op.create_index(op.f('ix_conversions_guest_first_name'), 'conversions', ['guest_first_name'], unique=False)
op.create_index(op.f('ix_conversions_guest_last_name'), 'conversions', ['guest_last_name'], unique=False)
op.drop_column('conversions', 'rate_plan_code')
op.drop_column('conversions', 'revenue_total')
op.drop_column('conversions', 'revenue_other')
op.drop_column('conversions', 'sale_date')
op.drop_column('conversions', 'room_status')
op.drop_column('conversions', 'revenue_board')
op.drop_column('conversions', 'departure_date')
op.drop_column('conversions', 'revenue_spa')
op.drop_column('conversions', 'num_adults')
op.drop_column('conversions', 'room_type')
op.drop_column('conversions', 'revenue_logis')
op.drop_column('conversions', 'room_number')
op.drop_column('conversions', 'arrival_date')
op.drop_column('conversions', 'revenue_fb')
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('conversions', sa.Column('revenue_fb', sa.VARCHAR(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('arrival_date', sa.DATE(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('room_number', sa.VARCHAR(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('revenue_logis', sa.VARCHAR(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('room_type', sa.VARCHAR(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('num_adults', sa.INTEGER(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('revenue_spa', sa.VARCHAR(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('departure_date', sa.DATE(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('revenue_board', sa.VARCHAR(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('room_status', sa.VARCHAR(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('sale_date', sa.DATE(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('revenue_other', sa.VARCHAR(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('revenue_total', sa.VARCHAR(), autoincrement=False, nullable=True))
op.add_column('conversions', sa.Column('rate_plan_code', sa.VARCHAR(), autoincrement=False, nullable=True))
op.drop_index(op.f('ix_conversions_guest_last_name'), table_name='conversions')
op.drop_index(op.f('ix_conversions_guest_first_name'), table_name='conversions')
op.drop_index(op.f('ix_conversions_guest_email'), table_name='conversions')
op.create_index(op.f('ix_conversions_sale_date'), 'conversions', ['sale_date'], unique=False)
op.drop_column('conversions', 'updated_at')
op.drop_column('conversions', 'guest_country_code')
op.drop_column('conversions', 'guest_email')
op.drop_column('conversions', 'guest_last_name')
op.drop_column('conversions', 'guest_first_name')
op.drop_index(op.f('ix_acked_requests_username'), table_name='acked_requests')
op.drop_index(op.f('ix_conversion_rooms_room_number'), table_name='conversion_rooms')
op.drop_index(op.f('ix_conversion_rooms_pms_hotel_reservation_id'), table_name='conversion_rooms')
op.drop_index(op.f('ix_conversion_rooms_departure_date'), table_name='conversion_rooms')
op.drop_index(op.f('ix_conversion_rooms_conversion_id'), table_name='conversion_rooms')
op.drop_index(op.f('ix_conversion_rooms_arrival_date'), table_name='conversion_rooms')
op.drop_table('conversion_rooms')
# ### end Alembic commands ###
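
Note the asymmetry autogenerate produced here: the ForeignKeyConstraint target is schema-qualified ('alpinebits.conversions.id', because target_metadata.schema was set in env.py), while the created tables themselves are unqualified and land wherever search_path points. With include_schemas=True, autogenerate also reflects every schema it can see; a common companion, sketched below but not part of this commit, is an include_object hook in env.py that restricts comparison to the target schema:

# Sketch of an include_object hook for env.py; "alpinebits" is assumed.
def include_object(obj, name, type_, reflected, compare_to):
    if type_ == "table" and obj.schema not in (None, "alpinebits"):
        return False
    return True

# wired up via:
#   context.configure(..., include_schemas=True, include_object=include_object)
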

View File: alpine_bits_python/db.py

@@ -27,7 +27,31 @@ from .logging_config import get_logger
_LOGGER = get_logger(__name__)
-Base = declarative_base()
+# Load schema from config at module level
+# This happens once when the module is imported
+try:
+    from .config_loader import load_config
+
+    _app_config = load_config()
+    _SCHEMA = _app_config.get("database", {}).get("schema")
+except (FileNotFoundError, KeyError, ValueError, ImportError):
+    _SCHEMA = None
+
+# If schema isn't in config, try environment variable
+if not _SCHEMA:
+    _SCHEMA = os.environ.get("DATABASE_SCHEMA")
+
+class Base:
+    """Base class that applies schema to all tables."""
+
+    # # Set schema on all tables if configured
+    # if _SCHEMA:
+    #     __table_args__ = {"schema": _SCHEMA}
+
+Base = declarative_base(cls=Base)
# Type variable for async functions
T = TypeVar("T")
@@ -60,26 +84,30 @@ def get_database_schema(config=None):
Schema name string, or None if not configured
"""
-    # Check environment variable first (takes precedence)
-    schema = os.environ.get("DATABASE_SCHEMA")
-    if schema:
-        return schema
# Fall back to config file
if config and "database" in config and "schema" in config["database"]:
return config["database"]["schema"]
-    return None
+    return os.environ.get("DATABASE_SCHEMA")
-def configure_schema(schema_name=None):
+def configure_schema(schema_name):
"""Configure the database schema for all models.
This should be called before creating tables or running migrations.
For PostgreSQL, this sets the schema for all tables.
For other databases, this is a no-op.
+    IMPORTANT: This must be called BEFORE any models are imported/defined.
+    It modifies the Base class to apply schema to all tables.
Args:
schema_name: Name of the schema to use (e.g., "alpinebits")
"""
-    if schema_name:
-        # Update the schema for all tables in Base metadata
-        for table in Base.metadata.tables.values():
-            table.schema = schema_name
+    # Set __table_args__ on the Base class to apply schema to all tables
+    Base.__table_args__ = {"schema": schema_name}
def create_database_engine(config=None, echo=False) -> AsyncEngine:
@@ -102,7 +130,7 @@ def create_database_engine(config=None, echo=False) -> AsyncEngine:
database_url = get_database_url(config)
schema_name = get_database_schema(config)
-    # Configure schema for all models if specified
+    # # Configure schema for all models if specified
if schema_name:
configure_schema(schema_name)
_LOGGER.info("Configured database schema: %s", schema_name)
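
Setting Base.__table_args__ after the fact only affects mapped classes defined later, which is why configure_schema's docstring insists it run before any models are imported. An alternative sketch, not what this commit does, that avoids the ordering constraint entirely is to bake the schema into the MetaData at declaration time; this also makes an unqualified ForeignKey("customers.id") resolve to "alpinebits.customers":

from sqlalchemy import MetaData
from sqlalchemy.orm import declarative_base

SCHEMA = "alpinebits"  # assumed; would come from config/env as above

# Every Table created against this Base inherits the schema, and
# unqualified ForeignKey targets are looked up inside it.
Base = declarative_base(metadata=MetaData(schema=SCHEMA))
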