Migration script in place
This commit is contained in:
455
src/alpine_bits_python/util/migrate_sqlite_to_postgres.py
Normal file
455
src/alpine_bits_python/util/migrate_sqlite_to_postgres.py
Normal file
@@ -0,0 +1,455 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Migration script to copy data from SQLite to PostgreSQL.
|
||||
|
||||
This script:
|
||||
1. Connects to both SQLite and PostgreSQL databases
|
||||
2. Reads all data from SQLite using SQLAlchemy models
|
||||
3. Writes data to PostgreSQL using the same models
|
||||
4. Ensures data integrity and provides progress feedback
|
||||
|
||||
Prerequisites:
|
||||
- PostgreSQL database must be created and empty (or you can use --drop-tables flag)
|
||||
- asyncpg must be installed: uv pip install asyncpg
|
||||
- Configure target PostgreSQL URL in config.yaml or via DATABASE_URL env var
|
||||
|
||||
Usage:
|
||||
# Dry run (preview what will be migrated)
|
||||
uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres --dry-run
|
||||
|
||||
# Actual migration using target config file
|
||||
uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres \
|
||||
--target-config config/postgres.yaml
|
||||
|
||||
# Drop existing tables first (careful!)
|
||||
uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres \
|
||||
--target-config config/postgres.yaml --drop-tables
|
||||
|
||||
# Alternative: use DATABASE_URL environment variable
|
||||
DATABASE_URL="postgresql+asyncpg://user:pass@host/db" \
|
||||
uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres
|
||||
|
||||
# Alternative: specify URLs directly
|
||||
uv run python -m alpine_bits_python.util.migrate_sqlite_to_postgres \
|
||||
--source sqlite+aiosqlite:///old.db \
|
||||
--target postgresql+asyncpg://user:pass@localhost/dbname
|
||||
"""
|
||||
|
||||
import argparse
import asyncio
import os
import sys
from pathlib import Path

# Add parent directory to path so we can import alpine_bits_python
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from sqlalchemy import func, inspect, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from alpine_bits_python.config_loader import load_config
from alpine_bits_python.db import (
    AckedRequest,
    Base,
    Customer,
    HashedCustomer,
    Reservation,
    get_database_url,
)
from alpine_bits_python.logging_config import get_logger, setup_logging
|
||||
|
||||
_LOGGER = get_logger(__name__)
|
||||
|
||||
|
||||
def mask_db_url(url: str) -> str:
    """Return *url* with any ``user:password`` credentials replaced by ``***:***``.

    Strings without a ``://`` scheme separator, or without a credentials
    section (no ``@`` after the scheme), are returned unchanged, so this is
    safe to call on arbitrary input before logging.
    """
    scheme, sep, remainder = url.partition("://")
    if not sep:
        return url
    if "@" not in remainder:
        return url
    # Split on the FIRST "@" (matches the original split("@", 1) behavior).
    _credentials, _, host_part = remainder.partition("@")
    return f"{scheme}://***:***@{host_part}"
|
||||
|
||||
|
||||
async def get_table_counts(session: AsyncSession) -> dict[str, int]:
    """Return row counts for all migrated tables, keyed by table label.

    Uses a server-side ``SELECT COUNT(*)`` per table instead of fetching
    every row into memory and calling ``len()`` on it, which the previous
    implementation did — that pulled the entire table over the wire just
    to count it.

    Args:
        session: An open async session bound to the database to inspect.

    Returns:
        Mapping of table label -> row count, in a stable insertion order.
    """
    # Keep insertion order stable so log output is deterministic.
    models = {
        "customers": Customer,
        "hashed_customers": HashedCustomer,
        "reservations": Reservation,
        "acked_requests": AckedRequest,
    }

    counts: dict[str, int] = {}
    for label, model in models.items():
        result = await session.execute(select(func.count()).select_from(model))
        counts[label] = result.scalar_one()

    return counts
|
||||
|
||||
|
||||
async def _copy_rows(
    source_sessionmaker: async_sessionmaker,
    target_sessionmaker: async_sessionmaker,
    model: type,
    label: str,
) -> int:
    """Copy every row of *model* from the source DB to the target DB.

    All mapped columns are discovered via SQLAlchemy mapper introspection
    rather than a hand-maintained keyword list, so a column added to the
    model later cannot be silently dropped by the migration.

    Args:
        source_sessionmaker: Session factory bound to the source engine.
        target_sessionmaker: Session factory bound to the target engine.
        model: The ORM model class to copy.
        label: Human-readable plural label used in progress logging.

    Returns:
        The number of rows copied (0 if the source table is empty).
    """
    async with source_sessionmaker() as source_session:
        result = await source_session.execute(select(model))
        rows = result.scalars().all()

    if not rows:
        return 0

    # Attribute keys for every mapped column of the model.
    column_keys = [column.key for column in inspect(model).columns]

    async with target_sessionmaker() as target_session:
        for i, row in enumerate(rows, 1):
            # Re-create the row as a fresh instance bound to the target session
            # (instances loaded from the source session cannot be re-added).
            target_session.add(
                model(**{key: getattr(row, key) for key in column_keys})
            )
            if i % 100 == 0:
                _LOGGER.info("  Progress: %d/%d %s", i, len(rows), label)
        await target_session.commit()

    return len(rows)


async def migrate_data(
    source_url: str,
    target_url: str,
    dry_run: bool = False,
    drop_tables: bool = False,
) -> None:
    """Migrate data from source database to target database.

    Args:
        source_url: Source database URL (SQLite)
        target_url: Target database URL (PostgreSQL)
        dry_run: If True, only preview what would be migrated
        drop_tables: If True, drop existing tables in target before creating

    Raises:
        Exception: Re-raises any error encountered during migration after
            logging it; engines are always disposed.
    """
    _LOGGER.info("=" * 70)
    _LOGGER.info("SQLite to PostgreSQL Migration")
    _LOGGER.info("=" * 70)
    _LOGGER.info("Source: %s", mask_db_url(source_url))
    _LOGGER.info("Target: %s", mask_db_url(target_url))
    _LOGGER.info("Mode: %s", "DRY RUN" if dry_run else "LIVE MIGRATION")
    _LOGGER.info("=" * 70)

    # Create engines and session factories for both sides.
    _LOGGER.info("Creating database connections...")
    source_engine = create_async_engine(source_url, echo=False)
    target_engine = create_async_engine(target_url, echo=False)

    SourceSession = async_sessionmaker(source_engine, expire_on_commit=False)
    TargetSession = async_sessionmaker(target_engine, expire_on_commit=False)

    try:
        # Check source database.
        _LOGGER.info("\nChecking source database...")
        async with SourceSession() as source_session:
            source_counts = await get_table_counts(source_session)

        _LOGGER.info("Source database contains:")
        for table, count in source_counts.items():
            _LOGGER.info("  - %s: %d rows", table, count)

        total_rows = sum(source_counts.values())
        if total_rows == 0:
            _LOGGER.warning("Source database is empty. Nothing to migrate.")
            return

        if dry_run:
            _LOGGER.info("\n" + "=" * 70)
            _LOGGER.info("DRY RUN: Would migrate %d total rows", total_rows)
            _LOGGER.info("=" * 70)
            return

        # Prepare target database.
        _LOGGER.info("\nPreparing target database...")

        if drop_tables:
            _LOGGER.warning("Dropping existing tables in target database...")
            async with target_engine.begin() as conn:
                await conn.run_sync(Base.metadata.drop_all)
            _LOGGER.info("Tables dropped.")

        _LOGGER.info("Creating tables in target database...")
        async with target_engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        _LOGGER.info("Tables created.")

        # Warn (and ask for confirmation) if the target already has data.
        _LOGGER.info("\nChecking target database...")
        async with TargetSession() as target_session:
            target_counts = await get_table_counts(target_session)

        if sum(target_counts.values()) > 0:
            _LOGGER.warning("Target database is not empty:")
            for table, count in target_counts.items():
                if count > 0:
                    _LOGGER.warning("  - %s: %d rows", table, count)

            # NOTE: input() blocks the event loop, which is acceptable in
            # this single-task CLI script.
            response = input("\nContinue anyway? This may cause conflicts. (yes/no): ")
            if response.lower() != "yes":
                _LOGGER.info("Migration cancelled.")
                return

        # Migrate data table by table.
        _LOGGER.info("\n" + "=" * 70)
        _LOGGER.info("Starting data migration...")
        _LOGGER.info("=" * 70)

        # Dependency order: Customers first so rows referencing them
        # (HashedCustomers, Reservations) can resolve their foreign keys.
        migration_plan = [
            (Customer, "customers"),
            (HashedCustomer, "hashed customers"),
            (Reservation, "reservations"),
            (AckedRequest, "acked requests"),
        ]
        for step, (model, label) in enumerate(migration_plan, 1):
            _LOGGER.info("\n[%d/%d] Migrating %s...", step, len(migration_plan), label)
            migrated = await _copy_rows(SourceSession, TargetSession, model, label)
            _LOGGER.info("✓ Migrated %d %s", migrated, label)

        # Verify migration by comparing per-table counts.
        _LOGGER.info("\n" + "=" * 70)
        _LOGGER.info("Verifying migration...")
        _LOGGER.info("=" * 70)

        async with TargetSession() as target_session:
            final_counts = await get_table_counts(target_session)

        _LOGGER.info("Target database now contains:")
        all_match = True
        for table, count in final_counts.items():
            source_count = source_counts[table]
            match = "✓" if count == source_count else "✗"
            _LOGGER.info("  %s %s: %d rows (source: %d)", match, table, count, source_count)
            if count != source_count:
                all_match = False

        if all_match:
            _LOGGER.info("\n" + "=" * 70)
            _LOGGER.info("✓ Migration completed successfully!")
            _LOGGER.info("=" * 70)
            _LOGGER.info("\nNext steps:")
            _LOGGER.info("1. Test your application with PostgreSQL")
            _LOGGER.info("2. Update config.yaml or DATABASE_URL to use PostgreSQL")
            _LOGGER.info("3. Keep SQLite backup until you're confident everything works")
        else:
            _LOGGER.error("\n" + "=" * 70)
            _LOGGER.error("✗ Migration completed with mismatches!")
            _LOGGER.error("=" * 70)
            _LOGGER.error("Please review the counts above and investigate.")

    except Exception as e:
        _LOGGER.exception("Migration failed: %s", e)
        raise

    finally:
        # Always release connection pools, even on failure.
        await source_engine.dispose()
        await target_engine.dispose()
|
||||
|
||||
|
||||
async def main() -> None:
    """Parse CLI arguments, resolve the database URLs, and run the migration.

    Exits with status 1 when the source is not SQLite, the target cannot be
    resolved, or the target is not PostgreSQL.
    """
    parser = argparse.ArgumentParser(
        description="Migrate data from SQLite to PostgreSQL",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--source",
        help="Source database URL (default: from config or sqlite+aiosqlite:///alpinebits.db)",
    )
    parser.add_argument(
        "--target",
        help=(
            "Target database URL "
            "(default: from DATABASE_URL env var or --target-config)"
        ),
    )
    parser.add_argument(
        "--target-config",
        help=(
            "Path to config file containing target PostgreSQL database URL "
            "(keeps password out of bash history)"
        ),
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview migration without making changes",
    )
    parser.add_argument(
        "--drop-tables",
        action="store_true",
        help="Drop existing tables in target database before migration",
    )

    args = parser.parse_args()

    try:
        # Config is optional for this script; fall back to defaults on failure.
        config = load_config()
        setup_logging(config)
    except Exception as e:
        _LOGGER.warning("Failed to load config: %s. Using defaults.", e)
        config = {}

    # Source URL: CLI flag wins; otherwise the application config. When taken
    # from config it must be SQLite, since that is what we migrate FROM.
    if args.source:
        source_url = args.source
    else:
        source_url = get_database_url(config)
        if "sqlite" not in source_url:
            _LOGGER.error("Source database must be SQLite. Use --source to specify.")
            sys.exit(1)

    # Target URL precedence: --target > --target-config file > DATABASE_URL.
    if args.target:
        target_url = args.target
    elif args.target_config:
        _LOGGER.info("Loading target database config from: %s", args.target_config)
        try:
            target_config = load_config(args.target_config)
            target_url = get_database_url(target_config)
            _LOGGER.info("Successfully loaded target config")
        except (FileNotFoundError, ValueError, KeyError):
            _LOGGER.exception("Failed to load target config")
            sys.exit(1)
    else:
        target_url = os.environ.get("DATABASE_URL")
        if not target_url:
            _LOGGER.error("Target database URL not specified.")
            _LOGGER.error("Specify target database using one of:")
            _LOGGER.error("  - --target-config config/postgres.yaml")
            _LOGGER.error("  - DATABASE_URL environment variable")
            _LOGGER.error("  - --target postgresql+asyncpg://user:pass@host/db")
            sys.exit(1)

    # A single substring check suffices: "postgresql" always contains
    # "postgres", so the previous two-clause test was redundant.
    if "postgres" not in target_url:
        _LOGGER.error("Target database must be PostgreSQL.")
        sys.exit(1)

    # Run migration.
    await migrate_data(
        source_url=source_url,
        target_url=target_url,
        dry_run=args.dry_run,
        drop_tables=args.drop_tables,
    )
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user