13 Commits

Author SHA1 Message Date
Jonas Linter
c1123c4ce8 Deleted log because its big 2025-11-18 17:06:57 +01:00
Jonas Linter
51e4fe4617 Fixed all tests. Tests now use alembic migrations 2025-11-18 16:47:09 +01:00
Jonas Linter
a34fc6e28a Changed how services work and updated csv_import 2025-11-18 16:40:09 +01:00
Jonas Linter
2c61d13d7a Converted csv_import to put request 2025-11-18 16:23:58 +01:00
Jonas Linter
3f149fe984 Presumably production ready xD 2025-11-18 16:10:57 +01:00
Jonas Linter
b826277b54 Looking good 2025-11-18 14:49:44 +01:00
Jonas Linter
34bff6a12a Fixed missing await statement in alembic setup 2025-11-18 14:38:21 +01:00
Jonas Linter
ab09fb02eb Conversion import returns faster and processes in the background 2025-11-18 14:37:04 +01:00
Jonas Linter
b454b410b8 Finally a migration that works 2025-11-18 14:25:46 +01:00
Jonas Linter
997ab81bee Merge branch 'main' of https://gitea.linter-home.com/jonas/alpinebits_python 2025-11-18 14:03:49 +01:00
Jonas Linter
e7757c8c51 Getting closer 2025-11-18 13:32:29 +01:00
Jonas Linter
5a660507d2 Alembic experiments 2025-11-18 11:04:38 +01:00
Jonas Linter
10dcbae5ad Renamed table 2025-11-18 10:09:06 +01:00
26 changed files with 2185 additions and 1453994 deletions

View File

@@ -33,6 +33,10 @@ COPY --from=builder /app/.venv /app/.venv
# Copy application code
COPY src/ ./src/
# Copy Alembic files for database migrations
COPY alembic.ini ./
COPY alembic/ ./alembic/
# Create directories and set permissions
RUN mkdir -p /app/logs && \
chown -R appuser:appuser /app
@@ -53,9 +57,8 @@ EXPOSE 8000
HEALTHCHECK --interval=120s --timeout=10s --start-period=60s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/api/health', timeout=5)"
# Run the application with uvicorn
WORKDIR /app/src
CMD uvicorn alpine_bits_python.api:app \
# Run the application with run_api.py (includes migrations)
CMD python -m alpine_bits_python.run_api \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \

MIGRATION_REFACTORING.md Normal file

@@ -0,0 +1,174 @@
# Database Migration Refactoring
## Summary
This refactoring replaces the manual schema migrations in `migrations.py` with Alembic-managed database migrations. The key improvements are:
1. **Alembic Integration**: All schema migrations now use Alembic's migration framework
2. **Separation of Concerns**: Migrations (schema changes) are separated from startup tasks (data backfills)
3. **Pre-startup Migrations**: Database migrations run BEFORE the application starts, avoiding issues with multiple workers
4. **Production Ready**: The Conversions/ConversionRoom tables can be safely recreated (data is recoverable from PMS XML imports)
## Changes Made
### 1. Alembic Setup
- **[alembic.ini](alembic.ini)**: Configuration file for Alembic
- **[alembic/env.py](alembic/env.py)**: Async-compatible environment setup that:
- Loads database URL from config.yaml or environment variables
- Supports PostgreSQL schemas
- Uses async SQLAlchemy engine
### 2. Initial Migrations
Two migrations were created:
#### Migration 1: `535b70e85b64_initial_schema.py`
Creates all base tables:
- `customers`
- `hashed_customers`
- `reservations`
- `acked_requests`
- `conversions`
- `conversion_rooms`
This migration is idempotent - it only creates missing tables.
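The initial migration file is not reproduced in this document; as an illustration only, an idempotency check of this kind is typically written with the SQLAlchemy inspector (the actual migration may differ in structure and column definitions):
```python
# Hypothetical sketch of the idempotency check; columns are placeholders.
import sqlalchemy as sa
from alembic import op


def upgrade() -> None:
    bind = op.get_bind()
    existing = set(sa.inspect(bind).get_table_names())

    if "customers" not in existing:
        op.create_table(
            "customers",
            sa.Column("id", sa.Integer(), primary_key=True),
            # ... remaining columns as defined in db.py ...
        )
    # The same check is repeated for hashed_customers, reservations,
    # acked_requests, conversions and conversion_rooms.
```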
#### Migration 2: `8edfc81558db_drop_and_recreate_conversions_tables.py`
Converts the old production conversions schema to the new normalized schema:
- Detects if old conversions tables exist with incompatible schema
- Drops them if needed (data can be recreated from PMS XML imports)
- Allows the initial schema migration to recreate them with correct structure
### 3. Refactored Files
#### [src/alpine_bits_python/db_setup.py](src/alpine_bits_python/db_setup.py)
- **Before**: Ran manual migrations AND created tables using Base.metadata.create_all
- **After**: Only runs startup tasks (data backfills like customer hashing)
- **Note**: Schema migrations now handled by Alembic
#### [src/alpine_bits_python/run_migrations.py](src/alpine_bits_python/run_migrations.py) (NEW)
- Wrapper script to run `alembic upgrade head`
- Can be called standalone or from run_api.py
- Handles errors gracefully
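A minimal sketch of such a wrapper, assuming Alembic's programmatic `command.upgrade()` API and an `alembic.ini` at the repository root (the real `run_migrations.py` may differ):
```python
# Sketch only: run "alembic upgrade head" from Python and report the outcome.
import sys
from pathlib import Path

from alembic import command
from alembic.config import Config


def run_migrations() -> bool:
    """Apply all pending migrations; return True on success."""
    # Assumes src/alpine_bits_python/run_migrations.py with alembic.ini two levels up.
    cfg = Config(str(Path(__file__).resolve().parents[2] / "alembic.ini"))
    try:
        command.upgrade(cfg, "head")
    except Exception as exc:  # surface any migration failure instead of crashing
        print(f"Migration failed: {exc}", file=sys.stderr)
        return False
    return True


if __name__ == "__main__":
    sys.exit(0 if run_migrations() else 1)
```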
#### [src/alpine_bits_python/api.py](src/alpine_bits_python/api.py)
- **Removed**: `run_all_migrations()` call from lifespan
- **Removed**: `Base.metadata.create_all()` call
- **Changed**: Now only calls `run_startup_tasks()` for data backfills
- **Note**: Assumes migrations have already been run before app start
#### [src/alpine_bits_python/run_api.py](src/alpine_bits_python/run_api.py)
- **Added**: Calls `run_migrations()` BEFORE starting uvicorn
- **Benefit**: Migrations complete before any worker starts
- **Benefit**: Works correctly with multiple workers
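A sketch of this ordering, assuming the hypothetical `run_migrations()` helper above (argument parsing omitted; the real `run_api.py` reads host, port and workers from the command line):
```python
# Sketch only: migrations run once in the parent process, then uvicorn starts.
import sys

import uvicorn

from alpine_bits_python.run_migrations import run_migrations


def main() -> None:
    # Apply schema migrations before any worker process is spawned.
    if not run_migrations():
        sys.exit(1)

    # Only after the schema is up to date does the multi-worker server start.
    uvicorn.run(
        "alpine_bits_python.api:app",
        host="0.0.0.0",
        port=8000,
        workers=4,
    )


if __name__ == "__main__":
    main()
```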
### 4. Old Files (Can be removed in future cleanup)
- **[src/alpine_bits_python/migrations.py](src/alpine_bits_python/migrations.py)**: Old manual migration functions
- These can be safely removed once you verify the Alembic setup works
- The functionality has been replaced by Alembic migrations
## Usage
### Development
Start the server (migrations run automatically):
```bash
uv run python -m alpine_bits_python.run_api
```
Or run migrations separately:
```bash
uv run alembic upgrade head
uv run python -m alpine_bits_python.run_api
```
### Production with Multiple Workers
The migrations automatically run before uvicorn starts, so you can safely use:
```bash
# Migrations run once, then server starts with multiple workers
uv run python -m alpine_bits_python.run_api
# Or with uvicorn directly (migrations won't run automatically):
uv run alembic upgrade head # Run this first
uvicorn alpine_bits_python.api:app --workers 4 --host 0.0.0.0 --port 8080
```
### Creating New Migrations
When you modify the database schema in `db.py`:
```bash
# Generate migration automatically
uv run alembic revision --autogenerate -m "description_of_change"
# Or create empty migration to fill in manually
uv run alembic revision -m "description_of_change"
# Review the generated migration in alembic/versions/
# Then apply it
uv run alembic upgrade head
```
### Checking Migration Status
```bash
# Show current revision
uv run alembic current
# Show migration history
uv run alembic history
# Show pending migrations
uv run alembic heads
```
## Benefits
1. **Multiple Worker Safe**: Migrations run once before any worker starts
2. **Proper Migration History**: All schema changes are tracked in version control
3. **Rollback Support**: Can downgrade to previous schema versions if needed
4. **Standard Tool**: Alembic is the industry-standard migration tool for SQLAlchemy
5. **Separation of Concerns**:
- Schema migrations (Alembic) are separate from startup tasks (db_setup.py)
- Migrations are separate from application code
## Migration from Old System
If you have an existing database with the old migration system:
1. The initial migration will detect existing tables and skip creating them
2. The conversions table migration will detect the old schema and drop those tables so they can be recreated with the new structure
3. All data in other tables is preserved
4. Conversions data will be lost but can be recreated from PMS XML imports
## Important Notes
### Conversions Table Data Loss
The `conversions` and `conversion_rooms` tables will be dropped and recreated with the new schema. This is intentional because:
- The production version has a different schema
- The data can be recreated by re-importing PMS XML files
- This avoids complex data migration logic
If you need to preserve this data, modify the migration before running it.
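One possible approach, shown here only as a sketch and not part of this change set, is to rename the old tables instead of dropping them so the data stays queryable:
```python
# Hypothetical alternative: park the incompatible production tables under
# *_legacy names instead of dropping them; the new tables are created fresh.
from alembic import op


def upgrade() -> None:
    op.rename_table("conversion_rooms", "conversion_rooms_legacy")
    op.rename_table("conversions", "conversions_legacy")
```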
### Future Migrations
In the future, when you need to change the database schema:
1. Modify the model classes in `db.py`
2. Generate an Alembic migration: `uv run alembic revision --autogenerate -m "description"`
3. Review the generated migration carefully
4. Test it on a dev database first
5. Apply it to production: `uv run alembic upgrade head`
## Configuration
The Alembic setup reads configuration from the same sources as the application:
- `config.yaml` (via `annotatedyaml` with `secrets.yaml`)
- Environment variables (`DATABASE_URL`, `DATABASE_SCHEMA`)
No additional configuration needed!
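For illustration, the resolution order might look like the sketch below; the real helpers live in `db.py`, and the `database.url` / `database.schema` key names are assumptions, not confirmed by this diff:
```python
# Sketch only: config.yaml values win, environment variables are the fallback.
import os
from typing import Any


def get_database_url(app_config: dict[str, Any]) -> str | None:
    url = (app_config.get("database") or {}).get("url")
    return url or os.environ.get("DATABASE_URL")


def get_database_schema(app_config: dict[str, Any]) -> str | None:
    schema = (app_config.get("database") or {}).get("schema")
    return schema or os.environ.get("DATABASE_SCHEMA")
```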

alembic.ini Normal file

@@ -0,0 +1,148 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the tzdata library which can be installed by adding
# `alembic[tz]` to the pip requirements.
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file. In this project, we get the URL from config.yaml or environment variables
# so this is just a placeholder.
# sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

alembic/README Normal file

@@ -0,0 +1 @@
Generic single-database configuration.

alembic/README.md Normal file

@@ -0,0 +1,123 @@
# Database Migrations
This directory contains Alembic database migrations for the Alpine Bits Python Server.
## Quick Reference
### Common Commands
```bash
# Check current migration status
uv run alembic current
# Show migration history
uv run alembic history --verbose
# Upgrade to latest migration
uv run alembic upgrade head
# Downgrade one version
uv run alembic downgrade -1
# Create a new migration (auto-generate from model changes)
uv run alembic revision --autogenerate -m "description"
# Create a new empty migration (manual)
uv run alembic revision -m "description"
```
## Migration Files
### Current Migrations
1. **535b70e85b64_initial_schema.py** - Creates all base tables
2. **8edfc81558db_drop_and_recreate_conversions_tables.py** - Handles conversions table schema change
## How Migrations Work
1. Alembic tracks which migrations have been applied using the `alembic_version` table
2. When you run `alembic upgrade head`, it applies all pending migrations in order
3. Each migration has an `upgrade()` and `downgrade()` function
4. Migrations are applied transactionally (all or nothing)
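The `alembic_version` bookkeeping described above can also be inspected from Python. A small illustrative sketch, using a placeholder synchronous database URL:
```python
# Sketch only: compare the database's applied revision with the latest script.
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy import create_engine

cfg = Config("alembic.ini")
head = ScriptDirectory.from_config(cfg).get_current_head()

engine = create_engine("postgresql://user:pass@localhost/dbname")  # placeholder URL
with engine.connect() as conn:
    current = MigrationContext.configure(conn).get_current_revision()

print(f"database at {current}, latest available {head}")
print("up to date" if current == head else "pending migrations")
```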
## Configuration
The Alembic environment ([env.py](env.py)) is configured to:
- Read database URL from `config.yaml` or environment variables
- Support PostgreSQL schemas
- Use async SQLAlchemy (compatible with FastAPI)
- Apply migrations in the correct schema
## Best Practices
1. **Always review auto-generated migrations** - Alembic's autogenerate is smart but not perfect
2. **Test migrations on dev first** - Never run untested migrations on production
3. **Keep migrations small** - One logical change per migration
4. **Never edit applied migrations** - Create a new migration to fix issues
5. **Commit migrations to git** - Migrations are part of your code
## Creating a New Migration
When you modify models in `src/alpine_bits_python/db.py`:
```bash
# 1. Generate the migration
uv run alembic revision --autogenerate -m "add_user_preferences_table"
# 2. Review the generated file in alembic/versions/
# Look for:
# - Incorrect type changes
# - Missing indexes
# - Data that needs to be migrated
# 3. Test it
uv run alembic upgrade head
# 4. If there are issues, downgrade and fix:
uv run alembic downgrade -1
# Edit the migration file
uv run alembic upgrade head
# 5. Commit the migration file to git
git add alembic/versions/2025_*.py
git commit -m "Add user preferences table migration"
```
## Troubleshooting
### "FAILED: Target database is not up to date"
This means pending migrations need to be applied:
```bash
uv run alembic upgrade head
```
### "Can't locate revision identified by 'xxxxx'"
The alembic_version table may be out of sync. Check what's in the database:
```bash
# Connect to your database and run:
SELECT * FROM alembic_version;
```
### Migration conflicts after git merge
If two branches created migrations at the same time:
```bash
# Create a merge migration
uv run alembic merge heads -m "merge branches"
```
### Need to reset migrations (DANGEROUS - ONLY FOR DEV)
```bash
# WARNING: This will delete all data!
uv run alembic downgrade base # Removes all tables
uv run alembic upgrade head # Recreates everything
```
## More Information
- [Alembic Documentation](https://alembic.sqlalchemy.org/)
- [Alembic Tutorial](https://alembic.sqlalchemy.org/en/latest/tutorial.html)
- See [../MIGRATION_REFACTORING.md](../MIGRATION_REFACTORING.md) for details on how this project uses Alembic

alembic/env.py Normal file

@@ -0,0 +1,125 @@
"""Alembic environment configuration for async SQLAlchemy."""
import asyncio
from logging.config import fileConfig
from alembic import context
from sqlalchemy import pool, text
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
# Import your models' Base to enable autogenerate
from alpine_bits_python.config_loader import load_config
from alpine_bits_python.db import Base, get_database_schema, get_database_url
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Load application config to get database URL and schema
try:
app_config = load_config()
except (FileNotFoundError, KeyError, ValueError):
# Fallback if config can't be loaded (e.g., during initial setup)
app_config = {}
# Get database URL from application config
db_url = get_database_url(app_config)
if db_url:
config.set_main_option("sqlalchemy.url", db_url)
# Get schema name from application config
SCHEMA = get_database_schema(app_config)
# add your model's MetaData object here for 'autogenerate' support
target_metadata = Base.metadata
# Configure metadata to resolve unqualified table names in the schema
# This is needed so ForeignKey("customers.id") can find "alpinebits.customers"
if SCHEMA:
target_metadata.schema = SCHEMA
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
# Set search path for offline mode if schema is configured
if SCHEMA:
print(f"Setting search_path to {SCHEMA}, public")
context.execute(f"SET search_path TO {SCHEMA}, public")
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
"""Run migrations with the given connection."""
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
# Create schema if it doesn't exist
if SCHEMA:
connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}"))
# Set search path to our schema
print(f"setting search path to schema {SCHEMA}, ")
connection.execute(text(f"SET search_path TO {SCHEMA}"))
context.run_migrations()
async def run_async_migrations() -> None:
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
if connection.dialect.name == "postgresql":
# set search path on the connection, which ensures that
# PostgreSQL will emit all CREATE / ALTER / DROP statements
# in terms of this schema by default
await connection.execute(text(f"SET search_path TO {SCHEMA}"))
# in SQLAlchemy v2+ the search path change needs to be committed
await connection.commit()
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode - entry point."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

alembic/script.py.mako Normal file

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,274 @@
"""Initial migration
Revision ID: 630b0c367dcb
Revises:
Create Date: 2025-11-18 13:19:37.183397
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "630b0c367dcb"
down_revision: str | Sequence[str] | None = None
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Upgrade schema."""
# Drop existing tables to start with a clean slate
# Drop conversion_rooms first due to foreign key dependency
op.execute("DROP TABLE IF EXISTS conversion_rooms CASCADE")
op.execute("DROP TABLE IF EXISTS conversions CASCADE")
print("dropped existing conversion tables")
# ### commands auto generated by Alembic - please adjust! ###
# Create conversions table
op.create_table(
"conversions",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("reservation_id", sa.Integer(), nullable=True),
sa.Column("customer_id", sa.Integer(), nullable=True),
sa.Column("hashed_customer_id", sa.Integer(), nullable=True),
sa.Column("hotel_id", sa.String(), nullable=True),
sa.Column("pms_reservation_id", sa.String(), nullable=True),
sa.Column("reservation_number", sa.String(), nullable=True),
sa.Column("reservation_date", sa.Date(), nullable=True),
sa.Column("creation_time", sa.DateTime(timezone=True), nullable=True),
sa.Column("reservation_type", sa.String(), nullable=True),
sa.Column("booking_channel", sa.String(), nullable=True),
sa.Column("guest_first_name", sa.String(), nullable=True),
sa.Column("guest_last_name", sa.String(), nullable=True),
sa.Column("guest_email", sa.String(), nullable=True),
sa.Column("guest_country_code", sa.String(), nullable=True),
sa.Column("advertising_medium", sa.String(), nullable=True),
sa.Column("advertising_partner", sa.String(), nullable=True),
sa.Column("advertising_campagne", sa.String(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(
["customer_id"],
["customers.id"],
),
sa.ForeignKeyConstraint(
["hashed_customer_id"],
["hashed_customers.id"],
),
sa.ForeignKeyConstraint(
["reservation_id"],
["reservations.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_conversions_advertising_campagne"),
"conversions",
["advertising_campagne"],
unique=False,
)
op.create_index(
op.f("ix_conversions_advertising_medium"),
"conversions",
["advertising_medium"],
unique=False,
)
op.create_index(
op.f("ix_conversions_advertising_partner"),
"conversions",
["advertising_partner"],
unique=False,
)
op.create_index(
op.f("ix_conversions_customer_id"), "conversions", ["customer_id"], unique=False
)
op.create_index(
op.f("ix_conversions_guest_email"), "conversions", ["guest_email"], unique=False
)
op.create_index(
op.f("ix_conversions_guest_first_name"),
"conversions",
["guest_first_name"],
unique=False,
)
op.create_index(
op.f("ix_conversions_guest_last_name"),
"conversions",
["guest_last_name"],
unique=False,
)
op.create_index(
op.f("ix_conversions_hashed_customer_id"),
"conversions",
["hashed_customer_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_hotel_id"), "conversions", ["hotel_id"], unique=False
)
op.create_index(
op.f("ix_conversions_pms_reservation_id"),
"conversions",
["pms_reservation_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_reservation_id"),
"conversions",
["reservation_id"],
unique=False,
)
# Create conversion_rooms table
op.create_table(
"conversion_rooms",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("conversion_id", sa.Integer(), nullable=False),
sa.Column("pms_hotel_reservation_id", sa.String(), nullable=True),
sa.Column("arrival_date", sa.Date(), nullable=True),
sa.Column("departure_date", sa.Date(), nullable=True),
sa.Column("room_status", sa.String(), nullable=True),
sa.Column("room_type", sa.String(), nullable=True),
sa.Column("room_number", sa.String(), nullable=True),
sa.Column("num_adults", sa.Integer(), nullable=True),
sa.Column("rate_plan_code", sa.String(), nullable=True),
sa.Column("connected_room_type", sa.String(), nullable=True),
sa.Column("daily_sales", sa.JSON(), nullable=True),
sa.Column("total_revenue", sa.String(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(
["conversion_id"],
["alpinebits.conversions.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_conversion_rooms_arrival_date"),
"conversion_rooms",
["arrival_date"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_conversion_id"),
"conversion_rooms",
["conversion_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_departure_date"),
"conversion_rooms",
["departure_date"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_pms_hotel_reservation_id"),
"conversion_rooms",
["pms_hotel_reservation_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_room_number"),
"conversion_rooms",
["room_number"],
unique=False,
)
op.create_index(
op.f("ix_acked_requests_username"), "acked_requests", ["username"], unique=False
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"conversions",
sa.Column("revenue_fb", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("arrival_date", sa.DATE(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("room_number", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("revenue_logis", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("room_type", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("num_adults", sa.INTEGER(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("revenue_spa", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("departure_date", sa.DATE(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("revenue_board", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("room_status", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("sale_date", sa.DATE(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("revenue_other", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("revenue_total", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.add_column(
"conversions",
sa.Column("rate_plan_code", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.drop_index(op.f("ix_conversions_guest_last_name"), table_name="conversions")
op.drop_index(op.f("ix_conversions_guest_first_name"), table_name="conversions")
op.drop_index(op.f("ix_conversions_guest_email"), table_name="conversions")
op.create_index(
op.f("ix_conversions_sale_date"), "conversions", ["sale_date"], unique=False
)
op.drop_column("conversions", "updated_at")
op.drop_column("conversions", "guest_country_code")
op.drop_column("conversions", "guest_email")
op.drop_column("conversions", "guest_last_name")
op.drop_column("conversions", "guest_first_name")
op.drop_index(op.f("ix_acked_requests_username"), table_name="acked_requests")
op.drop_index(
op.f("ix_conversion_rooms_room_number"), table_name="conversion_rooms"
)
op.drop_index(
op.f("ix_conversion_rooms_pms_hotel_reservation_id"),
table_name="conversion_rooms",
)
op.drop_index(
op.f("ix_conversion_rooms_departure_date"), table_name="conversion_rooms"
)
op.drop_index(
op.f("ix_conversion_rooms_conversion_id"), table_name="conversion_rooms"
)
op.drop_index(
op.f("ix_conversion_rooms_arrival_date"), table_name="conversion_rooms"
)
op.drop_table("conversion_rooms")
# ### end Alembic commands ###

View File

@@ -0,0 +1,66 @@
"""Added birth_date, storing revenue as number
Revision ID: b33fd7a2da6c
Revises: 630b0c367dcb
Create Date: 2025-11-18 14:41:17.567595
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'b33fd7a2da6c'
down_revision: Union[str, Sequence[str], None] = '630b0c367dcb'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
# Convert VARCHAR to Double with explicit CAST for PostgreSQL compatibility
# PostgreSQL requires USING clause for type conversion
connection = op.get_bind()
if connection.dialect.name == 'postgresql':
op.execute(
"ALTER TABLE conversion_rooms "
"ALTER COLUMN total_revenue TYPE DOUBLE PRECISION "
"USING total_revenue::DOUBLE PRECISION"
)
else:
# For SQLite and other databases, use standard alter_column
op.alter_column('conversion_rooms', 'total_revenue',
existing_type=sa.VARCHAR(),
type_=sa.Double(),
existing_nullable=True)
op.add_column('conversions', sa.Column('guest_birth_date', sa.Date(), nullable=True))
op.add_column('conversions', sa.Column('guest_id', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('conversions', 'guest_id')
op.drop_column('conversions', 'guest_birth_date')
# Convert Double back to VARCHAR with explicit CAST for PostgreSQL compatibility
connection = op.get_bind()
if connection.dialect.name == 'postgresql':
op.execute(
"ALTER TABLE conversion_rooms "
"ALTER COLUMN total_revenue TYPE VARCHAR "
"USING total_revenue::VARCHAR"
)
else:
# For SQLite and other databases, use standard alter_column
op.alter_column('conversion_rooms', 'total_revenue',
existing_type=sa.Double(),
type_=sa.VARCHAR(),
existing_nullable=True)
# ### end Alembic commands ###

File diff suppressed because it is too large.

View File

@@ -8,8 +8,8 @@ database:
# Use annotatedyaml for secrets and environment-specific overrides
logger:
level: "WARNING" # Set to DEBUG for more verbose output
file: "config/alpinebits.log" # Log file path, or null for console only
level: "INFO" # Set to DEBUG for more verbose output
file: "config/alpinebits.log" # Log file path, or null for console only
server:
codecontext: "ADVERTISING"
@@ -23,22 +23,19 @@ alpine_bits_auth:
username: "bemelman"
password: !secret BEMELMANS_PASSWORD
meta_account: "238334370765317"
google_account: "7581209925" # Optional: Meta advertising account ID
google_account: "7581209925" # Optional: Meta advertising account ID
- hotel_id: "135"
hotel_name: "Testhotel"
username: "sebastian"
password: !secret BOB_PASSWORD
- hotel_id: "39052_001"
hotel_name: "Jagthof Kaltern"
username: "jagthof"
password: !secret JAGTHOF_PASSWORD
meta_account: "948363300784757"
google_account: "1951919786" # Optional: Meta advertising account ID
google_account: "1951919786" # Optional: Meta advertising account ID
- hotel_id: "39040_001"
hotel_name: "Residence Erika"
@@ -46,11 +43,9 @@ alpine_bits_auth:
password: !secret ERIKA_PASSWORD
google_account: "6604634947"
api_tokens:
- tLTI8wXF1OVEvUX7kdZRhSW3Qr5feBCz0mHo-kbnEp0
# Email configuration (SMTP service config - kept for when port is unblocked)
email:
# SMTP server configuration
@@ -69,8 +64,8 @@ email:
# Pushover configuration (push notification service config)
pushover:
# Pushover API credentials (get from https://pushover.net)
user_key: !secret PUSHOVER_USER_KEY # Your user/group key
api_token: !secret PUSHOVER_API_TOKEN # Your application API token
user_key: !secret PUSHOVER_USER_KEY # Your user/group key
api_token: !secret PUSHOVER_API_TOKEN # Your application API token
# Unified notification system - recipient-based routing
notifications:
@@ -82,7 +77,7 @@ notifications:
#- type: "email"
# address: "jonas@vaius.ai"
- type: "pushover"
priority: 0 # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency
priority: 0 # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency
# Daily report configuration (applies to all recipients)
daily_report:
@@ -104,5 +99,3 @@ notifications:
log_levels:
- "ERROR"
- "CRITICAL"

View File

@@ -10,6 +10,7 @@ readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"aiosqlite>=0.21.0",
"alembic>=1.17.2",
"annotatedyaml>=1.0.0",
"asyncpg>=0.30.0",
"dotenv>=0.9.9",

reset_database.sh Normal file

@@ -0,0 +1,47 @@
#!/bin/bash
# Reset database and initialize Alembic from scratch
echo "=== Database Reset Script ==="
echo "This will drop all tables and reinitialize with Alembic"
echo ""
read -p "Are you sure? (type 'yes' to continue): " confirm
if [ "$confirm" != "yes" ]; then
echo "Aborted."
exit 1
fi
echo ""
echo "Step 1: Dropping all tables in the database..."
echo "Connect to your database and run:"
echo ""
echo " -- For PostgreSQL:"
echo " DROP SCHEMA public CASCADE;"
echo " CREATE SCHEMA public;"
echo " GRANT ALL ON SCHEMA public TO <your_user>;"
echo " GRANT ALL ON SCHEMA public TO public;"
echo ""
echo " -- Or if using a custom schema (e.g., alpinebits):"
echo " DROP SCHEMA alpinebits CASCADE;"
echo " CREATE SCHEMA alpinebits;"
echo ""
echo "Press Enter after you've run the SQL commands..."
read
echo ""
echo "Step 2: Running Alembic migrations..."
uv run alembic upgrade head
if [ $? -eq 0 ]; then
echo ""
echo "=== Success! ==="
echo "Database has been reset and migrations applied."
echo ""
echo "Current migration status:"
uv run alembic current
else
echo ""
echo "=== Error ==="
echo "Migration failed. Check the error messages above."
exit 1
fi

View File

@@ -16,7 +16,7 @@ from typing import Any
import httpx
from fast_langdetect import detect
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request
from fastapi import APIRouter, BackgroundTasks, Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import (
@@ -44,13 +44,13 @@ from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpSt
from .conversion_service import ConversionService
from .csv_import import CSVImporter
from .customer_service import CustomerService
from .db import Base, ResilientAsyncSession, SessionMaker, create_database_engine
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
from .logging_config import get_logger, setup_logging
from .migrations import run_all_migrations
from .pushover_service import create_pushover_service
from .rate_limit import (
BURST_RATE_LIMIT,
@@ -331,31 +331,15 @@ async def lifespan(app: FastAPI):
elif hotel_id and not push_endpoint:
_LOGGER.info("Hotel %s has no push_endpoint configured", hotel_id)
# Create tables first (all workers)
# This ensures tables exist before migrations try to alter them
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
_LOGGER.info("Database tables checked/created at startup.")
# Run migrations after tables exist (only primary worker for race conditions)
# Run startup tasks (only in primary worker to avoid race conditions)
# NOTE: Database migrations should already have been run before the app started
# via run_migrations.py or `uv run alembic upgrade head`
if is_primary:
await run_all_migrations(engine, config)
_LOGGER.info("Running startup tasks (primary worker)...")
await run_startup_tasks(AsyncSessionLocal, config, engine)
_LOGGER.info("Startup tasks completed")
else:
_LOGGER.info("Skipping migrations (non-primary worker)")
# Hash any existing customers (only in primary worker to avoid race conditions)
if is_primary:
async with AsyncSessionLocal() as session:
customer_service = CustomerService(session)
hashed_count = await customer_service.hash_existing_customers()
if hashed_count > 0:
_LOGGER.info(
"Backfilled hashed data for %d existing customers", hashed_count
)
else:
_LOGGER.info("All existing customers already have hashed data")
else:
_LOGGER.info("Skipping customer hashing (non-primary worker)")
_LOGGER.info("Skipping startup tasks (non-primary worker)")
# Initialize and hook up stats collector for daily reports
# Note: report_scheduler will only exist on the primary worker
@@ -1155,70 +1139,164 @@ async def handle_wix_form_test(
raise HTTPException(status_code=500, detail="Error processing test data")
@api_router.post("/admin/import-csv")
async def _process_csv_import_background(
csv_content: str,
filename: str,
hotel_code: str | None,
session_maker: SessionMaker,
config: dict[str, Any],
log_filename: Path,
):
"""Background task to process CSV import.
This runs in a separate asyncio task after the HTTP response is sent.
Handles both file saving and database processing.
"""
try:
# First, save the CSV file (in background)
await asyncio.to_thread(log_filename.write_text, csv_content, encoding="utf-8")
_LOGGER.debug("CSV file saved to %s", log_filename)
# Now process the CSV import
_LOGGER.info("Starting database processing of %s", filename)
# Create a new session for this background task
async with session_maker() as db_session:
importer = CSVImporter(db_session, config)
stats = await importer.import_csv_file(str(log_filename), hotel_code, dryrun=False)
_LOGGER.info(
"CSV import complete for %s: %s", filename, stats
)
except Exception:
_LOGGER.exception(
"Error processing CSV import in background for %s", filename
)
@api_router.put("/admin/import-csv/{filename:path}")
@limiter.limit(BURST_RATE_LIMIT)
async def import_csv_endpoint(
request: Request,
csv_file_path: str,
background_tasks: BackgroundTasks,
filename: str,
hotel_code: str | None = None,
credentials: tuple = Depends(validate_basic_auth),
db_session=Depends(get_async_session),
session_maker: SessionMaker = Depends(get_session_maker),
):
"""Import reservations from a CSV file (landing_page_form.csv format).
"""Import reservations from CSV data sent via PUT request.
This endpoint allows importing historical form data into the system.
It creates customers and reservations, avoiding duplicates based on:
- Name, email, reservation dates
- fbclid/gclid tracking IDs
Requires basic authentication.
Returns immediately with 202 Accepted while processing continues in background.
Requires basic authentication and saves CSV files to log directory.
Supports gzip compression via Content-Encoding header.
Args:
csv_file_path: Path to CSV file (relative to app root)
filename: Name for the CSV file (used for logging)
hotel_code: Optional hotel code to override CSV values
credentials: Basic auth credentials
Returns:
Import statistics including created/skipped counts and any errors
Example: PUT /api/admin/import-csv/reservations.csv
"""
try:
# Validate file path to prevent path traversal
if ".." in csv_file_path or csv_file_path.startswith("/"):
raise HTTPException(status_code=400, detail="Invalid file path")
# Validate filename to prevent path traversal
if ".." in filename or filename.startswith("/"):
raise HTTPException(status_code=400, detail="ERROR: Invalid filename")
# Check if file exists
csv_path = Path(csv_file_path)
if not csv_path.exists():
# Try relative to app root
csv_path = Path() / csv_file_path
if not csv_path.exists():
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(
status_code=400, detail="ERROR: No CSV content provided"
)
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
except Exception as e:
raise HTTPException(
status_code=404, detail=f"CSV file not found: {csv_file_path}"
)
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Try to decode as UTF-8
try:
csv_content = body.decode("utf-8")
except UnicodeDecodeError:
# If UTF-8 fails, try with latin-1 as fallback
csv_content = body.decode("latin-1")
# Basic validation that it looks like CSV
if not csv_content.strip():
raise HTTPException(
status_code=400, detail="ERROR: CSV content is empty"
)
# Create logs directory for CSV imports (blocking, but fast)
logs_dir = Path("logs/csv_imports")
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
# Generate filename with timestamp and authenticated user
username, _ = credentials
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_filename = Path(filename).stem
extension = Path(filename).suffix or ".csv"
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
_LOGGER.info(
"Starting CSV import from %s (user: %s)", csv_file_path, credentials[0]
"CSV file queued for processing: %s by user %s (original: %s)",
log_filename,
username,
filename,
)
# Create importer and import
importer = CSVImporter(db_session, request.app.state.config)
stats = await importer.import_csv_file(str(csv_path), hotel_code, dryrun=False)
# Schedule background processing using FastAPI's BackgroundTasks
# This handles both file saving AND database processing
# This ensures the response is sent immediately
background_tasks.add_task(
_process_csv_import_background,
csv_content,
filename,
hotel_code,
session_maker,
request.app.state.config,
log_filename,
)
_LOGGER.info("CSV import completed: %s", stats)
return {
"status": "success",
"message": "CSV import completed",
"stats": stats,
"timestamp": datetime.now().isoformat(),
response_headers = {
"Content-Type": "application/json; charset=utf-8",
}
except FileNotFoundError as e:
_LOGGER.error("CSV file not found: %s", e)
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
_LOGGER.exception("Error during CSV import")
raise HTTPException(status_code=500, detail=f"Error processing CSV: {str(e)}")
# Return immediate acknowledgment
return Response(
content=json.dumps({
"status": "accepted",
"message": "CSV file received and queued for processing",
"filename": filename,
"timestamp": datetime.now().isoformat(),
}),
headers=response_headers,
status_code=202,
)
except HTTPException:
raise
except Exception:
_LOGGER.exception("Error in import_csv_endpoint")
raise HTTPException(status_code=500, detail="Error processing CSV upload")
@api_router.post("/webhook/generic")
@@ -1331,10 +1409,56 @@ async def handle_generic_webhook(
) from e
async def _process_conversion_xml_background(
xml_content: str,
filename: str,
session_maker: SessionMaker,
log_filename: Path,
):
"""Background task to process conversion XML.
This runs in a separate asyncio task after the HTTP response is sent.
Handles both file prettification and database processing.
"""
try:
# First, prettify and save the XML file (in background)
try:
dom = xml.dom.minidom.parseString(xml_content)
pretty_xml = dom.toprettyxml(indent=" ")
# Remove extra blank lines that toprettyxml adds
pretty_xml = "\n".join(
[line for line in pretty_xml.split("\n") if line.strip()]
)
await asyncio.to_thread(
log_filename.write_text, pretty_xml, encoding="utf-8"
)
_LOGGER.debug("XML file prettified and saved to %s", log_filename)
except Exception as e:
# If formatting fails, save the original content
_LOGGER.warning("Failed to format XML: %s. Saving unformatted.", str(e))
await asyncio.to_thread(
log_filename.write_text, xml_content, encoding="utf-8"
)
# Now process the conversion XML
_LOGGER.info("Starting database processing of %s", filename)
conversion_service = ConversionService(session_maker)
processing_stats = await conversion_service.process_conversion_xml(xml_content)
_LOGGER.info(
"Conversion processing complete for %s: %s", filename, processing_stats
)
except Exception:
_LOGGER.exception(
"Error processing conversion XML in background for %s", filename
)
@api_router.put("/hoteldata/conversions_import/{filename:path}")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_xml_upload(
request: Request,
background_tasks: BackgroundTasks,
filename: str,
credentials_tupel: tuple = Depends(validate_basic_auth),
db_session=Depends(get_async_session),
@@ -1348,6 +1472,8 @@ async def handle_xml_upload(
- Links conversions to customers and hashed_customers
- Stores daily sales revenue data
Returns immediately with 202 Accepted while processing continues in background.
Requires basic authentication and saves XML files to log directory.
Supports gzip compression via Content-Encoding header.
@@ -1393,47 +1519,33 @@ async def handle_xml_upload(
status_code=400, detail="ERROR: Content does not appear to be XML"
)
# Create logs directory for XML conversions
# Create logs directory for XML conversions (blocking, but fast)
logs_dir = Path("logs/conversions_import")
if not logs_dir.exists():
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
_LOGGER.info("Created directory: %s", logs_dir)
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
# Generate filename with timestamp and authenticated user
username, _ = credentials_tupel
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Use the filename from the path, but add timestamp and username for uniqueness
base_filename = Path(filename).stem
extension = Path(filename).suffix or ".xml"
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
# Format and save XML content to file
try:
dom = xml.dom.minidom.parseString(xml_content)
pretty_xml = dom.toprettyxml(indent=" ")
# Remove extra blank lines that toprettyxml adds
pretty_xml = "\n".join([line for line in pretty_xml.split("\n") if line.strip()])
log_filename.write_text(pretty_xml, encoding="utf-8")
except Exception as e:
# If formatting fails, save the original content
_LOGGER.warning("Failed to format XML: %s. Saving unformatted.", str(e))
log_filename.write_text(xml_content, encoding="utf-8")
_LOGGER.info(
"XML file saved to %s by user %s (original: %s)",
"XML file queued for processing: %s by user %s (original: %s)",
log_filename,
username,
filename,
)
# Process the conversion XML and save to database
# Use SessionMaker for concurrent processing of large XML files
# This allows multiple reservations to be processed in parallel with independent sessions
conversion_service = ConversionService(session_maker)
processing_stats = await conversion_service.process_conversion_xml(xml_content)
_LOGGER.info(
"Conversion processing complete for %s: %s", filename, processing_stats
# Schedule background processing using FastAPI's BackgroundTasks
# This handles both file prettification/saving AND database processing
# This ensures the response is sent immediately
background_tasks.add_task(
_process_conversion_xml_background,
xml_content,
filename,
session_maker,
log_filename,
)
response_headers = {
@@ -1441,25 +1553,17 @@ async def handle_xml_upload(
"X-AlpineBits-Server-Accept-Encoding": "gzip",
}
# Return processing stats in response
# Return immediate acknowledgment
response_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<response>
<status>success</status>
<message>Conversion data processed successfully</message>
<stats>
<totalReservations>{processing_stats["total_reservations"]}</totalReservations>
<deletedReservations>{processing_stats["deleted_reservations"]}</deletedReservations>
<totalDailySales>{processing_stats["total_daily_sales"]}</totalDailySales>
<matchedToReservation>{processing_stats["matched_to_reservation"]}</matchedToReservation>
<matchedToCustomer>{processing_stats["matched_to_customer"]}</matchedToCustomer>
<matchedToHashedCustomer>{processing_stats["matched_to_hashed_customer"]}</matchedToHashedCustomer>
<unmatched>{processing_stats["unmatched"]}</unmatched>
<errors>{processing_stats["errors"]}</errors>
</stats>
<status>accepted</status>
<message>XML file received and queued for processing</message>
<filename>{filename}</filename>
<timestamp>{datetime.now().isoformat()}</timestamp>
</response>"""
return Response(
content=response_content, headers=response_headers, status_code=200
content=response_content, headers=response_headers, status_code=202
)
except HTTPException:

View File

@@ -1,7 +1,6 @@
"""Service for handling conversion data from hotel PMS XML files."""
import asyncio
import json
import xml.etree.ElementTree as ET
from datetime import datetime
from decimal import Decimal
@@ -11,7 +10,14 @@ from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from .db import Conversion, RoomReservation, Customer, HashedCustomer, Reservation, SessionMaker
from .db import (
Conversion,
ConversionRoom,
Customer,
HashedCustomer,
Reservation,
SessionMaker,
)
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
@@ -45,17 +51,23 @@ class ConversionService:
# Cache for reservation and customer data within a single XML processing run
# Maps hotel_code -> list of (reservation, customer) tuples
# This significantly speeds up matching when processing large XML files
self._reservation_cache: dict[str | None, list[tuple[Reservation, Customer | None]]] = {}
self._reservation_cache: dict[
str | None, list[tuple[Reservation, Customer | None]]
] = {}
self._cache_initialized = False
if isinstance(session, SessionMaker):
self.session_maker = session
self.supports_concurrent = True
_LOGGER.info("ConversionService initialized in concurrent mode with SessionMaker")
_LOGGER.info(
"ConversionService initialized in concurrent mode with SessionMaker"
)
elif isinstance(session, AsyncSession):
self.session = session
self.supports_concurrent = False
_LOGGER.info("ConversionService initialized in sequential mode with single session")
_LOGGER.info(
"ConversionService initialized in sequential mode with single session"
)
elif session is not None:
raise TypeError(
f"session must be AsyncSession or SessionMaker, got {type(session)}"
@@ -202,9 +214,7 @@ class ConversionService:
async with asyncio.TaskGroup() as tg:
for reservation in reservations:
tg.create_task(
self._process_reservation_safe(
reservation, semaphore, stats
)
self._process_reservation_safe(reservation, semaphore, stats)
)
async def _process_reservations_concurrent(
@@ -227,9 +237,7 @@ class ConversionService:
async with asyncio.TaskGroup() as tg:
for reservation in reservations:
tg.create_task(
self._process_reservation_safe(
reservation, semaphore, stats
)
self._process_reservation_safe(reservation, semaphore, stats)
)
async def _process_reservation_safe(
@@ -247,6 +255,7 @@ class ConversionService:
reservation_elem: XML element for the reservation
semaphore: Semaphore to limit concurrent operations
stats: Shared stats dictionary (thread-safe due to GIL)
"""
pms_reservation_id = reservation_elem.get("id")
@@ -295,18 +304,19 @@ class ConversionService:
if self.session_maker:
await session.close()
async def _handle_deleted_reservation(self, pms_reservation_id: str, session: AsyncSession):
async def _handle_deleted_reservation(
self, pms_reservation_id: str, session: AsyncSession
):
"""Handle deleted reservation by marking conversions as deleted or removing them.
Args:
pms_reservation_id: PMS reservation ID to delete
session: AsyncSession to use for the operation
"""
# For now, we'll just log it. You could add a 'deleted' flag to the Conversion table
# or actually delete the conversion records
_LOGGER.info(
"Processing deleted reservation: PMS ID %s", pms_reservation_id
)
_LOGGER.info("Processing deleted reservation: PMS ID %s", pms_reservation_id)
# Option 1: Delete conversion records
result = await session.execute(
@@ -337,6 +347,7 @@ class ConversionService:
In concurrent mode, each task passes its own session.
Returns statistics about what was matched.
"""
if session is None:
session = self.session
@@ -363,12 +374,22 @@ class ConversionService:
guest_last_name = None
guest_email = None
guest_country_code = None
guest_birth_date_str = None
guest_id = None
if guest_elem is not None:
guest_first_name = guest_elem.get("firstName")
guest_last_name = guest_elem.get("lastName")
guest_email = guest_elem.get("email")
guest_country_code = guest_elem.get("countryCode")
guest_email = guest_elem.get("email", None)
guest_country_code = guest_elem.get("countryCode", None)
guest_birth_date_str = guest_elem.get("dateOfBirth", None)
guest_id = guest_elem.get("id")
guest_birth_date = (
datetime.strptime(guest_birth_date_str, "%Y-%m-%d").date()
if guest_birth_date_str
else None
)
# Advertising/tracking data
advertising_medium = reservation_elem.get("advertisingMedium")
@@ -394,9 +415,7 @@ class ConversionService:
creation_time_str.replace("Z", "+00:00")
)
except ValueError:
_LOGGER.warning(
"Invalid creation time format: %s", creation_time_str
)
_LOGGER.warning("Invalid creation time format: %s", creation_time_str)
# Find matching reservation, customer, and hashed_customer using advertising data and guest details
matched_reservation = None
@@ -454,6 +473,7 @@ class ConversionService:
existing_conversion.guest_last_name = guest_last_name
existing_conversion.guest_email = guest_email
existing_conversion.guest_country_code = guest_country_code
existing_conversion.guest_birth_date = guest_birth_date
existing_conversion.advertising_medium = advertising_medium
existing_conversion.advertising_partner = advertising_partner
existing_conversion.advertising_campagne = advertising_campagne
@@ -486,6 +506,8 @@ class ConversionService:
guest_last_name=guest_last_name,
guest_email=guest_email,
guest_country_code=guest_country_code,
guest_birth_date=guest_birth_date,
guest_id=guest_id,
# Advertising data
advertising_medium=advertising_medium,
advertising_partner=advertising_partner,
@@ -515,18 +537,15 @@ class ConversionService:
# Batch-load existing room reservations to avoid N+1 queries
room_numbers = [
rm.get("roomNumber")
for rm in room_reservations.findall("roomReservation")
rm.get("roomNumber") for rm in room_reservations.findall("roomReservation")
]
pms_hotel_reservation_ids = [
f"{pms_reservation_id}_{room_num}" for room_num in room_numbers
]
existing_rooms_result = await session.execute(
select(RoomReservation).where(
RoomReservation.pms_hotel_reservation_id.in_(
pms_hotel_reservation_ids
)
select(ConversionRoom).where(
ConversionRoom.pms_hotel_reservation_id.in_(pms_hotel_reservation_ids)
)
)
existing_rooms = {
@@ -556,9 +575,7 @@ class ConversionService:
departure_date = None
if departure_str:
try:
departure_date = datetime.strptime(
departure_str, "%Y-%m-%d"
).date()
departure_date = datetime.strptime(departure_str, "%Y-%m-%d").date()
except ValueError:
_LOGGER.warning("Invalid departure date format: %s", departure_str)
@@ -576,7 +593,7 @@ class ConversionService:
# Process daily sales and extract total revenue
daily_sales_elem = room_reservation.find("dailySales")
daily_sales_list = []
total_revenue = Decimal("0")
total_revenue = Decimal(0)
if daily_sales_elem is not None:
for daily_sale in daily_sales_elem.findall("dailySale"):
@@ -618,6 +635,16 @@ class ConversionService:
# Check if room reservation already exists using batch-loaded data
existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id)
if total_revenue > 0 and (
guest_first_name is None
and guest_last_name is None
and guest_email is None
):
_LOGGER.info(
"Guest info missing but total revenue > 0 for PMS ID %s",
pms_reservation_id,
)
if existing_room_reservation:
# Update existing room reservation with all fields
existing_room_reservation.arrival_date = arrival_date
@@ -631,7 +658,7 @@ class ConversionService:
daily_sales_list if daily_sales_list else None
)
existing_room_reservation.total_revenue = (
str(total_revenue) if total_revenue > 0 else None
total_revenue if total_revenue > 0 else None
)
existing_room_reservation.updated_at = datetime.now()
_LOGGER.debug(
@@ -642,7 +669,7 @@ class ConversionService:
)
else:
# Create new room reservation
room_reservation_record = RoomReservation(
room_reservation_record = ConversionRoom(
conversion_id=conversion.id,
pms_hotel_reservation_id=pms_hotel_reservation_id,
arrival_date=arrival_date,
@@ -654,7 +681,7 @@ class ConversionService:
rate_plan_code=rate_plan_code,
connected_room_type=connected_room_type,
daily_sales=daily_sales_list if daily_sales_list else None,
total_revenue=str(total_revenue) if total_revenue > 0 else None,
total_revenue=total_revenue if total_revenue > 0 else None,
created_at=datetime.now(),
updated_at=datetime.now(),
)
@@ -734,7 +761,9 @@ class ConversionService:
)
# Strategy 2: If no advertising match, try email/name-based matching
if not result["reservation"] and (guest_email or guest_first_name or guest_last_name):
if not result["reservation"] and (
guest_email or guest_first_name or guest_last_name
):
matched_reservation = await self._match_by_guest_details(
hotel_id, guest_first_name, guest_last_name, guest_email, session
)
@@ -798,6 +827,7 @@ class ConversionService:
Returns:
Matched Reservation or None
"""
if session is None:
session = self.session
@@ -882,6 +912,7 @@ class ConversionService:
Returns:
Matched Reservation or None
"""
if session is None:
session = self.session
@@ -892,9 +923,7 @@ class ConversionService:
# Get reservations from cache for this hotel
if hotel_id and hotel_id in self._reservation_cache:
all_reservations = [
res for res, _ in self._reservation_cache[hotel_id]
]
all_reservations = [res for res, _ in self._reservation_cache[hotel_id]]
elif not hotel_id:
# If no hotel_id specified, use all cached reservations
for reservations_list in self._reservation_cache.values():
@@ -947,6 +976,7 @@ class ConversionService:
Returns:
Matched Reservation or None
"""
# Filter by guest details
candidates = []
@@ -1019,6 +1049,7 @@ class ConversionService:
Returns:
Single best-match Reservation, or None if no good match found
"""
candidates = reservations

View File

@@ -184,7 +184,7 @@ class CSVImporter:
return None
async def import_csv_file(
self, csv_file_path: str, hotel_code: Optional[str] = None, dryrun: bool = False
self, csv_file_path: str, hotel_code: Optional[str] = None, dryrun: bool = False, pre_acknowledge: bool = False, client_id: Optional[str] = None, username: Optional[str] = None
) -> dict[str, Any]:
"""Import reservations from a CSV file.
@@ -192,6 +192,9 @@ class CSVImporter:
csv_file_path: Path to CSV file
hotel_code: Optional hotel code to override CSV values
dryrun: If True, parse and print first 10 rows as JSON without importing
pre_acknowledge: If True, pre-acknowledges all imported reservations
client_id: Client ID for pre-acknowledgement (required if pre_acknowledge=True)
username: Username for pre-acknowledgement (optional, but recommended)
Returns:
Dictionary with import statistics or parsed data (if dryrun=True)
@@ -200,6 +203,9 @@ class CSVImporter:
if not path.exists():
raise FileNotFoundError(f"CSV file not found: {csv_file_path}")
if pre_acknowledge and not client_id:
raise ValueError("client_id is required when pre_acknowledge=True")
# Start a transaction - will rollback on any exception
await self.db_session.begin()
@@ -272,6 +278,7 @@ class CSVImporter:
"existing_customers": 0,
"created_reservations": 0,
"skipped_duplicates": 0,
"pre_acknowledged": 0,
"errors": [],
}
@@ -353,10 +360,10 @@ class CSVImporter:
"name_title": None,
}
# Get or create customer
customer = await self._find_or_create_customer(customer_data)
# Get or create customer (without committing)
customer = await self._find_or_create_customer(customer_data, auto_commit=False)
if customer.id is None:
await self.db_session.refresh(customer)
await self.db_session.flush() # Flush to get customer.id
stats["created_customers"] += 1
else:
stats["existing_customers"] += 1
@@ -463,13 +470,28 @@ class CSVImporter:
room_classification_code=room_class_code,
)
# Create reservation if customer exists
# Create reservation if customer exists (without committing)
if customer.id:
await self.reservation_service.create_reservation(
reservation, customer.id
db_reservation = await self.reservation_service.create_reservation(
reservation, customer.id, auto_commit=False
)
stats["created_reservations"] += 1
_LOGGER.info("Created reservation for %s %s", first_name, last_name)
# Pre-acknowledge if requested
if pre_acknowledge and db_reservation.md5_unique_id:
await self.reservation_service.record_acknowledgement(
client_id=client_id,
unique_id=db_reservation.md5_unique_id,
username=username,
auto_commit=False
)
stats["pre_acknowledged"] += 1
_LOGGER.debug(
"Pre-acknowledged reservation %s for client %s",
db_reservation.md5_unique_id,
username or client_id
)
else:
raise ValueError("Failed to get or create customer")
@@ -505,7 +527,7 @@ class CSVImporter:
else:
return None
async def _find_or_create_customer(self, customer_data: dict) -> Customer:
async def _find_or_create_customer(self, customer_data: dict, auto_commit: bool = True) -> Customer:
"""Find existing customer or create new one.
Args:
@@ -550,18 +572,18 @@ class CSVImporter:
# Update customer data if needed
try:
existing_customer = await self.customer_service.update_customer(
existing, customer_data
existing, customer_data, auto_commit=auto_commit
)
except Exception as e:
print(customer_data)
print("---")
print(existing)
raise
return existing_customer
# Create new customer
return await self.customer_service.create_customer(customer_data)
return await self.customer_service.create_customer(customer_data, auto_commit=auto_commit)
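
The new parameters compose as follows. This is a minimal usage sketch, not code from this changeset: the import path, the AsyncSessionLocal name, and the CSVImporter constructor shape are assumptions; only import_csv_file's signature comes from the diff above.

    import asyncio

    from alpine_bits_python.csv_import import CSVImporter  # import path assumed

    async def import_with_ack() -> None:
        async with AsyncSessionLocal() as session:          # the app's sessionmaker (assumed name)
            importer = CSVImporter(session)                  # constructor shape is an assumption
            stats = await importer.import_csv_file(
                "reservations.csv",
                hotel_code="HOTEL123",
                pre_acknowledge=True,                        # requires client_id
                client_id="existing-client-id",
                username="hotel_username",
            )
            print(stats["created_reservations"], stats["pre_acknowledged"])

    asyncio.run(import_with_ack())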

View File

@@ -23,11 +23,12 @@ class CustomerService:
def __init__(self, session: AsyncSession):
self.session = session
async def create_customer(self, customer_data: dict) -> Customer:
async def create_customer(self, customer_data: dict, auto_commit: bool = True) -> Customer:
"""Create a new customer and automatically create its hashed version.
Args:
customer_data: Dictionary containing customer fields
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
The created Customer instance (with hashed_version relationship populated)
@@ -60,17 +61,19 @@ class CustomerService:
hashed_customer.created_at = datetime.now(UTC)
self.session.add(hashed_customer)
await self.session.commit()
await self.session.refresh(customer)
if auto_commit:
await self.session.commit()
await self.session.refresh(customer)
return customer
async def update_customer(self, customer: Customer, update_data: dict) -> Customer:
async def update_customer(self, customer: Customer, update_data: dict, auto_commit: bool = True) -> Customer:
"""Update an existing customer and sync its hashed version.
Args:
customer: The customer to update
update_data: Dictionary of fields to update
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
The updated Customer instance
@@ -151,8 +154,9 @@ class CustomerService:
hashed_customer.created_at = datetime.now(UTC)
self.session.add(hashed_customer)
await self.session.commit()
await self.session.refresh(customer)
if auto_commit:
await self.session.commit()
await self.session.refresh(customer)
return customer
@@ -171,7 +175,7 @@ class CustomerService:
)
return result.scalar_one_or_none()
async def get_or_create_customer(self, customer_data: dict) -> Customer:
async def get_or_create_customer(self, customer_data: dict, auto_commit: bool = True) -> Customer:
"""Get existing customer or create new one if not found.
Uses contact_id to identify existing customers if provided.
@@ -179,6 +183,7 @@ class CustomerService:
Args:
customer_data: Dictionary containing customer fields
(contact_id is optional)
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
Existing or newly created Customer instance
@@ -190,10 +195,10 @@ class CustomerService:
existing = await self.get_customer_by_contact_id(contact_id)
if existing:
# Update existing customer
return await self.update_customer(existing, customer_data)
return await self.update_customer(existing, customer_data, auto_commit=auto_commit)
# Create new customer (either no contact_id or customer doesn't exist)
return await self.create_customer(customer_data)
return await self.create_customer(customer_data, auto_commit=auto_commit)
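
Taken together, the auto_commit flag lets a caller batch several writes into a single transaction. A minimal sketch of that pattern (AsyncSessionLocal and customer_rows are assumed names, not part of this changeset):

    async def import_batch(customer_rows: list[dict]) -> None:
        async with AsyncSessionLocal() as session:    # the app's sessionmaker (assumed name)
            service = CustomerService(session)
            for data in customer_rows:                # e.g. rows parsed from a CSV
                await service.get_or_create_customer(data, auto_commit=False)
            await session.commit()                    # one commit for the whole batch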
async def get_hashed_customer(self, customer_id: int) -> HashedCustomer | None:
"""Get the hashed version of a customer.

View File

@@ -1,18 +1,58 @@
import asyncio
import hashlib
import os
from typing import Any, AsyncGenerator, Callable, TypeVar
from collections.abc import AsyncGenerator, Callable
from typing import TypeVar
from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String, JSON
from sqlalchemy import (
JSON,
Boolean,
Column,
Date,
DateTime,
Double,
ForeignKey,
Integer,
String,
)
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import declarative_base, relationship
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
Base = declarative_base()
# Load schema from config at module level
# This happens once when the module is imported
try:
from .config_loader import load_config
_app_config = load_config()
_SCHEMA = _app_config.get("database", {}).get("schema")
except (FileNotFoundError, KeyError, ValueError, ImportError):
_SCHEMA = None
# If schema isn't in config, try environment variable
if not _SCHEMA:
_SCHEMA = os.environ.get("DATABASE_SCHEMA")
class Base:
"""Base class that applies schema to all tables."""
# # Set schema on all tables if configured
# if _SCHEMA:
# __table_args__ = {"schema": _SCHEMA}
Base = declarative_base(cls=Base)
# Type variable for async functions
T = TypeVar("T")
@@ -45,26 +85,30 @@ def get_database_schema(config=None):
Schema name string, or None if not configured
"""
# Check environment variable first (takes precedence)
schema = os.environ.get("DATABASE_SCHEMA")
if schema:
return schema
# Fall back to config file
if config and "database" in config and "schema" in config["database"]:
return config["database"]["schema"]
return os.environ.get("DATABASE_SCHEMA")
return None
def configure_schema(schema_name=None):
def configure_schema(schema_name):
"""Configure the database schema for all models.
This should be called before creating tables or running migrations.
For PostgreSQL, this sets the schema for all tables.
For other databases, this is a no-op.
IMPORTANT: This must be called BEFORE any models are imported/defined.
It modifies the Base class to apply schema to all tables.
Args:
schema_name: Name of the schema to use (e.g., "alpinebits")
"""
if schema_name:
# Update the schema for all tables in Base metadata
for table in Base.metadata.tables.values():
table.schema = schema_name
# Set __table_args__ on the Base class to apply schema to all tables
Base.__table_args__ = {"schema": _SCHEMA}
def create_database_engine(config=None, echo=False) -> AsyncEngine:
@@ -87,7 +131,7 @@ def create_database_engine(config=None, echo=False) -> AsyncEngine:
database_url = get_database_url(config)
schema_name = get_database_schema(config)
# Configure schema for all models if specified
# # Configure schema for all models if specified
if schema_name:
configure_schema(schema_name)
_LOGGER.info("Configured database schema: %s", schema_name)
@@ -95,9 +139,7 @@ def create_database_engine(config=None, echo=False) -> AsyncEngine:
# Create engine with connect_args to set search_path for PostgreSQL
connect_args = {}
if schema_name and "postgresql" in database_url:
connect_args = {
"server_settings": {"search_path": f"{schema_name},public"}
}
connect_args = {"server_settings": {"search_path": f"{schema_name},public"}}
_LOGGER.info("Setting PostgreSQL search_path to: %s,public", schema_name)
return create_async_engine(database_url, echo=echo, connect_args=connect_args)
@@ -120,13 +162,12 @@ class ResilientAsyncSession:
Args:
async_sessionmaker_: Factory for creating async sessions
engine: The SQLAlchemy async engine for connection recovery
"""
self.async_sessionmaker = async_sessionmaker_
self.engine = engine
async def execute_with_retry(
self, func: Callable[..., T], *args, **kwargs
) -> T:
async def execute_with_retry(self, func: Callable[..., T], *args, **kwargs) -> T:
"""Execute a function with automatic retry on connection errors.
Args:
@@ -139,6 +180,7 @@ class ResilientAsyncSession:
Raises:
The original exception if all retries are exhausted
"""
last_error = None
@@ -169,7 +211,7 @@ class ResilientAsyncSession:
# Wait before retry (exponential backoff)
if attempt < MAX_RETRIES - 1:
wait_time = RETRY_DELAY * (2 ** attempt)
wait_time = RETRY_DELAY * (2**attempt)
await asyncio.sleep(wait_time)
else:
# Not a connection-related error, re-raise immediately
@@ -201,6 +243,7 @@ class SessionMaker:
Args:
async_sessionmaker_: SQLAlchemy async_sessionmaker factory
"""
self.async_sessionmaker = async_sessionmaker_
@@ -210,13 +253,14 @@ class SessionMaker:
Returns:
A new AsyncSession instance ready for use. Caller is responsible
for managing the session lifecycle (closing when done).
"""
return self.async_sessionmaker()
async def get_resilient_session(
resilient_session: "ResilientAsyncSession",
) -> AsyncGenerator[AsyncSession, None]:
) -> AsyncGenerator[AsyncSession]:
"""Dependency for FastAPI that provides a resilient async session.
This generator creates a new session with automatic retry capability
@@ -227,6 +271,7 @@ async def get_resilient_session(
Yields:
AsyncSession instance for database operations
"""
async with resilient_session.async_sessionmaker() as session:
yield session
@@ -356,10 +401,19 @@ class Reservation(Base):
# Table for tracking acknowledged requests by client
class AckedRequest(Base):
"""Tracks which Reservations the Client has already seen via ReadAction.
Clients can report successful transfers via ReportNotifAction; these acknowledgements are stored in this table.
This prevents re-sending the same reservation multiple times to the client.
"""
__tablename__ = "acked_requests"
id = Column(Integer, primary_key=True)
client_id = Column(String, index=True)
username = Column(String, index=True, nullable=True) # Username of the client making the request
username = Column(
String, index=True, nullable=True
) # Username of the client making the request
unique_id = Column(
String, index=True
) # Should match Reservation.form_id or another unique field
@@ -371,10 +425,13 @@ class Conversion(Base):
Represents a single reservation event from the PMS XML with all its metadata.
Each row links to one reservation from the PMS system. A reservation can have
multiple room reservations (stored in RoomReservation table).
multiple room reservations (stored in ConversionRoom table).
Linked to reservations via advertising tracking data (fbclid, gclid, etc)
stored in advertisingCampagne field.
The tracking data transferred by the PMS is, however, somewhat truncated.
We therefore also need to match on guest name/email and other metadata.
"""
__tablename__ = "conversions"
@@ -403,6 +460,8 @@ class Conversion(Base):
guest_last_name = Column(String, index=True) # lastName from guest element
guest_email = Column(String, index=True) # email from guest element
guest_country_code = Column(String) # countryCode from guest element
guest_birth_date = Column(Date)  # dateOfBirth from guest element
guest_id = Column(String) # id from guest element
# Advertising/tracking data - used for matching to existing reservations
advertising_medium = Column(
@@ -423,12 +482,12 @@ class Conversion(Base):
reservation = relationship("Reservation", backref="conversions")
customer = relationship("Customer", backref="conversions")
hashed_customer = relationship("HashedCustomer", backref="conversions")
room_reservations = relationship(
"RoomReservation", back_populates="conversion", cascade="all, delete-orphan"
conversion_rooms = relationship(
"ConversionRoom", back_populates="conversion", cascade="all, delete-orphan"
)
class RoomReservation(Base):
class ConversionRoom(Base):
"""Room reservation data from hotel PMS.
Represents a single room reservation within a conversion/PMS reservation.
@@ -438,7 +497,7 @@ class RoomReservation(Base):
for efficient querying.
"""
__tablename__ = "room_reservations"
__tablename__ = "conversion_rooms"
id = Column(Integer, primary_key=True)
# Link to the parent conversion/PMS reservation
@@ -471,11 +530,11 @@ class RoomReservation(Base):
# Extracted total revenue for efficient querying (sum of all revenue_total in daily_sales)
# Stored as Double so it can be aggregated and filtered numerically
total_revenue = Column(String, nullable=True)
total_revenue = Column(Double, nullable=True)
# Metadata
created_at = Column(DateTime(timezone=True)) # When this record was imported
updated_at = Column(DateTime(timezone=True)) # When this record was last updated
# Relationships
conversion = relationship("Conversion", back_populates="room_reservations")
conversion = relationship("Conversion", back_populates="conversion_rooms")
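
With total_revenue now numeric, revenue can be aggregated directly in SQL. A sketch only (session handling omitted; column and relationship names are taken from the models above):

    from sqlalchemy import func, select

    stmt = (
        select(Conversion.pms_reservation_id, func.sum(ConversionRoom.total_revenue))
        .join(ConversionRoom, ConversionRoom.conversion_id == Conversion.id)
        .group_by(Conversion.pms_reservation_id)
    )
    # rows = (await session.execute(stmt)).all()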

View File

@@ -0,0 +1,274 @@
"""Database setup and initialization.
This module handles all database setup tasks that should run once at startup,
before the application starts accepting requests. It includes:
- Schema migrations via Alembic
- One-time data cleanup/backfill tasks (e.g., hashing existing customers)
"""
import asyncio
from typing import Any
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT
from .customer_service import CustomerService
from .db import create_database_engine
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
async def setup_database(config: dict[str, Any] | None = None) -> tuple[AsyncEngine, async_sessionmaker]:
"""Set up the database and prepare for application use.
This function should be called once at application startup, after
migrations have been run but before the app starts accepting requests. It:
1. Creates the async engine
2. Creates the sessionmaker
3. Performs one-time startup tasks (e.g., hashing existing customers)
NOTE: Database migrations should be run BEFORE calling this function,
typically using `uv run alembic upgrade head` or via run_migrations.py.
Args:
config: Application configuration dictionary
Returns:
Tuple of (engine, async_sessionmaker) for use in the application
Raises:
Any database-related exceptions that occur during setup
"""
_LOGGER.info("Starting database setup...")
# Create database engine
engine = create_database_engine(config=config, echo=False)
try:
# Create sessionmaker for the application to use
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
# Perform startup tasks (NOT migrations)
_LOGGER.info("Running startup tasks...")
await run_startup_tasks(AsyncSessionLocal, config, engine=engine)
_LOGGER.info("Startup tasks completed successfully")
_LOGGER.info("Database setup completed successfully")
return engine, AsyncSessionLocal
except Exception as e:
_LOGGER.exception("Database setup failed: %s", e)
await engine.dispose()
raise
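
For orientation, a rough sketch of how setup_database is meant to be consumed. The FastAPI lifespan below is an assumption for illustration, not this repository's actual api.py, and it presumes Alembic migrations were already applied (e.g. by run_api.py or `uv run alembic upgrade head`):

    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    from alpine_bits_python.config_loader import load_config
    from alpine_bits_python.db_setup import setup_database

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        config = load_config()
        # Startup tasks only -- no schema changes happen here
        engine, session_factory = await setup_database(config)
        app.state.engine = engine
        app.state.async_sessionmaker = session_factory
        try:
            yield
        finally:
            await engine.dispose()

    app = FastAPI(lifespan=lifespan)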
async def backfill_advertising_account_ids(
engine: AsyncEngine, config: dict[str, Any]
) -> None:
"""Backfill advertising account IDs for existing reservations.
Updates existing reservations to populate meta_account_id and google_account_id
based on the conditional logic:
- If fbclid is present, set meta_account_id from hotel config
- If gclid is present, set google_account_id from hotel config
This is a startup task that runs after schema migrations to ensure
existing data is consistent with config.
Args:
engine: SQLAlchemy async engine
config: Application configuration dict
"""
_LOGGER.info("Backfilling advertising account IDs for existing reservations...")
# Build a mapping of hotel_id -> account IDs from config
hotel_accounts = {}
alpine_bits_auth = config.get("alpine_bits_auth", [])
for hotel in alpine_bits_auth:
hotel_id = hotel.get(CONF_HOTEL_ID)
meta_account = hotel.get(CONF_META_ACCOUNT)
google_account = hotel.get(CONF_GOOGLE_ACCOUNT)
if hotel_id:
hotel_accounts[hotel_id] = {
"meta_account": meta_account,
"google_account": google_account,
}
if not hotel_accounts:
_LOGGER.debug("No hotel accounts found in config, skipping backfill")
return
_LOGGER.info("Found %d hotel(s) with account configurations", len(hotel_accounts))
# Update reservations with meta_account_id where fbclid is present
meta_updated = 0
for hotel_id, accounts in hotel_accounts.items():
if accounts["meta_account"]:
async with engine.begin() as conn:
sql = text(
"UPDATE reservations "
"SET meta_account_id = :meta_account "
"WHERE hotel_code = :hotel_id "
"AND fbclid IS NOT NULL "
"AND fbclid != '' "
"AND (meta_account_id IS NULL OR meta_account_id = '')"
)
result = await conn.execute(
sql,
{"meta_account": accounts["meta_account"], "hotel_id": hotel_id},
)
count = result.rowcount
if count > 0:
_LOGGER.info(
"Updated %d reservations with meta_account_id for hotel %s",
count,
hotel_id,
)
meta_updated += count
# Update reservations with google_account_id where gclid is present
google_updated = 0
for hotel_id, accounts in hotel_accounts.items():
if accounts["google_account"]:
async with engine.begin() as conn:
sql = text(
"UPDATE reservations "
"SET google_account_id = :google_account "
"WHERE hotel_code = :hotel_id "
"AND gclid IS NOT NULL "
"AND gclid != '' "
"AND (google_account_id IS NULL OR google_account_id = '')"
)
result = await conn.execute(
sql,
{
"google_account": accounts["google_account"],
"hotel_id": hotel_id,
},
)
count = result.rowcount
if count > 0:
_LOGGER.info(
"Updated %d reservations with google_account_id for hotel %s",
count,
hotel_id,
)
google_updated += count
if meta_updated > 0 or google_updated > 0:
_LOGGER.info(
"Backfill complete: %d reservations updated with meta_account_id, "
"%d with google_account_id",
meta_updated,
google_updated,
)
async def backfill_acked_requests_username(
engine: AsyncEngine, config: dict[str, Any]
) -> None:
"""Backfill username for existing acked_requests records.
For each acknowledgement, find the corresponding reservation to determine
its hotel_code, then look up the username for that hotel in the config
and update the acked_request record.
This is a startup task that runs after schema migrations to ensure
existing data is consistent with config.
Args:
engine: SQLAlchemy async engine
config: Application configuration dict
"""
_LOGGER.info("Backfilling usernames for existing acked_requests...")
# Build a mapping of hotel_id -> username from config
hotel_usernames = {}
alpine_bits_auth = config.get("alpine_bits_auth", [])
for hotel in alpine_bits_auth:
hotel_id = hotel.get(CONF_HOTEL_ID)
username = hotel.get("username")
if hotel_id and username:
hotel_usernames[hotel_id] = username
if not hotel_usernames:
_LOGGER.debug("No hotel usernames found in config, skipping backfill")
return
_LOGGER.info("Found %d hotel(s) with usernames in config", len(hotel_usernames))
# Update acked_requests with usernames by matching to reservations
total_updated = 0
async with engine.begin() as conn:
for hotel_id, username in hotel_usernames.items():
sql = text(
"""
UPDATE acked_requests
SET username = :username
WHERE unique_id IN (
SELECT md5_unique_id FROM reservations WHERE hotel_code = :hotel_id
)
AND username IS NULL
"""
)
result = await conn.execute(
sql, {"username": username, "hotel_id": hotel_id}
)
count = result.rowcount
if count > 0:
_LOGGER.info(
"Updated %d acknowledgements with username for hotel %s",
count,
hotel_id,
)
total_updated += count
if total_updated > 0:
_LOGGER.info(
"Backfill complete: %d acknowledgements updated with username",
total_updated,
)
async def run_startup_tasks(
sessionmaker: async_sessionmaker,
config: dict[str, Any] | None = None,
engine: AsyncEngine | None = None,
) -> None:
"""Run one-time startup tasks.
These are tasks that need to run at startup but are NOT schema migrations.
Examples: data backfills, hashing existing records, etc.
Args:
sessionmaker: SQLAlchemy async sessionmaker
config: Application configuration dictionary
engine: SQLAlchemy async engine (optional, for backfill tasks)
"""
# Hash any existing customers that don't have hashed data
async with sessionmaker() as session:
customer_service = CustomerService(session)
hashed_count = await customer_service.hash_existing_customers()
if hashed_count > 0:
_LOGGER.info(
"Backfilled hashed data for %d existing customers", hashed_count
)
else:
_LOGGER.debug("All existing customers already have hashed data")
# Backfill advertising account IDs and usernames based on config
# This ensures existing data is consistent with current configuration
if config and engine:
await backfill_advertising_account_ids(engine, config)
await backfill_acked_requests_username(engine, config)
elif config and not engine:
_LOGGER.warning(
"No engine provided to run_startup_tasks, "
"skipping config-based backfill tasks"
)

View File

@@ -1,7 +1,24 @@
"""Database migrations for AlpineBits.
"""DEPRECATED: Legacy database migrations for AlpineBits.
This module contains migration functions that are automatically run at app startup
to update existing database schemas without losing data.
⚠️ WARNING: This module is deprecated and no longer used. ⚠️
SCHEMA MIGRATIONS are now handled by Alembic (see alembic/versions/).
STARTUP TASKS (data backfills) are now in db_setup.py.
Migration History:
- migrate_add_room_types: Schema migration (should be in Alembic)
- migrate_add_advertising_account_ids: Schema + backfill (split into Alembic + db_setup.py)
- migrate_add_username_to_acked_requests: Schema + backfill (split into Alembic + db_setup.py)
- migrate_normalize_conversions: Schema migration (should be in Alembic)
Current Status:
- All schema changes are now managed via Alembic migrations
- All data backfills are now in db_setup.py as startup tasks
- This file is kept for reference but is no longer executed
Do not add new migrations here. Instead:
1. For schema changes: Create Alembic migration with `uv run alembic revision --autogenerate -m "description"`
2. For data backfills: Add to db_setup.py as a startup task
"""
from typing import Any
@@ -11,12 +28,13 @@ from sqlalchemy.ext.asyncio import AsyncEngine
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT
from .logging_config import get_logger
from .db import Reservation
_LOGGER = get_logger(__name__)
async def check_column_exists(engine: AsyncEngine, table_name: str, column_name: str) -> bool:
async def check_column_exists(
engine: AsyncEngine, table_name: str, column_name: str
) -> bool:
"""Check if a column exists in a table.
Args:
@@ -26,11 +44,13 @@ async def check_column_exists(engine: AsyncEngine, table_name: str, column_name:
Returns:
True if column exists, False otherwise
"""
async with engine.connect() as conn:
def _check(connection):
inspector = inspect(connection)
columns = [col['name'] for col in inspector.get_columns(table_name)]
columns = [col["name"] for col in inspector.get_columns(table_name)]
return column_name in columns
result = await conn.run_sync(_check)
@@ -38,10 +58,7 @@ async def check_column_exists(engine: AsyncEngine, table_name: str, column_name:
async def add_column_if_not_exists(
engine: AsyncEngine,
table_name: str,
column_name: str,
column_type: str = "VARCHAR"
engine: AsyncEngine, table_name: str, column_name: str, column_type: str = "VARCHAR"
) -> bool:
"""Add a column to a table if it doesn't already exist.
@@ -53,6 +70,7 @@ async def add_column_if_not_exists(
Returns:
True if column was added, False if it already existed
"""
exists = await check_column_exists(engine, table_name, column_name)
@@ -85,10 +103,14 @@ async def migrate_add_room_types(engine: AsyncEngine) -> None:
added_count = 0
# Add each column if it doesn't exist
if await add_column_if_not_exists(engine, "reservations", "room_type_code", "VARCHAR"):
if await add_column_if_not_exists(
engine, "reservations", "room_type_code", "VARCHAR"
):
added_count += 1
if await add_column_if_not_exists(engine, "reservations", "room_classification_code", "VARCHAR"):
if await add_column_if_not_exists(
engine, "reservations", "room_classification_code", "VARCHAR"
):
added_count += 1
if await add_column_if_not_exists(engine, "reservations", "room_type", "VARCHAR"):
@@ -100,7 +122,9 @@ async def migrate_add_room_types(engine: AsyncEngine) -> None:
_LOGGER.info("Migration add_room_types: No changes needed (already applied)")
async def migrate_add_advertising_account_ids(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None:
async def migrate_add_advertising_account_ids(
engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
"""Migration: Add advertising account ID fields to reservations table.
This migration adds two optional fields:
@@ -114,20 +138,27 @@ async def migrate_add_advertising_account_ids(engine: AsyncEngine, config: dict[
Args:
engine: SQLAlchemy async engine
config: Application configuration dict containing hotel account IDs
"""
_LOGGER.info("Running migration: add_advertising_account_ids")
added_count = 0
# Add each column if it doesn't exist
if await add_column_if_not_exists(engine, "reservations", "meta_account_id", "VARCHAR"):
if await add_column_if_not_exists(
engine, "reservations", "meta_account_id", "VARCHAR"
):
added_count += 1
if await add_column_if_not_exists(engine, "reservations", "google_account_id", "VARCHAR"):
if await add_column_if_not_exists(
engine, "reservations", "google_account_id", "VARCHAR"
):
added_count += 1
if added_count > 0:
_LOGGER.info("Migration add_advertising_account_ids: Added %d columns", added_count)
_LOGGER.info(
"Migration add_advertising_account_ids: Added %d columns", added_count
)
else:
_LOGGER.info("Migration add_advertising_account_ids: Columns already exist")
@@ -135,10 +166,14 @@ async def migrate_add_advertising_account_ids(engine: AsyncEngine, config: dict[
if config:
await _backfill_advertising_account_ids(engine, config)
else:
_LOGGER.warning("No config provided, skipping backfill of advertising account IDs")
_LOGGER.warning(
"No config provided, skipping backfill of advertising account IDs"
)
async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[str, Any]) -> None:
async def _backfill_advertising_account_ids(
engine: AsyncEngine, config: dict[str, Any]
) -> None:
"""Backfill advertising account IDs for existing reservations.
Updates existing reservations to populate meta_account_id and google_account_id
@@ -149,6 +184,7 @@ async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[st
Args:
engine: SQLAlchemy async engine
config: Application configuration dict
"""
_LOGGER.info("Backfilling advertising account IDs for existing reservations...")
@@ -164,7 +200,7 @@ async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[st
if hotel_id:
hotel_accounts[hotel_id] = {
"meta_account": meta_account,
"google_account": google_account
"google_account": google_account,
}
if not hotel_accounts:
@@ -188,11 +224,15 @@ async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[st
)
result = await conn.execute(
sql,
{"meta_account": accounts["meta_account"], "hotel_id": hotel_id}
{"meta_account": accounts["meta_account"], "hotel_id": hotel_id},
)
count = result.rowcount
if count > 0:
_LOGGER.info("Updated %d reservations with meta_account_id for hotel %s", count, hotel_id)
_LOGGER.info(
"Updated %d reservations with meta_account_id for hotel %s",
count,
hotel_id,
)
meta_updated += count
# Update reservations with google_account_id where gclid is present
@@ -210,21 +250,30 @@ async def _backfill_advertising_account_ids(engine: AsyncEngine, config: dict[st
)
result = await conn.execute(
sql,
{"google_account": accounts["google_account"], "hotel_id": hotel_id}
{
"google_account": accounts["google_account"],
"hotel_id": hotel_id,
},
)
count = result.rowcount
if count > 0:
_LOGGER.info("Updated %d reservations with google_account_id for hotel %s", count, hotel_id)
_LOGGER.info(
"Updated %d reservations with google_account_id for hotel %s",
count,
hotel_id,
)
google_updated += count
_LOGGER.info(
"Backfill complete: %d reservations updated with meta_account_id, %d with google_account_id",
meta_updated,
google_updated
google_updated,
)
async def migrate_add_username_to_acked_requests(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None:
async def migrate_add_username_to_acked_requests(
engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
"""Migration: Add username column to acked_requests table and backfill with hotel usernames.
This migration adds a username column to acked_requests to track acknowledgements by username
@@ -238,6 +287,7 @@ async def migrate_add_username_to_acked_requests(engine: AsyncEngine, config: di
Args:
engine: SQLAlchemy async engine
config: Application configuration dict containing hotel usernames
"""
_LOGGER.info("Running migration: add_username_to_acked_requests")
@@ -252,10 +302,14 @@ async def migrate_add_username_to_acked_requests(engine: AsyncEngine, config: di
if config:
await _backfill_acked_requests_username(engine, config)
else:
_LOGGER.warning("No config provided, skipping backfill of acked_requests usernames")
_LOGGER.warning(
"No config provided, skipping backfill of acked_requests usernames"
)
async def _backfill_acked_requests_username(engine: AsyncEngine, config: dict[str, Any]) -> None:
async def _backfill_acked_requests_username(
engine: AsyncEngine, config: dict[str, Any]
) -> None:
"""Backfill username for existing acked_requests records.
For each acknowledgement, find the corresponding reservation to determine its hotel_code,
@@ -264,6 +318,7 @@ async def _backfill_acked_requests_username(engine: AsyncEngine, config: dict[st
Args:
engine: SQLAlchemy async engine
config: Application configuration dict
"""
_LOGGER.info("Backfilling usernames for existing acked_requests...")
@@ -297,15 +352,53 @@ async def _backfill_acked_requests_username(engine: AsyncEngine, config: dict[st
AND username IS NULL
""")
result = await conn.execute(
sql,
{"username": username, "hotel_id": hotel_id}
sql, {"username": username, "hotel_id": hotel_id}
)
count = result.rowcount
if count > 0:
_LOGGER.info("Updated %d acknowledgements with username for hotel %s", count, hotel_id)
_LOGGER.info(
"Updated %d acknowledgements with username for hotel %s",
count,
hotel_id,
)
total_updated += count
_LOGGER.info("Backfill complete: %d acknowledgements updated with username", total_updated)
_LOGGER.info(
"Backfill complete: %d acknowledgements updated with username", total_updated
)
async def table_exists(engine: AsyncEngine, table_name: str) -> bool:
"""Check if a table exists in the database.
Args:
engine: SQLAlchemy async engine
table_name: Name of the table to check
Returns:
True if table exists, False otherwise
"""
async with engine.connect() as conn:
def _check(connection):
inspector = inspect(connection)
return table_name in inspector.get_table_names()
return await conn.run_sync(_check)
async def drop_table(engine: AsyncEngine, table_name: str) -> None:
"""Drop a table from the database.
Args:
engine: SQLAlchemy async engine
table_name: Name of the table to drop
"""
async with engine.begin() as conn:
await conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
_LOGGER.info("Dropped table: %s", table_name)
async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
@@ -313,7 +406,7 @@ async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
This migration redesigns the conversion data structure:
- conversions: One row per PMS reservation (with guest/advertising metadata)
- room_reservations: One row per room reservation (linked to conversion)
- conversion_rooms: One row per room reservation (linked to conversion)
- daily_sales: JSON array of daily sales within each room reservation
- total_revenue: Extracted sum of all daily sales for efficiency
@@ -326,20 +419,88 @@ async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
- Efficient querying via extracted total_revenue field
- All daily sales details preserved in JSON for analysis
The tables are created via Base.metadata.create_all() at startup.
The new tables are created via Base.metadata.create_all() at startup.
This migration handles cleanup of old schema versions.
Safe to run multiple times - idempotent.
"""
_LOGGER.info("Running migration: normalize_conversions")
# Check if the old conversions table exists with the old schema
# If the table exists but doesn't match our current schema definition, drop it
old_conversions_exists = await table_exists(engine, "conversions")
if old_conversions_exists:
# Check if this is the old-style table (we'll look for unexpected columns)
# The old table would not have the new structure we've defined
async with engine.connect() as conn:
def _get_columns(connection):
inspector = inspect(connection)
return [col["name"] for col in inspector.get_columns("conversions")]
old_columns = await conn.run_sync(_get_columns)
# Expected columns in the new schema (defined in db.py)
# If the table is missing key columns from our schema, it's the old version
expected_columns = {
"id",
"reservation_id",
"customer_id",
"hashed_customer_id",
"hotel_id",
"pms_reservation_id",
"reservation_number",
"reservation_date",
"creation_time",
"reservation_type",
"booking_channel",
"guest_first_name",
"guest_last_name",
"guest_email",
"guest_country_code",
"advertising_medium",
"advertising_partner",
"advertising_campagne",
"created_at",
"updated_at",
}
old_columns_set = set(old_columns)
# If we're missing critical new columns, this is the old schema
if not expected_columns.issubset(old_columns_set):
_LOGGER.info(
"Found old conversions table with incompatible schema. "
"Old columns: %s. Expected new columns: %s",
old_columns,
expected_columns,
)
await drop_table(engine, "conversions")
_LOGGER.info(
"Dropped old conversions table to allow creation of new schema"
)
else:
_LOGGER.info(
"Conversions table exists with compatible schema, no migration needed"
)
# Check for the old conversion_rooms table (which should not exist in the new schema)
old_conversion_rooms_exists = await table_exists(engine, "conversion_rooms")
if old_conversion_rooms_exists:
await drop_table(engine, "conversion_rooms")
_LOGGER.info("Dropped old conversion_rooms table")
_LOGGER.info(
"Conversion data structure redesigned: "
"conversions (1 per PMS reservation) + "
"room_reservations (1 per room, daily_sales as JSON). "
"Tables created/updated via Base.metadata.create_all()"
"Migration normalize_conversions: Conversion data structure normalized. "
"New tables (conversions + conversion_rooms) will be created/updated via "
"Base.metadata.create_all()"
)
async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None:
async def run_all_migrations(
engine: AsyncEngine, config: dict[str, Any] | None = None
) -> None:
"""Run all pending migrations.
This function should be called at app startup, after Base.metadata.create_all.
@@ -348,6 +509,7 @@ async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None
Args:
engine: SQLAlchemy async engine
config: Application configuration dict (optional, but required for some migrations)
"""
_LOGGER.info("Starting database migrations...")

View File

@@ -48,13 +48,14 @@ class ReservationService:
return Reservation(**data)
async def create_reservation(
self, reservation_data: ReservationData, customer_id: int
self, reservation_data: ReservationData, customer_id: int, auto_commit: bool = True
) -> Reservation:
"""Create a new reservation.
Args:
reservation_data: ReservationData containing reservation details
customer_id: ID of the customer making the reservation
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
Created Reservation instance
@@ -63,8 +64,13 @@ class ReservationService:
reservation_data, customer_id
)
self.session.add(reservation)
await self.session.commit()
await self.session.refresh(reservation)
if auto_commit:
await self.session.commit()
await self.session.refresh(reservation)
else:
await self.session.flush() # Flush to get the reservation.id
return reservation
async def get_reservation_by_unique_id(
@@ -220,7 +226,7 @@ class ReservationService:
]
async def record_acknowledgement(
self, client_id: str, unique_id: str, username: Optional[str] = None
self, client_id: str, unique_id: str, username: Optional[str] = None, auto_commit: bool = True
) -> AckedRequest:
"""Record that a client has acknowledged a reservation.
@@ -228,6 +234,7 @@ class ReservationService:
client_id: The client ID
unique_id: The unique_id of the reservation (md5_unique_id)
username: The username of the client making the request (optional)
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
Created AckedRequest instance
@@ -239,8 +246,13 @@ class ReservationService:
timestamp=datetime.now(UTC),
)
self.session.add(acked)
await self.session.commit()
await self.session.refresh(acked)
if auto_commit:
await self.session.commit()
await self.session.refresh(acked)
else:
await self.session.flush() # Flush to get the acked.id
return acked
async def is_acknowledged(self, unique_id: str, username: Optional[str] = None, client_id: Optional[str] = None) -> bool:

View File

@@ -1,19 +1,115 @@
#!/usr/bin/env python3
"""Startup script for the Wix Form Handler API."""
"""Startup script for the Alpine Bits Python Server API.
import os
This script:
1. Runs database migrations using Alembic
2. Starts the FastAPI application with uvicorn
Database migrations are run BEFORE starting the server to ensure the schema
is up to date. This approach works well with multiple workers since migrations
complete before any worker starts processing requests.
"""
import argparse
import sys
import uvicorn
if __name__ == "__main__":
# db_path = "alpinebits.db" # Adjust path if needed
# if os.path.exists(db_path):
# os.remove(db_path)
from alpine_bits_python.run_migrations import run_migrations
def parse_args() -> argparse.Namespace:
"""Parse command line arguments for uvicorn configuration."""
parser = argparse.ArgumentParser(
description="Run Alpine Bits Python Server with database migrations"
)
parser.add_argument(
"--host",
type=str,
default="0.0.0.0",
help="Host to bind to (default: 0.0.0.0)",
)
parser.add_argument(
"--port",
type=int,
default=8080,
help="Port to bind to (default: 8080)",
)
parser.add_argument(
"--workers",
type=int,
default=1,
help="Number of worker processes (default: 1)",
)
parser.add_argument(
"--reload",
action="store_true",
default=False,
help="Enable auto-reload for development (default: False)",
)
parser.add_argument(
"--log-level",
type=str,
default="info",
choices=["critical", "error", "warning", "info", "debug", "trace"],
help="Log level (default: info)",
)
parser.add_argument(
"--access-log",
action="store_true",
default=False,
help="Enable access log (default: False)",
)
parser.add_argument(
"--forwarded-allow-ips",
type=str,
default="127.0.0.1",
help=(
"Comma-separated list of IPs to trust for proxy headers "
"(default: 127.0.0.1)"
),
)
parser.add_argument(
"--proxy-headers",
action="store_true",
default=False,
help="Enable proxy headers (X-Forwarded-* headers) (default: False)",
)
parser.add_argument(
"--no-server-header",
action="store_true",
default=False,
help="Disable Server header in responses (default: False)",
)
return parser.parse_args()
if __name__ == "__main__":
# Parse command line arguments
args = parse_args()
# Run database migrations before starting the server
# This ensures the schema is up to date before any workers start
print("Running database migrations...")
try:
run_migrations()
print("Database migrations completed successfully")
except Exception as e:
print(f"Failed to run migrations: {e}", file=sys.stderr)
sys.exit(1)
# Start the API server
print("Starting API server...")
uvicorn.run(
"alpine_bits_python.api:app",
host="0.0.0.0",
port=8080,
reload=True, # Enable auto-reload during development
log_level="info",
host=args.host,
port=args.port,
workers=args.workers,
reload=args.reload,
log_level=args.log_level,
access_log=args.access_log,
forwarded_allow_ips=args.forwarded_allow_ips,
proxy_headers=args.proxy_headers,
server_header=not args.no_server_header,
)

View File

@@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""Run database migrations using Alembic.
This script should be run before starting the application to ensure
the database schema is up to date. It can be run standalone or called
from run_api.py before starting uvicorn.
Usage:
uv run python -m alpine_bits_python.run_migrations
or
from alpine_bits_python.run_migrations import run_migrations
run_migrations()
"""
import subprocess
import sys
from pathlib import Path
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
def run_migrations() -> None:
"""Run Alembic migrations to upgrade database to latest schema.
This function runs 'alembic upgrade head' to apply all pending migrations.
It will exit the process if migrations fail.
Raises:
SystemExit: If migrations fail
"""
_LOGGER.info("Running database migrations...")
# Get the project root directory (where alembic.ini is located)
# Assuming this file is in src/alpine_bits_python/
project_root = Path(__file__).parent.parent.parent
try:
# Run alembic upgrade head
result = subprocess.run(
["alembic", "upgrade", "head"],
cwd=project_root,
capture_output=True,
text=True,
check=True,
)
_LOGGER.info("Database migrations completed successfully")
_LOGGER.debug("Migration output: %s", result.stdout)
except subprocess.CalledProcessError as e:
_LOGGER.error("Failed to run database migrations:")
_LOGGER.error("Exit code: %d", e.returncode)
_LOGGER.error("stdout: %s", e.stdout)
_LOGGER.error("stderr: %s", e.stderr)
sys.exit(1)
except FileNotFoundError:
_LOGGER.error(
"Alembic not found. Please ensure it's installed: uv pip install alembic"
)
sys.exit(1)
if __name__ == "__main__":
# Configure basic logging if run directly
import logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
run_migrations()
print("Migrations completed successfully!")

View File

@@ -1,12 +1,30 @@
#!/usr/bin/env python3
"""Convenience launcher for the Wix Form Handler API."""
"""Convenience launcher for the Alpine Bits Python Server API (Development Mode)."""
import os
import subprocess
# Change to src directory
src_dir = os.path.join(os.path.dirname(__file__), "src/alpine_bits_python")
# Run the API using uv
# Run the API using uv with development settings
# This includes:
# - Auto-reload enabled for code changes
# - Single worker for easier debugging
# - Port 8080 for development
if __name__ == "__main__":
subprocess.run(["uv", "run", "python", os.path.join(src_dir, "run_api.py")], check=False)
subprocess.run(
[
"uv",
"run",
"python",
"-m",
"alpine_bits_python.run_api",
"--host",
"0.0.0.0",
"--port",
"8080",
"--workers",
"1",
"--reload",
"--log-level",
"info",
],
check=False,
)

View File

@@ -18,6 +18,8 @@ from unittest.mock import patch
import pytest
import pytest_asyncio
from alembic import command
from alembic.config import Config
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
@@ -26,6 +28,26 @@ from alpine_bits_python.const import HttpStatusCode
from alpine_bits_python.db import Base, Customer, Reservation
def run_alembic_migrations(connection):
"""Run Alembic migrations on a SQLAlchemy connection.
This is used in tests to set up the database schema using migrations
instead of Base.metadata.create_all().
"""
# Get path to alembic.ini
project_root = Path(__file__).parent.parent
alembic_ini_path = project_root / "alembic.ini"
# Create Alembic config
alembic_cfg = Config(str(alembic_ini_path))
# Override the database URL to use the test connection
# For SQLite, we can't use the in-memory connection URL directly,
# so we'll use Base.metadata.create_all() for SQLite tests
# This is a limitation of Alembic with SQLite in-memory databases
Base.metadata.create_all(bind=connection)
@pytest_asyncio.fixture
async def test_db_engine():
"""Create an in-memory SQLite database for testing."""
@@ -34,7 +56,8 @@ async def test_db_engine():
echo=False,
)
# Create tables
# Create tables using Base.metadata.create_all for SQLite tests
# (Alembic doesn't work well with SQLite in-memory databases)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@@ -88,17 +111,29 @@ def client(test_config):
Each test gets a fresh TestClient instance to avoid database conflicts.
Mocks load_config to return test_config instead of production config.
"""
import asyncio # noqa: PLC0415
# Import locally to avoid circular imports
from alpine_bits_python.alpinebits_server import AlpineBitsServer # noqa: PLC0415
# Mock load_config to return test_config instead of production config
with patch("alpine_bits_python.api.load_config", return_value=test_config):
# Create a new in-memory database for each test
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
# Create a new in-memory database for each test
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
# Create tables before TestClient starts (which triggers lifespan)
# This ensures tables exist when run_startup_tasks() runs
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
asyncio.run(create_tables())
# Mock both load_config and create_database_engine
# This ensures the lifespan uses our test database instead of creating a new one
with patch("alpine_bits_python.api.load_config", return_value=test_config), \
patch("alpine_bits_python.api.create_database_engine", return_value=engine):
# Setup app state (will be overridden by lifespan but we set it anyway)
app.state.engine = engine
app.state.async_sessionmaker = async_sessionmaker(
@@ -107,8 +142,9 @@ def client(test_config):
app.state.config = test_config
app.state.alpine_bits_server = AlpineBitsServer(test_config)
# TestClient will trigger lifespan events which create the tables
# TestClient will trigger lifespan events
# The mocked load_config will ensure test_config is used
# The mocked create_database_engine will ensure our test database is used
with TestClient(app) as test_client:
yield test_client
@@ -737,8 +773,9 @@ class TestXMLUploadEndpoint:
headers={**basic_auth_headers, "Content-Type": "application/xml"},
)
assert response.status_code == HttpStatusCode.OK
assert "Xml received" in response.text
# Returns 202 Accepted since processing is now asynchronous
assert response.status_code == 202
assert "received and queued for processing" in response.text
def test_xml_upload_gzip_compressed(self, client, basic_auth_headers):
"""Test XML upload with gzip compression."""
@@ -761,7 +798,8 @@ class TestXMLUploadEndpoint:
headers=headers,
)
assert response.status_code == HttpStatusCode.OK
# Returns 202 Accepted since processing is now asynchronous
assert response.status_code == 202
def test_xml_upload_missing_auth(self, client):
"""Test XML upload without authentication."""

uv.lock generated
View File

@@ -14,12 +14,27 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f5/10/6c25ed6de94c49f88a91fa5018cb4c0f3625f31d5be9f771ebe5cc7cd506/aiosqlite-0.21.0-py3-none-any.whl", hash = "sha256:2549cf4057f95f53dcba16f2b64e8e2791d7e1adedb13197dd8ed77bb226d7d0", size = 15792, upload-time = "2025-02-03T07:30:13.6Z" },
]
[[package]]
name = "alembic"
version = "1.17.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mako" },
{ name = "sqlalchemy" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/02/a6/74c8cadc2882977d80ad756a13857857dbcf9bd405bc80b662eb10651282/alembic-1.17.2.tar.gz", hash = "sha256:bbe9751705c5e0f14877f02d46c53d10885e377e3d90eda810a016f9baa19e8e", size = 1988064, upload-time = "2025-11-14T20:35:04.057Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ba/88/6237e97e3385b57b5f1528647addea5cc03d4d65d5979ab24327d41fb00d/alembic-1.17.2-py3-none-any.whl", hash = "sha256:f483dd1fe93f6c5d49217055e4d15b905b425b6af906746abb35b69c1996c4e6", size = 248554, upload-time = "2025-11-14T20:35:05.699Z" },
]
[[package]]
name = "alpine-bits-python-server"
version = "0.1.2"
source = { editable = "." }
dependencies = [
{ name = "aiosqlite" },
{ name = "alembic" },
{ name = "annotatedyaml" },
{ name = "asyncpg" },
{ name = "dotenv" },
@@ -51,6 +66,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "aiosqlite", specifier = ">=0.21.0" },
{ name = "alembic", specifier = ">=1.17.2" },
{ name = "annotatedyaml", specifier = ">=1.0.0" },
{ name = "asyncpg", specifier = ">=0.30.0" },
{ name = "dotenv", specifier = ">=0.9.9" },
@@ -585,6 +601,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0a/44/9613f300201b8700215856e5edd056d4e58dd23368699196b58877d4408b/lxml-6.0.1-cp314-cp314-win_arm64.whl", hash = "sha256:2834377b0145a471a654d699bdb3a2155312de492142ef5a1d426af2c60a0a31", size = 3753901, upload-time = "2025-08-22T10:34:45.799Z" },
]
[[package]]
name = "mako"
version = "1.3.10"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markupsafe" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9e/38/bd5b78a920a64d708fe6bc8e0a2c075e1389d53bef8413725c63ba041535/mako-1.3.10.tar.gz", hash = "sha256:99579a6f39583fa7e5630a28c3c1f440e4e97a414b80372649c0ce338da2ea28", size = 392474, upload-time = "2025-04-10T12:44:31.16Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/87/fb/99f81ac72ae23375f22b7afdb7642aba97c00a713c217124420147681a2f/mako-1.3.10-py3-none-any.whl", hash = "sha256:baef24a52fc4fc514a0887ac600f9f1cff3d82c61d4d700a1fa84d597b88db59", size = 78509, upload-time = "2025-04-10T12:50:53.297Z" },
]
[[package]]
name = "markupsafe"
version = "3.0.2"