merge_db_fixes_to_main #16
@@ -5,6 +5,10 @@ This script should be run before starting the application to ensure
|
||||
the database schema is up to date. It can be run standalone or called
|
||||
from run_api.py before starting uvicorn.
|
||||
|
||||
If the database is completely empty (no tables), it will create all tables
|
||||
from the current SQLAlchemy models and stamp the database with the latest
|
||||
migration version, avoiding the need to run historical migrations.
|
||||
|
||||
Usage:
|
||||
uv run python -m alpine_bits_python.run_migrations
|
||||
or
|
||||
@@ -12,24 +16,158 @@ Usage:
|
||||
run_migrations()
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
|
||||
from .config_loader import load_config
|
||||
from .db import Base, get_database_schema, get_database_url
|
||||
from .logging_config import get_logger
|
||||
|
||||
_LOGGER = get_logger(__name__)
|
||||
|
||||
|
||||
async def is_database_empty() -> bool:
|
||||
"""Check if the database has any tables in our schema.
|
||||
|
||||
Returns:
|
||||
True if the database has no tables in the target schema, False otherwise.
|
||||
"""
|
||||
try:
|
||||
app_config = load_config()
|
||||
db_url = get_database_url(app_config)
|
||||
schema = get_database_schema(app_config)
|
||||
|
||||
if not db_url:
|
||||
_LOGGER.error("Database URL not configured")
|
||||
return False
|
||||
|
||||
# Create async engine for checking
|
||||
engine = create_async_engine(db_url, echo=False)
|
||||
|
||||
async with engine.connect() as conn:
|
||||
# Set search path if schema is configured
|
||||
if schema:
|
||||
await conn.execute(text(f"SET search_path TO {schema}"))
|
||||
|
||||
# Check for any tables in the schema
|
||||
result = await conn.execute(
|
||||
text(
|
||||
"""
|
||||
SELECT COUNT(*)
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema = :schema
|
||||
"""
|
||||
),
|
||||
{"schema": schema or "public"},
|
||||
)
|
||||
count = result.scalar()
|
||||
await engine.dispose()
|
||||
return count == 0
|
||||
|
||||
except Exception as e:
|
||||
_LOGGER.warning(f"Could not check if database is empty: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def create_all_tables() -> None:
|
||||
"""Create all tables from SQLAlchemy models in an empty database."""
|
||||
try:
|
||||
app_config = load_config()
|
||||
db_url = get_database_url(app_config)
|
||||
schema = get_database_schema(app_config)
|
||||
|
||||
if not db_url:
|
||||
_LOGGER.error("Database URL not configured")
|
||||
sys.exit(1)
|
||||
|
||||
_LOGGER.info("Creating all database tables from SQLAlchemy models...")
|
||||
|
||||
# Create async engine
|
||||
engine = create_async_engine(db_url, echo=False)
|
||||
|
||||
async with engine.begin() as conn:
|
||||
# Set search path if schema is configured
|
||||
if schema:
|
||||
await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema}"))
|
||||
await conn.execute(text(f"SET search_path TO {schema}"))
|
||||
|
||||
# Create all tables
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
await engine.dispose()
|
||||
_LOGGER.info("All tables created successfully")
|
||||
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"Failed to create tables: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def stamp_database() -> None:
|
||||
"""Stamp the database with the latest migration version.
|
||||
|
||||
This tells Alembic that the database is at the 'head' revision without
|
||||
actually running the migration scripts.
|
||||
"""
|
||||
_LOGGER.info("Stamping database with latest migration version...")
|
||||
|
||||
project_root = Path(__file__).parent.parent.parent
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["alembic", "stamp", "head"],
|
||||
cwd=project_root,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
|
||||
_LOGGER.info("Database stamped successfully")
|
||||
_LOGGER.debug("Stamp output: %s", result.stdout)
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
_LOGGER.error("Failed to stamp database:")
|
||||
_LOGGER.error("Exit code: %d", e.returncode)
|
||||
_LOGGER.error("stdout: %s", e.stdout)
|
||||
_LOGGER.error("stderr: %s", e.stderr)
|
||||
sys.exit(1)
|
||||
except FileNotFoundError:
|
||||
_LOGGER.error(
|
||||
"Alembic not found. Please ensure it's installed: uv pip install alembic"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def run_migrations() -> None:
|
||||
"""Run Alembic migrations to upgrade database to latest schema.
|
||||
|
||||
This function runs 'alembic upgrade head' to apply all pending migrations.
|
||||
It will exit the process if migrations fail.
|
||||
If the database is empty, creates all tables from SQLAlchemy models
|
||||
and stamps the database with the latest migration version.
|
||||
|
||||
Otherwise, runs 'alembic upgrade head' to apply all pending migrations.
|
||||
|
||||
Raises:
|
||||
SystemExit: If migrations fail
|
||||
"""
|
||||
_LOGGER.info("Checking database state...")
|
||||
|
||||
# Check if database is empty
|
||||
is_empty = asyncio.run(is_database_empty())
|
||||
|
||||
if is_empty:
|
||||
_LOGGER.info(
|
||||
"Database is empty - creating all tables from models and stamping version"
|
||||
)
|
||||
asyncio.run(create_all_tables())
|
||||
stamp_database()
|
||||
_LOGGER.info("Database initialization completed successfully")
|
||||
return
|
||||
|
||||
# Database has tables, run normal migrations
|
||||
_LOGGER.info("Running database migrations...")
|
||||
|
||||
# Get the project root directory (where alembic.ini is located)
|
||||
|
||||
Reference in New Issue
Block a user