Files
alpinebits_python/alembic/env.py
2025-11-18 14:38:21 +01:00

126 lines
4.0 KiB
Python

"""Alembic environment configuration for async SQLAlchemy."""
import asyncio
from logging.config import fileConfig
from alembic import context
from sqlalchemy import pool, text
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
# Import your models' Base to enable autogenerate
from alpine_bits_python.config_loader import load_config
from alpine_bits_python.db import Base, get_database_schema, get_database_url
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Load application config to get database URL and schema
try:
app_config = load_config()
except (FileNotFoundError, KeyError, ValueError):
# Fallback if config can't be loaded (e.g., during initial setup)
app_config = {}
# Get database URL from application config
db_url = get_database_url(app_config)
if db_url:
config.set_main_option("sqlalchemy.url", db_url)
# Get schema name from application config
SCHEMA = get_database_schema(app_config)
# add your model's MetaData object here for 'autogenerate' support
target_metadata = Base.metadata
# Configure metadata to resolve unqualified table names in the schema
# This is needed so ForeignKey("customers.id") can find "alpinebits.customers"
if SCHEMA:
target_metadata.schema = SCHEMA
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
# Set search path for offline mode if schema is configured
if SCHEMA:
print(f"Setting search_path to {SCHEMA}, public")
context.execute(f"SET search_path TO {SCHEMA}, public")
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
"""Run migrations with the given connection."""
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
# Create schema if it doesn't exist
if SCHEMA:
connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}"))
# Set search path to our schema
print(f"setting search path to schema {SCHEMA}, ")
connection.execute(text(f"SET search_path TO {SCHEMA}"))
context.run_migrations()
async def run_async_migrations() -> None:
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
if connection.dialect.name == "postgresql":
# set search path on the connection, which ensures that
# PostgreSQL will emit all CREATE / ALTER / DROP statements
# in terms of this schema by default
await connection.execute(text(f"SET search_path TO {SCHEMA}"))
# in SQLAlchemy v2+ the search path change needs to be committed
await connection.commit()
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode - entry point."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()