10 Commits

Author  SHA1  Message  Date
Jonas Linter  d49f6915c9  Idempotent matching. Hopefully  2025-11-19 20:34:08 +01:00
Jonas Linter  3377a82919  More cleanup  2025-11-19 19:35:54 +01:00
Jonas Linter  ead698dafc  Cleanup  2025-11-19 19:35:36 +01:00
Jonas Linter  35c29d80bb  Lets see if the matching is finally sensible  2025-11-19 19:35:06 +01:00
Jonas Linter  b523f3b669  Adjusted the migrations so they work on the prod db  2025-11-19 19:10:25 +01:00
Jonas Linter  e5f0aea870  Just some adjustments to conversion service so that the tests work again  2025-11-19 18:58:44 +01:00
Jonas Linter  2b98973d1c  On we go. Maybe soon this will be done  2025-11-19 18:40:44 +01:00
Jonas Linter  42b353d241  Finally it works  2025-11-19 17:27:47 +01:00
Jonas Linter  d81ba79256  More refactoring  2025-11-19 16:25:18 +01:00
Jonas Linter  1396135208  Seems to work now  2025-11-19 15:39:33 +01:00
12 changed files with 301940 additions and 958 deletions

MIGRATION_FIXES.md (new file, +59 lines)

@@ -0,0 +1,59 @@
# Migration Fixes for Production Database Compatibility
## Problem
The database migrations were failing when run against a production database dump because:
1. **First migration (630b0c367dcb)**: Tried to create an index on `acked_requests` that already existed in the production dump
2. **Third migration (08fe946414d8)**: Tried to add `hashed_customer_id` column to `reservations` without checking if it already existed
3. **Fourth migration (a1b2c3d4e5f6)**: Tried to modify `conversion_guests` table before it was guaranteed to exist
## Solutions Applied
### 1. Migration 630b0c367dcb - Initial Migration
**Change**: Made index creation idempotent by checking if index already exists before creating it
**Impact**: Allows migration to run even if production DB already has the `ix_acked_requests_username` index
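The guard described here boils down to listing existing indexes via SQLAlchemy's `Inspector` before calling `op.create_index`. A minimal sketch of the pattern, using an in-memory SQLite database as a stand-in for the migration connection (inside the real migration the check runs against `op.get_bind()`):

```python
import sqlalchemy as sa

# Stand-in for the production connection: a database where the index
# already exists, mimicking the production dump.
engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE acked_requests (username VARCHAR)"))
    conn.execute(sa.text(
        "CREATE INDEX ix_acked_requests_username ON acked_requests (username)"
    ))

# The idempotency guard: inspect existing indexes and only create
# the index when it is missing.
inspector = sa.inspect(engine)
existing = [idx["name"] for idx in inspector.get_indexes("acked_requests")]
index_missing = "ix_acked_requests_username" not in existing
```

On a fresh database `index_missing` is true and the migration creates the index; on the production dump it is false and the `op.create_index` call is skipped.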
### 2. Migration 08fe946414d8 - Add hashed_customer_id to reservations
**Change**: Added check to skip adding the column if it already exists
**Impact**:
- Preserves production data in `reservations` and `hashed_customers` tables
- Makes migration safe to re-run
- Still performs data migration to populate `hashed_customer_id` when needed
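The column check uses the same `Inspector` API, this time with `get_columns`. A sketch under the same assumption (in-memory SQLite standing in for the production database):

```python
import sqlalchemy as sa

# Stand-in schema where the column already exists, as in the prod dump.
engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text(
        "CREATE TABLE reservations (id INTEGER PRIMARY KEY, hashed_customer_id INTEGER)"
    ))

# The guard: op.add_column / create_index / create_foreign_key only run
# when the column is absent, making the migration safe to re-run.
inspector = sa.inspect(engine)
columns = [col["name"] for col in inspector.get_columns("reservations")]
should_add_column = "hashed_customer_id" not in columns
```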
### 3. Migration a1b2c3d4e5f6 - Add hashed_customer_id to conversion_guests
**Change**: Added check to verify `conversion_guests` table exists before modifying it
**Impact**: Migration no longer fails when the `conversion_guests` table does not exist yet; the column is only added once the table is present
## Data Preservation
All non-conversion tables are preserved:
- `customers`: 1095 rows preserved
- `reservations`: 1177 rows preserved
- `hashed_customers`: 1095 rows preserved
- `acked_requests`: preserved

Conversion tables are properly recreated:

- `conversions`: created fresh with new schema
- `conversion_rooms`: created fresh with new schema
- `conversion_guests`: created fresh with composite key
## Verification
After running `uv run alembic upgrade head`:
- All migrations apply successfully
- Database is at head revision: `a1b2c3d4e5f6`
- All required columns exist (`conversion_guests.hashed_customer_id`, `reservations.hashed_customer_id`)
- Production data is preserved
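Besides `uv run alembic current`, the head revision can be read straight from the single-row `alembic_version` table that Alembic maintains. A sketch, with in-memory SQLite standing in for the real database and schema:

```python
import sqlalchemy as sa

# Alembic records the current revision in alembic_version.version_num;
# here we fabricate that table with the expected head revision.
engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text(
        "CREATE TABLE alembic_version (version_num VARCHAR(32) NOT NULL)"
    ))
    conn.execute(sa.text("INSERT INTO alembic_version VALUES ('a1b2c3d4e5f6')"))

with engine.connect() as conn:
    current = conn.execute(
        sa.text("SELECT version_num FROM alembic_version")
    ).scalar()
```

Against the real database the same `SELECT` (with the `alpinebits` schema prefix) verifies the upgrade landed on `a1b2c3d4e5f6`.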
## Reset Instructions
If you need to reset and re-run all migrations:
```sql
DELETE FROM alpinebits.alembic_version;
```
Then run:
```bash
uv run alembic upgrade head
```

MIGRATION_RESET.md (new file, +37 lines)

@@ -0,0 +1,37 @@
# Migration Reset Instructions
If you need to reset the alembic_version table to start migrations from scratch:
## SQL Command
```sql
-- Connect to your database and run:
DELETE FROM alpinebits.alembic_version;
```
This clears all migration records so that `alembic upgrade head` will run all migrations from the beginning.
## Python One-Liner (if preferred)
```bash
uv run python -c "
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

from alpine_bits_python.config_loader import load_config
from alpine_bits_python.db import get_database_url, get_database_schema


async def reset():
    app_config = load_config()
    db_url = get_database_url(app_config)
    schema = get_database_schema(app_config)
    engine = create_async_engine(db_url)
    async with engine.begin() as conn:
        await conn.execute(text(f'SET search_path TO {schema}'))
        await conn.execute(text('DELETE FROM alembic_version'))
    print('Cleared alembic_version table')
    await engine.dispose()


asyncio.run(reset())
"
```


@@ -23,6 +23,7 @@ def upgrade() -> None:
    # Drop existing tables to start with a clean slate
    # Drop conversion_rooms first due to foreign key dependency
    op.execute("DROP TABLE IF EXISTS conversion_rooms CASCADE")
    op.execute("DROP TABLE IF EXISTS conversion_guests CASCADE")
    op.execute("DROP TABLE IF EXISTS conversions CASCADE")
    print("dropped existing conversion tables")
@@ -177,6 +178,15 @@ def upgrade() -> None:
["room_number"],
unique=False,
)
# Create index on acked_requests if it doesn't exist
connection = op.get_bind()
inspector = sa.inspect(connection)
# Get existing indices on acked_requests
acked_requests_indices = [idx['name'] for idx in inspector.get_indexes('acked_requests')]
# Only create index if it doesn't exist
if "ix_acked_requests_username" not in acked_requests_indices:
op.create_index(
op.f("ix_acked_requests_username"), "acked_requests", ["username"], unique=False
)


@@ -1,66 +0,0 @@
"""Added birth_date, storing revenue as number
Revision ID: b33fd7a2da6c
Revises: 630b0c367dcb
Create Date: 2025-11-18 14:41:17.567595
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'b33fd7a2da6c'
down_revision: Union[str, Sequence[str], None] = '630b0c367dcb'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
# Convert VARCHAR to Double with explicit CAST for PostgreSQL compatibility
# PostgreSQL requires USING clause for type conversion
connection = op.get_bind()
if connection.dialect.name == 'postgresql':
op.execute(
"ALTER TABLE conversion_rooms "
"ALTER COLUMN total_revenue TYPE DOUBLE PRECISION "
"USING total_revenue::DOUBLE PRECISION"
)
else:
# For SQLite and other databases, use standard alter_column
op.alter_column('conversion_rooms', 'total_revenue',
existing_type=sa.VARCHAR(),
type_=sa.Double(),
existing_nullable=True)
op.add_column('conversions', sa.Column('guest_birth_date', sa.Date(), nullable=True))
op.add_column('conversions', sa.Column('guest_id', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('conversions', 'guest_id')
op.drop_column('conversions', 'guest_birth_date')
# Convert Double back to VARCHAR with explicit CAST for PostgreSQL compatibility
connection = op.get_bind()
if connection.dialect.name == 'postgresql':
op.execute(
"ALTER TABLE conversion_rooms "
"ALTER COLUMN total_revenue TYPE VARCHAR "
"USING total_revenue::VARCHAR"
)
else:
# For SQLite and other databases, use standard alter_column
op.alter_column('conversion_rooms', 'total_revenue',
existing_type=sa.Double(),
type_=sa.VARCHAR(),
existing_nullable=True)
# ### end Alembic commands ###


@@ -0,0 +1,284 @@
"""Update conversions schema with new attribution fields and composite key for guests.
Revision ID: a2b3c4d5e6f7
Revises: 630b0c367dcb
Create Date: 2025-11-19 00:00:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "a2b3c4d5e6f7"
down_revision: str | Sequence[str] | None = "630b0c367dcb"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Upgrade schema."""
# Drop existing conversion tables to migrate to new schema
# Drop conversion_rooms first due to foreign key dependency
op.execute("DROP TABLE IF EXISTS conversion_rooms CASCADE")
op.execute("DROP TABLE IF EXISTS conversions CASCADE")
op.execute("DROP TABLE IF EXISTS conversion_guests CASCADE")
# Create conversion_guests table with composite primary key (hotel_id, guest_id)
op.create_table(
"conversion_guests",
sa.Column("hotel_id", sa.String(), nullable=False, primary_key=True),
sa.Column("guest_id", sa.String(), nullable=False, primary_key=True),
sa.Column("guest_first_name", sa.String(), nullable=True),
sa.Column("guest_last_name", sa.String(), nullable=True),
sa.Column("guest_email", sa.String(), nullable=True),
sa.Column("guest_country_code", sa.String(), nullable=True),
sa.Column("guest_birth_date", sa.Date(), nullable=True),
sa.Column("hashed_first_name", sa.String(64), nullable=True),
sa.Column("hashed_last_name", sa.String(64), nullable=True),
sa.Column("hashed_email", sa.String(64), nullable=True),
sa.Column("hashed_country_code", sa.String(64), nullable=True),
sa.Column("hashed_birth_date", sa.String(64), nullable=True),
sa.Column("is_regular", sa.Boolean(), default=False, nullable=False),
sa.Column("first_seen", sa.DateTime(timezone=True), nullable=True),
sa.Column("last_seen", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("hotel_id", "guest_id"),
)
op.create_index(
op.f("ix_conversion_guests_hotel_id"),
"conversion_guests",
["hotel_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_guests_guest_id"),
"conversion_guests",
["guest_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_guests_hashed_first_name"),
"conversion_guests",
["hashed_first_name"],
unique=False,
)
op.create_index(
op.f("ix_conversion_guests_hashed_last_name"),
"conversion_guests",
["hashed_last_name"],
unique=False,
)
op.create_index(
op.f("ix_conversion_guests_hashed_email"),
"conversion_guests",
["hashed_email"],
unique=False,
)
# Create conversions table with new schema
op.create_table(
"conversions",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("reservation_id", sa.Integer(), nullable=True),
sa.Column("customer_id", sa.Integer(), nullable=True),
sa.Column("hashed_customer_id", sa.Integer(), nullable=True),
sa.Column("hotel_id", sa.String(), nullable=True),
sa.Column("guest_id", sa.String(), nullable=True),
sa.Column("pms_reservation_id", sa.String(), nullable=True),
sa.Column("reservation_number", sa.String(), nullable=True),
sa.Column("reservation_date", sa.Date(), nullable=True),
sa.Column("creation_time", sa.DateTime(timezone=True), nullable=True),
sa.Column("reservation_type", sa.String(), nullable=True),
sa.Column("booking_channel", sa.String(), nullable=True),
sa.Column("advertising_medium", sa.String(), nullable=True),
sa.Column("advertising_partner", sa.String(), nullable=True),
sa.Column("advertising_campagne", sa.String(), nullable=True),
sa.Column("directly_attributable", sa.Boolean(), default=False, nullable=False),
sa.Column("guest_matched", sa.Boolean(), default=False, nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(["reservation_id"], ["reservations.id"]),
sa.ForeignKeyConstraint(["customer_id"], ["customers.id"]),
sa.ForeignKeyConstraint(["hashed_customer_id"], ["hashed_customers.id"]),
sa.ForeignKeyConstraint(
["hotel_id", "guest_id"],
["conversion_guests.hotel_id", "conversion_guests.guest_id"],
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_conversions_advertising_campagne"),
"conversions",
["advertising_campagne"],
unique=False,
)
op.create_index(
op.f("ix_conversions_advertising_medium"),
"conversions",
["advertising_medium"],
unique=False,
)
op.create_index(
op.f("ix_conversions_advertising_partner"),
"conversions",
["advertising_partner"],
unique=False,
)
op.create_index(
op.f("ix_conversions_customer_id"),
"conversions",
["customer_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_hashed_customer_id"),
"conversions",
["hashed_customer_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_hotel_id"),
"conversions",
["hotel_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_guest_id"),
"conversions",
["guest_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_pms_reservation_id"),
"conversions",
["pms_reservation_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_reservation_id"),
"conversions",
["reservation_id"],
unique=False,
)
# Create conversion_rooms table
op.create_table(
"conversion_rooms",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("conversion_id", sa.Integer(), nullable=False),
sa.Column("pms_hotel_reservation_id", sa.String(), nullable=True),
sa.Column("arrival_date", sa.Date(), nullable=True),
sa.Column("departure_date", sa.Date(), nullable=True),
sa.Column("room_status", sa.String(), nullable=True),
sa.Column("room_type", sa.String(), nullable=True),
sa.Column("room_number", sa.String(), nullable=True),
sa.Column("num_adults", sa.Integer(), nullable=True),
sa.Column("rate_plan_code", sa.String(), nullable=True),
sa.Column("connected_room_type", sa.String(), nullable=True),
sa.Column("daily_sales", sa.JSON(), nullable=True),
sa.Column("total_revenue", sa.Double(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(["conversion_id"], ["conversions.id"]),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_conversion_rooms_arrival_date"),
"conversion_rooms",
["arrival_date"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_conversion_id"),
"conversion_rooms",
["conversion_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_departure_date"),
"conversion_rooms",
["departure_date"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_pms_hotel_reservation_id"),
"conversion_rooms",
["pms_hotel_reservation_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_room_number"),
"conversion_rooms",
["room_number"],
unique=False,
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_index(
op.f("ix_conversion_rooms_room_number"), table_name="conversion_rooms"
)
op.drop_index(
op.f("ix_conversion_rooms_pms_hotel_reservation_id"),
table_name="conversion_rooms",
)
op.drop_index(
op.f("ix_conversion_rooms_departure_date"), table_name="conversion_rooms"
)
op.drop_index(
op.f("ix_conversion_rooms_conversion_id"), table_name="conversion_rooms"
)
op.drop_index(
op.f("ix_conversion_rooms_arrival_date"), table_name="conversion_rooms"
)
op.drop_table("conversion_rooms")
op.drop_index(
op.f("ix_conversions_reservation_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_pms_reservation_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_guest_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_hotel_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_hashed_customer_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_customer_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_advertising_partner"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_advertising_medium"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_advertising_campagne"), table_name="conversions"
)
op.drop_table("conversions")
op.drop_index(
op.f("ix_conversion_guests_hashed_email"), table_name="conversion_guests"
)
op.drop_index(
op.f("ix_conversion_guests_hashed_last_name"), table_name="conversion_guests"
)
op.drop_index(
op.f("ix_conversion_guests_hashed_first_name"), table_name="conversion_guests"
)
op.drop_index(
op.f("ix_conversion_guests_guest_id"), table_name="conversion_guests"
)
op.drop_index(
op.f("ix_conversion_guests_hotel_id"), table_name="conversion_guests"
)
op.drop_table("conversion_guests")


@@ -1,168 +0,0 @@
"""Add ConversionGuest table and link conversions
Revision ID: 70b2579d1d96
Revises: b33fd7a2da6c
Create Date: 2025-11-19 11:56:46.532881
"""
from typing import Sequence, Union
import hashlib
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '70b2579d1d96'
down_revision: Union[str, Sequence[str], None] = 'b33fd7a2da6c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def normalize_and_hash(value):
"""Normalize and hash a value for ConversionGuest hashed fields."""
if not value:
return None
normalized = str(value).lower().strip()
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('conversion_guests',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('hotel_id', sa.String(), nullable=False),
sa.Column('guest_id', sa.String(), nullable=True),
sa.Column('guest_first_name', sa.String(), nullable=True),
sa.Column('guest_last_name', sa.String(), nullable=True),
sa.Column('guest_email', sa.String(), nullable=True),
sa.Column('guest_country_code', sa.String(), nullable=True),
sa.Column('guest_birth_date', sa.Date(), nullable=True),
sa.Column('hashed_first_name', sa.String(length=64), nullable=True),
sa.Column('hashed_last_name', sa.String(length=64), nullable=True),
sa.Column('hashed_email', sa.String(length=64), nullable=True),
sa.Column('hashed_country_code', sa.String(length=64), nullable=True),
sa.Column('hashed_birth_date', sa.String(length=64), nullable=True),
sa.Column('first_seen', sa.DateTime(timezone=True), nullable=True),
sa.Column('last_seen', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_conversion_guests_guest_id'), 'conversion_guests', ['guest_id'], unique=False)
op.create_index(op.f('ix_conversion_guests_hashed_email'), 'conversion_guests', ['hashed_email'], unique=False)
op.create_index(op.f('ix_conversion_guests_hashed_first_name'), 'conversion_guests', ['hashed_first_name'], unique=False)
op.create_index(op.f('ix_conversion_guests_hashed_last_name'), 'conversion_guests', ['hashed_last_name'], unique=False)
op.create_index(op.f('ix_conversion_guests_hotel_id'), 'conversion_guests', ['hotel_id'], unique=False)
op.add_column('conversions', sa.Column('conversion_guest_id', sa.Integer(), nullable=True))
op.create_index(op.f('ix_conversions_conversion_guest_id'), 'conversions', ['conversion_guest_id'], unique=False)
op.create_foreign_key(None, 'conversions', 'conversion_guests', ['conversion_guest_id'], ['id'])
# ### end Alembic commands ###
# Data migration: Migrate existing conversion guest data to ConversionGuest table
connection = op.get_bind()
# Get all conversions grouped by (hotel_id, guest_id), picking the most recent by creation_time
# For guests with NULL guest_id, group by hotel_id only
result = connection.execute(sa.text("""
SELECT
c.hotel_id,
c.guest_id,
c.guest_first_name,
c.guest_last_name,
c.guest_email,
c.guest_country_code,
c.guest_birth_date,
c.creation_time,
ROW_NUMBER() OVER (
PARTITION BY c.hotel_id, c.guest_id
ORDER BY c.creation_time DESC NULLS LAST
) as rn
FROM conversions c
WHERE c.guest_first_name IS NOT NULL
OR c.guest_last_name IS NOT NULL
OR c.guest_email IS NOT NULL
OR c.guest_country_code IS NOT NULL
OR c.guest_birth_date IS NOT NULL
"""))
conversion_guests = {} # Map of (hotel_id, guest_id) -> guest data
for row in result:
hotel_id = row.hotel_id
guest_id = row.guest_id
# Only process the most recent record for each guest
if row.rn != 1:
continue
key = (hotel_id, guest_id)
if key not in conversion_guests:
conversion_guests[key] = {
'hotel_id': hotel_id,
'guest_id': guest_id,
'guest_first_name': row.guest_first_name,
'guest_last_name': row.guest_last_name,
'guest_email': row.guest_email,
'guest_country_code': row.guest_country_code,
'guest_birth_date': row.guest_birth_date,
'first_seen': row.creation_time,
'last_seen': row.creation_time,
}
# Insert conversion guests
if conversion_guests:
for guest_data in conversion_guests.values():
insert_stmt = sa.text("""
INSERT INTO conversion_guests
(hotel_id, guest_id, guest_first_name, guest_last_name, guest_email,
guest_country_code, guest_birth_date, hashed_first_name, hashed_last_name,
hashed_email, hashed_country_code, hashed_birth_date, first_seen, last_seen)
VALUES
(:hotel_id, :guest_id, :guest_first_name, :guest_last_name, :guest_email,
:guest_country_code, :guest_birth_date, :hashed_first_name, :hashed_last_name,
:hashed_email, :hashed_country_code, :hashed_birth_date, :first_seen, :last_seen)
""")
connection.execute(insert_stmt, {
'hotel_id': guest_data['hotel_id'],
'guest_id': guest_data['guest_id'],
'guest_first_name': guest_data['guest_first_name'],
'guest_last_name': guest_data['guest_last_name'],
'guest_email': guest_data['guest_email'],
'guest_country_code': guest_data['guest_country_code'],
'guest_birth_date': guest_data['guest_birth_date'],
'hashed_first_name': normalize_and_hash(guest_data['guest_first_name']),
'hashed_last_name': normalize_and_hash(guest_data['guest_last_name']),
'hashed_email': normalize_and_hash(guest_data['guest_email']),
'hashed_country_code': normalize_and_hash(guest_data['guest_country_code']),
'hashed_birth_date': normalize_and_hash(
guest_data['guest_birth_date'].isoformat() if guest_data['guest_birth_date'] else None
),
'first_seen': guest_data['first_seen'],
'last_seen': guest_data['last_seen'],
})
# Link conversions to conversion_guests based on (hotel_id, guest_id)
update_stmt = sa.text("""
UPDATE conversions c
SET conversion_guest_id = cg.id
FROM conversion_guests cg
WHERE c.hotel_id = cg.hotel_id
AND c.guest_id IS NOT DISTINCT FROM cg.guest_id
""")
connection.execute(update_stmt)
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(None, 'conversions', type_='foreignkey')
op.drop_index(op.f('ix_conversions_conversion_guest_id'), table_name='conversions')
op.drop_column('conversions', 'conversion_guest_id')
op.drop_index(op.f('ix_conversion_guests_hotel_id'), table_name='conversion_guests')
op.drop_index(op.f('ix_conversion_guests_hashed_last_name'), table_name='conversion_guests')
op.drop_index(op.f('ix_conversion_guests_hashed_first_name'), table_name='conversion_guests')
op.drop_index(op.f('ix_conversion_guests_hashed_email'), table_name='conversion_guests')
op.drop_index(op.f('ix_conversion_guests_guest_id'), table_name='conversion_guests')
op.drop_table('conversion_guests')
# ### end Alembic commands ###


@@ -13,13 +13,19 @@ import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision: str = '08fe946414d8'
down_revision: Union[str, Sequence[str], None] = '70b2579d1d96'
down_revision: Union[str, Sequence[str], None] = 'a2b3c4d5e6f7'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade schema."""
    connection = op.get_bind()

    # Check if hashed_customer_id column already exists in reservations
    inspector = sa.inspect(connection)
    reservations_columns = [col['name'] for col in inspector.get_columns('reservations')]

    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column('hashed_customers', 'customer_id',
                    existing_type=sa.INTEGER(),
@@ -29,14 +35,13 @@ def upgrade() -> None:
    op.drop_constraint(op.f('reservations_customer_id_fkey'), 'reservations', type_='foreignkey')
    op.create_foreign_key(None, 'reservations', 'customers', ['customer_id'], ['id'], ondelete='SET NULL')

    # Add hashed_customer_id column to reservations with cascade delete
    # Add hashed_customer_id column to reservations if it doesn't exist
    if 'hashed_customer_id' not in reservations_columns:
        op.add_column('reservations', sa.Column('hashed_customer_id', sa.Integer(), nullable=True))
        op.create_index(op.f('ix_reservations_hashed_customer_id'), 'reservations', ['hashed_customer_id'], unique=False)
        op.create_foreign_key(None, 'reservations', 'hashed_customers', ['hashed_customer_id'], ['id'], ondelete='CASCADE')
    # ### end Alembic commands ###

    # Data migration: Populate hashed_customer_id from customer relationship
    connection = op.get_bind()
    update_stmt = sa.text("""
        UPDATE reservations r
        SET hashed_customer_id = hc.id
@@ -45,6 +50,7 @@ def upgrade() -> None:
          AND hc.customer_id IS NOT NULL
    """)
    connection.execute(update_stmt)
    # ### end Alembic commands ###


def downgrade() -> None:


@@ -0,0 +1,45 @@
"""add hashed_customer_id to conversion_guests
Revision ID: a1b2c3d4e5f6
Revises: 08fe946414d8
Create Date: 2025-11-19 18:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'a1b2c3d4e5f6'
down_revision: Union[str, Sequence[str], None] = '08fe946414d8'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
connection = op.get_bind()
inspector = sa.inspect(connection)
# Check if conversion_guests table and hashed_customer_id column exist
tables = inspector.get_table_names()
# Only proceed if conversion_guests table exists
if 'conversion_guests' in tables:
conversion_guests_columns = [col['name'] for col in inspector.get_columns('conversion_guests')]
# Add hashed_customer_id column if it doesn't exist
if 'hashed_customer_id' not in conversion_guests_columns:
op.add_column('conversion_guests', sa.Column('hashed_customer_id', sa.Integer(), nullable=True))
op.create_index(op.f('ix_conversion_guests_hashed_customer_id'), 'conversion_guests', ['hashed_customer_id'], unique=False)
op.create_foreign_key(None, 'conversion_guests', 'hashed_customers', ['hashed_customer_id'], ['id'], ondelete='SET NULL')
def downgrade() -> None:
"""Downgrade schema."""
# Drop the hashed_customer_id column and its constraints
op.drop_constraint(None, 'conversion_guests', type_='foreignkey')
op.drop_index(op.f('ix_conversion_guests_hashed_customer_id'), table_name='conversion_guests')
op.drop_column('conversion_guests', 'hashed_customer_id')

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large


@@ -12,6 +12,7 @@ from sqlalchemy import (
    DateTime,
    Double,
    ForeignKey,
    ForeignKeyConstraint,
    Integer,
    String,
)
@@ -368,19 +369,17 @@ class ConversionGuest(Base):
    """Guest information from hotel PMS conversions, with hashed fields for privacy.

    Stores both unhashed (for reference during transition) and hashed (SHA256 per Meta API)
    versions of guest PII. Multiple conversions can reference the same guest if they have
    the same hotel_id and guest_id (PMS guest identifier).
    versions of guest PII. Uses composite primary key (hotel_id, guest_id) from the PMS.

    When multiple conversions for the same guest arrive with different guest info,
    the most recent (by creation_time) data is kept as the canonical version.
    the most recent (by last_seen) data is kept as the canonical version.
    """

    __tablename__ = "conversion_guests"

    id = Column(Integer, primary_key=True)

    # Natural keys from PMS (composite unique constraint)
    hotel_id = Column(String, nullable=False, index=True)
    guest_id = Column(String, index=True)  # PMS guest ID (nullable for unidentified guests)
    # Natural keys from PMS - composite primary key
    hotel_id = Column(String, nullable=False, primary_key=True, index=True)
    guest_id = Column(String, nullable=False, primary_key=True, index=True)

    # Unhashed guest information (for reference/transition period)
    guest_first_name = Column(String)
@@ -396,12 +395,19 @@ class ConversionGuest(Base):
    hashed_country_code = Column(String(64))
    hashed_birth_date = Column(String(64))

    # Matched customer reference (nullable, filled after matching)
    hashed_customer_id = Column(Integer, ForeignKey("hashed_customers.id"), nullable=True, index=True)

    # Guest classification
    is_regular = Column(Boolean, default=False)  # True if guest has many prior stays before appearing in our reservations

    # Metadata
    first_seen = Column(DateTime(timezone=True))
    last_seen = Column(DateTime(timezone=True))

    # Relationships
    conversions = relationship("Conversion", back_populates="guest")
    hashed_customer = relationship("HashedCustomer", backref="conversion_guests")

    @staticmethod
    def _normalize_and_hash(value):
@@ -424,6 +430,7 @@ class ConversionGuest(Base):
        guest_country_code: str | None,
        guest_birth_date: Date | None,
        now: DateTime,
        is_regular: bool = False,
    ):
        """Create a ConversionGuest from conversion guest data."""
        return cls(
@@ -441,6 +448,7 @@ class ConversionGuest(Base):
            hashed_birth_date=cls._normalize_and_hash(
                guest_birth_date.isoformat() if guest_birth_date else None
            ),
            is_regular=is_regular,
            first_seen=now,
            last_seen=now,
        )
@@ -544,6 +552,12 @@ class Conversion(Base):
    The tracking data transferred by the PMS is however somewhat shorter.
    We therefore also need to match on guest name/email and other metadata.

    Attribution flags:
    - directly_attributable: True if matched by ID (reservation_id is set), meaning
      this conversion is directly responsible for this reservation
    - guest_matched: True if matched only by guest details (customer_id/hashed_customer_id set),
      meaning the same person made this request but the reservation may not be directly attributable
    """

    __tablename__ = "conversions"
@@ -557,12 +571,10 @@ class Conversion(Base):
    hashed_customer_id = Column(
        Integer, ForeignKey("hashed_customers.id"), nullable=True, index=True
    )
    conversion_guest_id = Column(
        Integer, ForeignKey("conversion_guests.id"), nullable=True, index=True
    )

    # Reservation metadata from XML
    hotel_id = Column(String, index=True)  # hotelID attribute
    guest_id = Column(String, nullable=True, index=True)  # PMS guest ID, FK to conversion_guests
    pms_reservation_id = Column(String, index=True)  # id attribute from reservation
    reservation_number = Column(String)  # number attribute
    reservation_date = Column(Date)  # date attribute (when reservation was made)
@@ -570,13 +582,8 @@
    reservation_type = Column(String)  # type attribute (e.g., "reservation")
    booking_channel = Column(String)  # bookingChannel attribute

    # Guest information from reservation XML - used for matching
    guest_first_name = Column(String, index=True)  # firstName from guest element
    guest_last_name = Column(String, index=True)  # lastName from guest element
    guest_email = Column(String, index=True)  # email from guest element
    guest_country_code = Column(String)  # countryCode from guest element
    guest_birth_date = Column(Date)  # birthDate from guest element
    guest_id = Column(String)  # id from guest element

    # Advertising/tracking data - used for matching to existing reservations
    advertising_medium = Column(
@@ -589,10 +596,23 @@
        String, index=True
    )  # advertisingCampagne (contains fbclid/gclid)

    # Attribution flags - track how this conversion was matched
    directly_attributable = Column(Boolean, default=False)  # Matched by ID (high confidence)
    guest_matched = Column(Boolean, default=False)  # Matched by guest details only

    # Metadata
    created_at = Column(DateTime(timezone=True))  # When this record was imported
    updated_at = Column(DateTime(timezone=True))  # When this record was last updated

    # Composite foreign key constraint for ConversionGuest (hotel_id, guest_id)
    __table_args__ = (
        ForeignKeyConstraint(
            ["hotel_id", "guest_id"],
            ["conversion_guests.hotel_id", "conversion_guests.guest_id"],
            ondelete="SET NULL",
        ),
    )

    # Relationships
    reservation = relationship("Reservation", backref="conversions")
    customer = relationship("Customer", backref="conversions")


@@ -159,8 +159,13 @@ class TestConversionServiceWithImportedData:
EXPECTED_TOTAL_ROOMS = 539
# Note: Currently no matches by tracking ID because XML data uses different formats
# This is expected with the test data. Real PMS data would have higher match rates.
# With the refactored Phase 3b/3c matching logic, we now properly link guest-matched
# conversions to reservations when dates match, so we get 19 matched to reservation
# instead of just matched to customer.
EXPECTED_MATCHED_TO_RESERVATION = 19
EXPECTED_MATCHED_TO_CUSTOMER = 0
print("\nBaseline Match Counts:")
print(f" Total reservations in XML: {EXPECTED_TOTAL_RESERVATIONS}")
print(f" Total daily sales records: {EXPECTED_TOTAL_DAILY_SALES}")
@@ -168,6 +173,8 @@ class TestConversionServiceWithImportedData:
print(f" Matched to reservation: {EXPECTED_MATCHED_TO_RESERVATION}")
match_rate = (EXPECTED_MATCHED_TO_RESERVATION / EXPECTED_TOTAL_RESERVATIONS * 100) if EXPECTED_TOTAL_RESERVATIONS > 0 else 0
print(f" Match rate: {match_rate:.1f}%")
print(f" Matched to customer: {EXPECTED_MATCHED_TO_CUSTOMER}")
print(f" Match rate (to customer): {(EXPECTED_MATCHED_TO_CUSTOMER / EXPECTED_TOTAL_RESERVATIONS * 100) if EXPECTED_TOTAL_RESERVATIONS > 0 else 0:.1f}%")
# Verify baseline stability on subsequent runs
assert (
@@ -180,6 +187,10 @@ class TestConversionServiceWithImportedData:
stats["matched_to_reservation"] == EXPECTED_MATCHED_TO_RESERVATION
), f"Matched reservations should be {EXPECTED_MATCHED_TO_RESERVATION}, got {stats['matched_to_reservation']}"
assert (
stats["matched_to_customer"] == EXPECTED_MATCHED_TO_CUSTOMER
), f"Matched customers should be {EXPECTED_MATCHED_TO_CUSTOMER}, got {stats['matched_to_customer']}"
@pytest.mark.asyncio
async def test_conversion_room_revenue_aggregation(
self, test_db_session, test_config, test_data_dir
@@ -334,53 +345,6 @@ class TestHashedMatchingLogic:
"""Test the hashed matching logic used in ConversionService."""
@pytest.mark.asyncio
async def test_no_match_without_hashed_customer(self, test_db_session):
"""Test that matching fails gracefully when customer has no hashed version."""
# Create a customer WITHOUT hashed data
customer = Customer(
given_name="Bob",
surname="Jones",
email_address="bob@example.com",
contact_id="test_contact_3",
)
test_db_session.add(customer)
await test_db_session.commit()
# Create a reservation
reservation = Reservation(
customer_id=customer.id,
unique_id="res_3",
hotel_code="test_hotel",
)
test_db_session.add(reservation)
await test_db_session.commit()
# Test the matching logic
service = ConversionService(test_db_session)
# Eagerly load reservations
from sqlalchemy.orm import selectinload
result = await test_db_session.execute(
select(Reservation)
.where(Reservation.id == reservation.id)
.options(selectinload(Reservation.customer).selectinload(Customer.hashed_version))
)
reservations = result.scalars().all()
hashed_email = hashlib.sha256(
"bob@example.com".lower().strip().encode("utf-8")
).hexdigest()
matched = service._match_reservations_by_guest_details(
reservations,
guest_first_name=None,
guest_last_name=None,
guest_email=hashed_email,
)
# Should not match because customer has no hashed version
assert matched is None, "Should not match without hashed customer"
@pytest.mark.asyncio
@@ -454,7 +418,7 @@ class TestHashedMatchingLogic:
xml_content = f"""<?xml version="1.0"?>
<root>
<reservation id="pms_123" hotelID="hotel_1" number="RES001" date="2025-01-15">
<guest firstName="David" lastName="Miller" email="david@example.com"/>
<guest id="guest_001" firstName="David" lastName="Miller" email="david@example.com"/>
<roomReservations>
<roomReservation roomNumber="101" arrival="2025-01-15" departure="2025-01-17" status="confirmed">
<dailySales>
@@ -476,12 +440,20 @@ class TestHashedMatchingLogic:
assert conversion is not None, "Conversion should be created"
assert conversion.hotel_id == "hotel_1"
assert conversion.guest_first_name == "David"
assert conversion.guest_last_name == "Miller"
assert conversion.guest_email == "david@example.com"
assert conversion.guest_id is not None, "ConversionGuest should be linked"
# Verify conversion_guest was created
assert conversion.conversion_guest_id is not None, "ConversionGuest should be created"
# Verify conversion_guest was created with the correct data
from sqlalchemy.orm import selectinload
result_with_guest = await test_db_session.execute(
select(Conversion)
.where(Conversion.pms_reservation_id == "pms_123")
.options(selectinload(Conversion.guest))
)
conversion_with_guest = result_with_guest.scalar_one_or_none()
assert conversion_with_guest.guest is not None, "ConversionGuest relationship should exist"
assert conversion_with_guest.guest.guest_first_name == "David"
assert conversion_with_guest.guest.guest_last_name == "Miller"
assert conversion_with_guest.guest.guest_email == "david@example.com"
# Verify conversion_room was created
room_result = await test_db_session.execute(
@@ -497,86 +469,24 @@ class TestHashedMatchingLogic:
assert stats["total_reservations"] == 1
assert stats["total_daily_sales"] == 1
@pytest.mark.asyncio
async def test_hashed_customer_missing_fields_handled_gracefully(
self, test_db_session
):
"""Test that matching handles customers with missing hashed fields gracefully."""
# Create a customer
customer = Customer(
given_name="Eve",
surname="Taylor",
email_address="eve@example.com",
contact_id="test_contact_7",
)
test_db_session.add(customer)
await test_db_session.flush()
# Create hashed customer but simulate missing fields by manually setting to None
hashed_customer = HashedCustomer(
customer_id=customer.id,
contact_id="test_contact_7_hashed",
hashed_email=None, # Simulate missing hashed email
hashed_given_name=None, # Simulate missing hashed name
hashed_surname=None,
)
test_db_session.add(hashed_customer)
await test_db_session.flush()
# Create reservation
reservation = Reservation(
customer_id=customer.id,
unique_id="res_7",
hotel_code="test_hotel",
)
test_db_session.add(reservation)
await test_db_session.commit()
# Test matching - should not crash even with missing hashed fields
service = ConversionService(test_db_session)
# Eagerly load reservations
from sqlalchemy.orm import selectinload
result = await test_db_session.execute(
select(Reservation)
.where(Reservation.id == reservation.id)
.options(selectinload(Reservation.customer).selectinload(Customer.hashed_version))
)
reservations = result.scalars().all()
hashed_email = hashlib.sha256(
"eve@example.com".lower().strip().encode("utf-8")
).hexdigest()
matched = service._match_reservations_by_guest_details(
reservations,
guest_first_name=None,
guest_last_name=None,
guest_email=hashed_email,
)
# Should not match because hashed customer fields are None
assert matched is None, "Should not match with missing hashed fields"
@pytest.mark.asyncio
async def test_duplicate_conversion_guests_with_same_hotel_and_guest_id(
async def test_conversion_guest_composite_key_prevents_duplicates(
self, test_db_session
):
"""Test handling of duplicate ConversionGuest records with same (hotel_id, guest_id).
"""Test that ConversionGuest composite primary key (hotel_id, guest_id) prevents duplicates.
This test reproduces the production issue where multiple ConversionGuest records
can be created with the same (hotel_id, guest_id) combination, causing
scalar_one_or_none() to fail with "Multiple rows were found when one or none was required".
With the new schema, the composite PK ensures that each (hotel_id, guest_id) combination
is unique. This prevents the production issue where multiple ConversionGuest records
could exist for the same guest, which previously caused scalar_one_or_none() to fail.
This can happen when:
- Multiple conversions arrive for the same hotel and PMS guest within the same batch
- The XML is processed multiple times
- Race conditions in concurrent processing
Now the database itself enforces uniqueness at the PK level.
"""
hotel_id = "test_hotel"
guest_id = "guest_123"
# Simulate the production scenario: multiple conversion guests with same (hotel_id, guest_id)
# Create and commit first conversion guest
guest1 = ConversionGuest.create_from_conversion_data(
hotel_id=hotel_id,
guest_id=guest_id,
@@ -588,10 +498,21 @@ class TestHashedMatchingLogic:
now=None,
)
test_db_session.add(guest1)
await test_db_session.flush()
await test_db_session.commit()
# Create a second guest with the SAME (hotel_id, guest_id)
# This should not happen, but can occur in production
# Verify guest was created
result = await test_db_session.execute(
select(ConversionGuest).where(
(ConversionGuest.hotel_id == hotel_id)
& (ConversionGuest.guest_id == guest_id)
)
)
guests = result.scalars().all()
assert len(guests) == 1, "Should have created one guest"
assert guests[0].guest_first_name == "John"
# Now try to create a second guest with the SAME (hotel_id, guest_id)
# With composite PK, this should raise an IntegrityError
guest2 = ConversionGuest.create_from_conversion_data(
hotel_id=hotel_id,
guest_id=guest_id,
@@ -603,42 +524,12 @@ class TestHashedMatchingLogic:
now=None,
)
test_db_session.add(guest2)
# The composite PK constraint prevents the duplicate insert
from sqlalchemy.exc import IntegrityError
with pytest.raises(IntegrityError):
await test_db_session.commit()
# Now try to query for the guest by (hotel_id, guest_id)
# This should return multiple results
result = await test_db_session.execute(
select(ConversionGuest).where(
(ConversionGuest.hotel_id == hotel_id)
& (ConversionGuest.guest_id == guest_id)
)
)
guests = result.scalars().all()
# Verify we have duplicates (the production bug condition)
assert len(guests) == 2, "Should have created duplicate conversion guests"
# Verify that scalars().first() returns one of the guests (the fixed behavior)
result2 = await test_db_session.execute(
select(ConversionGuest).where(
(ConversionGuest.hotel_id == hotel_id)
& (ConversionGuest.guest_id == guest_id)
)
)
first_guest = result2.scalars().first()
assert first_guest is not None, "Should find at least one guest with scalars().first()"
# The old code would have raised an error here with scalar_one_or_none()
# when finding multiple results. Now it's fixed to use .first() instead.
result3 = await test_db_session.execute(
select(ConversionGuest).where(
(ConversionGuest.hotel_id == hotel_id)
& (ConversionGuest.guest_id == guest_id)
)
)
with pytest.raises(Exception): # MultipleResultsFound from old code path
result3.scalar_one_or_none()
if __name__ == "__main__":
pytest.main([__file__, "-v"])
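The hashed-matching tests above repeatedly build SHA-256 digests from lowercased, whitespace-stripped emails. That normalize-then-hash convention can be captured in a small standalone helper (the `hash_email` name is illustrative, not a function from this codebase):

```python
import hashlib


def hash_email(email: str) -> str:
    """Lowercase and strip the address, then SHA-256 hash it,
    mirroring the convention used by the matching tests."""
    return hashlib.sha256(email.lower().strip().encode("utf-8")).hexdigest()


# Matching is case- and whitespace-insensitive by construction:
assert hash_email("Bob@Example.com ") == hash_email("bob@example.com")
digest = hash_email("bob@example.com")
print(digest)
```

Both sides of a comparison must apply the same normalization before hashing; a raw-hashed address will never match a normalized one, which is why the tests hash their probe emails the same way the stored `HashedCustomer` fields are assumed to be produced.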