From e479381374bfd8bbd50e420e2e013939065fac29 Mon Sep 17 00:00:00 2001
From: Jonas Linter
Date: Thu, 16 Oct 2025 16:56:27 +0200
Subject: [PATCH] Migration script in place

---
 .gitignore                                    |   3 +
 config/postgres.yaml.example                  |  14 +
 .../util/migrate_sqlite_to_postgres.py        | 455 ++++++++++++++++++
 3 files changed, 472 insertions(+)
 create mode 100644 config/postgres.yaml.example
 create mode 100644 src/alpine_bits_python/util/migrate_sqlite_to_postgres.py

diff --git a/.gitignore b/.gitignore
index 7fc53e2..e05a15e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,6 +25,9 @@ logs/*
 # ignore secrets
 secrets.yaml
 
+# ignore PostgreSQL config (contains credentials)
+config/postgres.yaml
+
 # ignore db
 alpinebits.db
 
diff --git a/config/postgres.yaml.example b/config/postgres.yaml.example
new file mode 100644
index 0000000..d746353
--- /dev/null
+++ b/config/postgres.yaml.example
@@ -0,0 +1,14 @@
+# PostgreSQL configuration for migration
+# Copy this file to postgres.yaml and fill in your PostgreSQL credentials
+# This file should NOT be committed to git (add postgres.yaml to .gitignore)
+
+database:
+  url: "postgresql+asyncpg://username:password@hostname:5432/database_name"
+  # Example: "postgresql+asyncpg://alpinebits_user:your_password@localhost:5432/alpinebits"
+
+# If using annotatedyaml secrets:
+# database:
+#   url: !secret POSTGRES_URL
+#
+# Then in secrets.yaml:
+# POSTGRES_URL: "postgresql+asyncpg://username:password@hostname:5432/database_name"
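
Checking the credentials in postgres.yaml before running the full migration can
save a failed half-run; a minimal connectivity probe (a sketch, assuming asyncpg
is installed and the URL follows the example above):

    import asyncio
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import create_async_engine

    async def check(url: str) -> None:
        engine = create_async_engine(url)
        async with engine.connect() as conn:
            # SELECT 1 forces a real round trip, so bad credentials fail here
            await conn.execute(text("SELECT 1"))
        await engine.dispose()
        print("connection OK")

    asyncio.run(check("postgresql+asyncpg://username:password@hostname:5432/database_name"))
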
diff --git a/src/alpine_bits_python/util/migrate_sqlite_to_postgres.py b/src/alpine_bits_python/util/migrate_sqlite_to_postgres.py
new file mode 100644
index 0000000..39e963b
--- /dev/null
+++ b/src/alpine_bits_python/util/migrate_sqlite_to_postgres.py
@@ -0,0 +1,455 @@
+#!/usr/bin/env python3
+"""Migration script to copy data from SQLite to PostgreSQL.
+
+This script:
+1. Connects to both SQLite and PostgreSQL databases
+2. Reads all data from SQLite using SQLAlchemy models
+3. Writes data to PostgreSQL using the same models
+4. Ensures data integrity and provides progress feedback
+
+Prerequisites:
+- PostgreSQL database must be created and empty (or use the --drop-tables flag)
+- asyncpg must be installed: uv pip install asyncpg
+- Configure the target PostgreSQL URL in config.yaml or via the DATABASE_URL env var
+
+Usage:
+    # Dry run (preview what will be migrated)
+    uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres --dry-run
+
+    # Actual migration using a target config file
+    uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres \
+        --target-config config/postgres.yaml
+
+    # Drop existing tables first (careful!)
+    uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres \
+        --target-config config/postgres.yaml --drop-tables
+
+    # Alternative: use the DATABASE_URL environment variable
+    DATABASE_URL="postgresql+asyncpg://user:pass@host/db" \
+        uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres
+
+    # Alternative: specify URLs directly
+    uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres \
+        --source sqlite+aiosqlite:///old.db \
+        --target postgresql+asyncpg://user:pass@localhost/dbname
+"""
+
+import argparse
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+# Add the src/ directory (three levels up) to sys.path so alpine_bits_python
+# is importable when this script is run directly.
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
+
+from alpine_bits_python.config_loader import load_config
+from alpine_bits_python.db import (
+    AckedRequest,
+    Base,
+    Customer,
+    HashedCustomer,
+    Reservation,
+    get_database_url,
+)
+from alpine_bits_python.logging_config import get_logger, setup_logging
+
+_LOGGER = get_logger(__name__)
+
+
+def mask_db_url(url: str) -> str:
+    """Mask sensitive parts of a database URL for logging."""
+    if "://" not in url:
+        return url
+    protocol, rest = url.split("://", 1)
+    if "@" in rest:
+        credentials, location = rest.split("@", 1)
+        return f"{protocol}://***:***@{location}"
+    return url
+
+
+async def get_table_counts(session: AsyncSession) -> dict[str, int]:
+    """Get row counts for all tables."""
+    counts = {}
+
+    # Count customers
+    result = await session.execute(select(Customer))
+    counts["customers"] = len(result.scalars().all())
+
+    # Count hashed_customers
+    result = await session.execute(select(HashedCustomer))
+    counts["hashed_customers"] = len(result.scalars().all())
+
+    # Count reservations
+    result = await session.execute(select(Reservation))
+    counts["reservations"] = len(result.scalars().all())
+
+    # Count acked_requests
+    result = await session.execute(select(AckedRequest))
+    counts["acked_requests"] = len(result.scalars().all())
+
+    return counts
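+
+
+# Note: get_table_counts() above loads every row just to count it. That is
+# fine at migration scale, but a count-only query avoids pulling rows into
+# memory. A possible variant (func.count is standard SQLAlchemy; the models
+# are the same ones imported above):
+#
+#     from sqlalchemy import func
+#
+#     async def get_table_count(session: AsyncSession, model) -> int:
+#         result = await session.execute(select(func.count()).select_from(model))
+#         return result.scalar_one()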
+
+
+async def migrate_data(
+    source_url: str,
+    target_url: str,
+    dry_run: bool = False,
+    drop_tables: bool = False,
+) -> None:
+    """Migrate data from source database to target database.
+
+    Args:
+        source_url: Source database URL (SQLite)
+        target_url: Target database URL (PostgreSQL)
+        dry_run: If True, only preview what would be migrated
+        drop_tables: If True, drop existing tables in target before creating
+    """
+    _LOGGER.info("=" * 70)
+    _LOGGER.info("SQLite to PostgreSQL Migration")
+    _LOGGER.info("=" * 70)
+    _LOGGER.info("Source: %s", mask_db_url(source_url))
+    _LOGGER.info("Target: %s", mask_db_url(target_url))
+    _LOGGER.info("Mode: %s", "DRY RUN" if dry_run else "LIVE MIGRATION")
+    _LOGGER.info("=" * 70)
+
+    # Create engines
+    _LOGGER.info("Creating database connections...")
+    source_engine = create_async_engine(source_url, echo=False)
+    target_engine = create_async_engine(target_url, echo=False)
+
+    # Create session makers
+    SourceSession = async_sessionmaker(source_engine, expire_on_commit=False)
+    TargetSession = async_sessionmaker(target_engine, expire_on_commit=False)
+
+    try:
+        # Check source database
+        _LOGGER.info("\nChecking source database...")
+        async with SourceSession() as source_session:
+            source_counts = await get_table_counts(source_session)
+
+        _LOGGER.info("Source database contains:")
+        for table, count in source_counts.items():
+            _LOGGER.info("  - %s: %d rows", table, count)
+
+        total_rows = sum(source_counts.values())
+        if total_rows == 0:
+            _LOGGER.warning("Source database is empty. Nothing to migrate.")
+            return
+
+        if dry_run:
+            _LOGGER.info("\n" + "=" * 70)
+            _LOGGER.info("DRY RUN: Would migrate %d total rows", total_rows)
+            _LOGGER.info("=" * 70)
+            return
+
+        # Prepare target database
+        _LOGGER.info("\nPreparing target database...")
+
+        if drop_tables:
+            _LOGGER.warning("Dropping existing tables in target database...")
+            async with target_engine.begin() as conn:
+                await conn.run_sync(Base.metadata.drop_all)
+            _LOGGER.info("Tables dropped.")
+
+        _LOGGER.info("Creating tables in target database...")
+        async with target_engine.begin() as conn:
+            await conn.run_sync(Base.metadata.create_all)
+        _LOGGER.info("Tables created.")
+
+        # Check if target already has data
+        _LOGGER.info("\nChecking target database...")
+        async with TargetSession() as target_session:
+            target_counts = await get_table_counts(target_session)
+
+        if sum(target_counts.values()) > 0:
+            _LOGGER.warning("Target database is not empty:")
+            for table, count in target_counts.items():
+                if count > 0:
+                    _LOGGER.warning("  - %s: %d rows", table, count)
+
+            response = input("\nContinue anyway? This may cause conflicts. (yes/no): ")
+            if response.lower() != "yes":
+                _LOGGER.info("Migration cancelled.")
+                return
+
+        # Migrate data table by table
+        _LOGGER.info("\n" + "=" * 70)
+        _LOGGER.info("Starting data migration...")
+        _LOGGER.info("=" * 70)
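+
+        # Note: each step below commits once per table. If a later step fails,
+        # earlier tables stay populated, so a rerun should use --drop-tables or
+        # an empty target. Holding one session open across all four steps and
+        # committing once at the very end would make the copy all-or-nothing:
+        #
+        #     async with TargetSession() as target_session:
+        #         ...add rows for all four tables...
+        #         await target_session.commit()  # single commit at the end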
+
+        # 1. Migrate Customers first (no dependencies)
+        _LOGGER.info("\n[1/4] Migrating Customers...")
+        async with SourceSession() as source_session:
+            result = await source_session.execute(select(Customer))
+            customers = result.scalars().all()
+
+        if customers:
+            async with TargetSession() as target_session:
+                for i, customer in enumerate(customers, 1):
+                    # Create new instance with same data
+                    new_customer = Customer(
+                        id=customer.id,
+                        given_name=customer.given_name,
+                        contact_id=customer.contact_id,
+                        surname=customer.surname,
+                        name_prefix=customer.name_prefix,
+                        email_address=customer.email_address,
+                        phone=customer.phone,
+                        email_newsletter=customer.email_newsletter,
+                        address_line=customer.address_line,
+                        city_name=customer.city_name,
+                        postal_code=customer.postal_code,
+                        country_code=customer.country_code,
+                        gender=customer.gender,
+                        birth_date=customer.birth_date,
+                        language=customer.language,
+                        address_catalog=customer.address_catalog,
+                        name_title=customer.name_title,
+                    )
+                    target_session.add(new_customer)
+
+                    if i % 100 == 0:
+                        _LOGGER.info("  Progress: %d/%d customers", i, len(customers))
+
+                await target_session.commit()
+
+        _LOGGER.info("✓ Migrated %d customers", len(customers))
+
+        # 2. Migrate HashedCustomers (depends on Customers)
+        _LOGGER.info("\n[2/4] Migrating HashedCustomers...")
+        async with SourceSession() as source_session:
+            result = await source_session.execute(select(HashedCustomer))
+            hashed_customers = result.scalars().all()
+
+        if hashed_customers:
+            async with TargetSession() as target_session:
+                for i, hashed in enumerate(hashed_customers, 1):
+                    new_hashed = HashedCustomer(
+                        id=hashed.id,
+                        customer_id=hashed.customer_id,
+                        contact_id=hashed.contact_id,
+                        hashed_email=hashed.hashed_email,
+                        hashed_phone=hashed.hashed_phone,
+                        hashed_given_name=hashed.hashed_given_name,
+                        hashed_surname=hashed.hashed_surname,
+                        hashed_city=hashed.hashed_city,
+                        hashed_postal_code=hashed.hashed_postal_code,
+                        hashed_country_code=hashed.hashed_country_code,
+                        hashed_gender=hashed.hashed_gender,
+                        hashed_birth_date=hashed.hashed_birth_date,
+                        created_at=hashed.created_at,
+                    )
+                    target_session.add(new_hashed)
+
+                    if i % 100 == 0:
+                        _LOGGER.info(
+                            "  Progress: %d/%d hashed customers", i, len(hashed_customers)
+                        )
+
+                await target_session.commit()
+
+        _LOGGER.info("✓ Migrated %d hashed customers", len(hashed_customers))
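+
+        # Note: the per-column copies in steps 1-4 must be kept in sync with
+        # the models by hand. A generic alternative copies rows through the
+        # table metadata instead (a sketch; sorted_tables yields tables in
+        # FK-dependency order, matching the manual ordering used here):
+        #
+        #     from sqlalchemy import insert
+        #
+        #     async with source_engine.connect() as src, target_engine.begin() as dst:
+        #         for table in Base.metadata.sorted_tables:
+        #             rows = (await src.execute(table.select())).mappings().all()
+        #             if rows:
+        #                 await dst.execute(insert(table), [dict(r) for r in rows])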
+
+        # 3. Migrate Reservations (depends on Customers)
+        _LOGGER.info("\n[3/4] Migrating Reservations...")
+        async with SourceSession() as source_session:
+            result = await source_session.execute(select(Reservation))
+            reservations = result.scalars().all()
+
+        if reservations:
+            async with TargetSession() as target_session:
+                for i, reservation in enumerate(reservations, 1):
+                    new_reservation = Reservation(
+                        id=reservation.id,
+                        customer_id=reservation.customer_id,
+                        unique_id=reservation.unique_id,
+                        md5_unique_id=reservation.md5_unique_id,
+                        start_date=reservation.start_date,
+                        end_date=reservation.end_date,
+                        num_adults=reservation.num_adults,
+                        num_children=reservation.num_children,
+                        children_ages=reservation.children_ages,
+                        offer=reservation.offer,
+                        created_at=reservation.created_at,
+                        utm_source=reservation.utm_source,
+                        utm_medium=reservation.utm_medium,
+                        utm_campaign=reservation.utm_campaign,
+                        utm_term=reservation.utm_term,
+                        utm_content=reservation.utm_content,
+                        user_comment=reservation.user_comment,
+                        fbclid=reservation.fbclid,
+                        gclid=reservation.gclid,
+                        hotel_code=reservation.hotel_code,
+                        hotel_name=reservation.hotel_name,
+                        room_type_code=reservation.room_type_code,
+                        room_classification_code=reservation.room_classification_code,
+                        room_type=reservation.room_type,
+                    )
+                    target_session.add(new_reservation)
+
+                    if i % 100 == 0:
+                        _LOGGER.info("  Progress: %d/%d reservations", i, len(reservations))
+
+                await target_session.commit()
+
+        _LOGGER.info("✓ Migrated %d reservations", len(reservations))
+
+        # 4. Migrate AckedRequests (no dependencies)
+        _LOGGER.info("\n[4/4] Migrating AckedRequests...")
+        async with SourceSession() as source_session:
+            result = await source_session.execute(select(AckedRequest))
+            acked_requests = result.scalars().all()
+
+        if acked_requests:
+            async with TargetSession() as target_session:
+                for i, acked in enumerate(acked_requests, 1):
+                    new_acked = AckedRequest(
+                        id=acked.id,
+                        client_id=acked.client_id,
+                        unique_id=acked.unique_id,
+                        timestamp=acked.timestamp,
+                    )
+                    target_session.add(new_acked)
+
+                    if i % 100 == 0:
+                        _LOGGER.info("  Progress: %d/%d acked requests", i, len(acked_requests))
+
+                await target_session.commit()
+
+        _LOGGER.info("✓ Migrated %d acked requests", len(acked_requests))
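+
+        # Note: all rows were inserted with explicit primary keys, so any
+        # serial/identity sequences in PostgreSQL were never advanced and the
+        # application's next INSERT would collide on id 1. After the copy the
+        # sequences need bumping; a sketch (setval and pg_get_serial_sequence
+        # are standard PostgreSQL; the table names are assumed from the counts
+        # above):
+        #
+        #     from sqlalchemy import text
+        #
+        #     async with target_engine.begin() as conn:
+        #         for name in ("customers", "hashed_customers",
+        #                      "reservations", "acked_requests"):
+        #             await conn.execute(text(
+        #                 f"SELECT setval(pg_get_serial_sequence('{name}', 'id'), "
+        #                 f"COALESCE((SELECT MAX(id) FROM {name}), 1))"
+        #             ))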
+
+        # Verify migration
+        _LOGGER.info("\n" + "=" * 70)
+        _LOGGER.info("Verifying migration...")
+        _LOGGER.info("=" * 70)
+
+        async with TargetSession() as target_session:
+            final_counts = await get_table_counts(target_session)
+
+        _LOGGER.info("Target database now contains:")
+        all_match = True
+        for table, count in final_counts.items():
+            source_count = source_counts[table]
+            match = "✓" if count == source_count else "✗"
+            _LOGGER.info("  %s %s: %d rows (source: %d)", match, table, count, source_count)
+            if count != source_count:
+                all_match = False
+
+        if all_match:
+            _LOGGER.info("\n" + "=" * 70)
+            _LOGGER.info("✓ Migration completed successfully!")
+            _LOGGER.info("=" * 70)
+            _LOGGER.info("\nNext steps:")
+            _LOGGER.info("1. Test your application with PostgreSQL")
+            _LOGGER.info("2. Update config.yaml or DATABASE_URL to use PostgreSQL")
+            _LOGGER.info("3. Keep SQLite backup until you're confident everything works")
+        else:
+            _LOGGER.error("\n" + "=" * 70)
+            _LOGGER.error("✗ Migration completed with mismatches!")
+            _LOGGER.error("=" * 70)
+            _LOGGER.error("Please review the counts above and investigate.")
+
+    except Exception:
+        # _LOGGER.exception already records the message and traceback
+        _LOGGER.exception("Migration failed")
+        raise
+
+    finally:
+        await source_engine.dispose()
+        await target_engine.dispose()
+
+
+async def main():
+    """Run the migration."""
+    parser = argparse.ArgumentParser(
+        description="Migrate data from SQLite to PostgreSQL",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=__doc__,
+    )
+    parser.add_argument(
+        "--source",
+        help="Source database URL (default: from config or sqlite+aiosqlite:///alpinebits.db)",
+    )
+    parser.add_argument(
+        "--target",
+        help=(
+            "Target database URL "
+            "(default: from DATABASE_URL env var or --target-config)"
+        ),
+    )
+    parser.add_argument(
+        "--target-config",
+        help=(
+            "Path to config file containing the target PostgreSQL database URL "
+            "(keeps the password out of bash history)"
+        ),
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Preview migration without making changes",
+    )
+    parser.add_argument(
+        "--drop-tables",
+        action="store_true",
+        help="Drop existing tables in target database before migration",
+    )
+
+    args = parser.parse_args()
+
+    try:
+        # Load config
+        config = load_config()
+        setup_logging(config)
+    except Exception as e:
+        _LOGGER.warning("Failed to load config: %s. Using defaults.", e)
+        config = {}
+
+    # Determine source URL (default to SQLite)
+    if args.source:
+        source_url = args.source
+    else:
+        source_url = get_database_url(config)
+        if "sqlite" not in source_url:
+            _LOGGER.error("Source database must be SQLite. Use --source to specify.")
+            sys.exit(1)
+
+    # Determine target URL (must be PostgreSQL)
+    if args.target:
+        target_url = args.target
+    elif args.target_config:
+        # Load target config file
+        _LOGGER.info("Loading target database config from: %s", args.target_config)
+        try:
+            target_config = load_config(args.target_config)
+            target_url = get_database_url(target_config)
+            _LOGGER.info("Successfully loaded target config")
+        except (FileNotFoundError, ValueError, KeyError):
+            _LOGGER.exception("Failed to load target config")
+            sys.exit(1)
+    else:
+        target_url = os.environ.get("DATABASE_URL")
+        if not target_url:
+            _LOGGER.error("Target database URL not specified.")
+            _LOGGER.error("Specify the target database using one of:")
+            _LOGGER.error("  - --target-config config/postgres.yaml")
+            _LOGGER.error("  - DATABASE_URL environment variable")
+            _LOGGER.error("  - --target postgresql+asyncpg://user:pass@host/db")
+            sys.exit(1)
+
+    # "postgres" also matches "postgresql+asyncpg", so one substring check suffices
+    if "postgres" not in target_url:
+        _LOGGER.error("Target database must be PostgreSQL.")
+        sys.exit(1)
+
+    # Run migration
+    await migrate_data(
+        source_url=source_url,
+        target_url=target_url,
+        dry_run=args.dry_run,
+        drop_tables=args.drop_tables,
+    )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
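
Row counts confirm completeness but not content. A cheap follow-up after the
migration is to spot-compare a few rows from both databases; a sketch reusing
the script's models and session makers (spot_check and the compared columns
are illustrative, not part of the script):

    from sqlalchemy import select

    async def spot_check(SourceSession, TargetSession, sample: int = 5) -> None:
        """Compare a handful of Customer rows between source and target."""
        async with SourceSession() as src, TargetSession() as dst:
            result = await src.execute(select(Customer).limit(sample))
            for customer in result.scalars():
                copied = await dst.get(Customer, customer.id)
                assert copied is not None, f"customer {customer.id} missing in target"
                assert copied.email_address == customer.email_address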