Hashed conversion matching and more. #12

Merged
jonas merged 13 commits from hashed_conversion_matching into main 2025-11-19 19:40:07 +00:00
9 changed files with 301001 additions and 340 deletions
Showing only changes of commit d81ba79256 - Show all commits

View File

@@ -23,6 +23,7 @@ def upgrade() -> None:
# Drop existing tables to start with a clean slate
# Drop conversion_rooms first due to foreign key dependency
op.execute("DROP TABLE IF EXISTS conversion_rooms CASCADE")
op.execute("DROP TABLE IF EXISTS conversion_guests CASCADE")
op.execute("DROP TABLE IF EXISTS conversions CASCADE")
print("dropped existing conversion tables")

View File

@@ -1,66 +0,0 @@
"""Added birth_date, storing revenue as number
Revision ID: b33fd7a2da6c
Revises: 630b0c367dcb
Create Date: 2025-11-18 14:41:17.567595
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'b33fd7a2da6c'
down_revision: Union[str, Sequence[str], None] = '630b0c367dcb'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
# Convert VARCHAR to Double with explicit CAST for PostgreSQL compatibility
# PostgreSQL requires USING clause for type conversion
connection = op.get_bind()
if connection.dialect.name == 'postgresql':
op.execute(
"ALTER TABLE conversion_rooms "
"ALTER COLUMN total_revenue TYPE DOUBLE PRECISION "
"USING total_revenue::DOUBLE PRECISION"
)
else:
# For SQLite and other databases, use standard alter_column
op.alter_column('conversion_rooms', 'total_revenue',
existing_type=sa.VARCHAR(),
type_=sa.Double(),
existing_nullable=True)
op.add_column('conversions', sa.Column('guest_birth_date', sa.Date(), nullable=True))
op.add_column('conversions', sa.Column('guest_id', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('conversions', 'guest_id')
op.drop_column('conversions', 'guest_birth_date')
# Convert Double back to VARCHAR with explicit CAST for PostgreSQL compatibility
connection = op.get_bind()
if connection.dialect.name == 'postgresql':
op.execute(
"ALTER TABLE conversion_rooms "
"ALTER COLUMN total_revenue TYPE VARCHAR "
"USING total_revenue::VARCHAR"
)
else:
# For SQLite and other databases, use standard alter_column
op.alter_column('conversion_rooms', 'total_revenue',
existing_type=sa.Double(),
type_=sa.VARCHAR(),
existing_nullable=True)
# ### end Alembic commands ###

View File

@@ -0,0 +1,284 @@
"""Update conversions schema with new attribution fields and composite key for guests.
Revision ID: a2b3c4d5e6f7
Revises: 630b0c367dcb
Create Date: 2025-11-19 00:00:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "a2b3c4d5e6f7"
down_revision: str | Sequence[str] | None = "630b0c367dcb"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Upgrade schema."""
# Drop existing conversion tables to migrate to new schema
# Drop conversion_rooms first due to foreign key dependency
op.execute("DROP TABLE IF EXISTS conversion_rooms CASCADE")
op.execute("DROP TABLE IF EXISTS conversions CASCADE")
op.execute("DROP TABLE IF EXISTS conversion_guests CASCADE")
# Create conversion_guests table with composite primary key (hotel_id, guest_id)
op.create_table(
"conversion_guests",
sa.Column("hotel_id", sa.String(), nullable=False, primary_key=True),
sa.Column("guest_id", sa.String(), nullable=False, primary_key=True),
sa.Column("guest_first_name", sa.String(), nullable=True),
sa.Column("guest_last_name", sa.String(), nullable=True),
sa.Column("guest_email", sa.String(), nullable=True),
sa.Column("guest_country_code", sa.String(), nullable=True),
sa.Column("guest_birth_date", sa.Date(), nullable=True),
sa.Column("hashed_first_name", sa.String(64), nullable=True),
sa.Column("hashed_last_name", sa.String(64), nullable=True),
sa.Column("hashed_email", sa.String(64), nullable=True),
sa.Column("hashed_country_code", sa.String(64), nullable=True),
sa.Column("hashed_birth_date", sa.String(64), nullable=True),
sa.Column("is_regular", sa.Boolean(), default=False, nullable=False),
sa.Column("first_seen", sa.DateTime(timezone=True), nullable=True),
sa.Column("last_seen", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("hotel_id", "guest_id"),
)
op.create_index(
op.f("ix_conversion_guests_hotel_id"),
"conversion_guests",
["hotel_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_guests_guest_id"),
"conversion_guests",
["guest_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_guests_hashed_first_name"),
"conversion_guests",
["hashed_first_name"],
unique=False,
)
op.create_index(
op.f("ix_conversion_guests_hashed_last_name"),
"conversion_guests",
["hashed_last_name"],
unique=False,
)
op.create_index(
op.f("ix_conversion_guests_hashed_email"),
"conversion_guests",
["hashed_email"],
unique=False,
)
# Create conversions table with new schema
op.create_table(
"conversions",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("reservation_id", sa.Integer(), nullable=True),
sa.Column("customer_id", sa.Integer(), nullable=True),
sa.Column("hashed_customer_id", sa.Integer(), nullable=True),
sa.Column("hotel_id", sa.String(), nullable=True),
sa.Column("guest_id", sa.String(), nullable=True),
sa.Column("pms_reservation_id", sa.String(), nullable=True),
sa.Column("reservation_number", sa.String(), nullable=True),
sa.Column("reservation_date", sa.Date(), nullable=True),
sa.Column("creation_time", sa.DateTime(timezone=True), nullable=True),
sa.Column("reservation_type", sa.String(), nullable=True),
sa.Column("booking_channel", sa.String(), nullable=True),
sa.Column("advertising_medium", sa.String(), nullable=True),
sa.Column("advertising_partner", sa.String(), nullable=True),
sa.Column("advertising_campagne", sa.String(), nullable=True),
sa.Column("directly_attributable", sa.Boolean(), default=False, nullable=False),
sa.Column("guest_matched", sa.Boolean(), default=False, nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(["reservation_id"], ["reservations.id"]),
sa.ForeignKeyConstraint(["customer_id"], ["customers.id"]),
sa.ForeignKeyConstraint(["hashed_customer_id"], ["hashed_customers.id"]),
sa.ForeignKeyConstraint(
["hotel_id", "guest_id"],
["conversion_guests.hotel_id", "conversion_guests.guest_id"],
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_conversions_advertising_campagne"),
"conversions",
["advertising_campagne"],
unique=False,
)
op.create_index(
op.f("ix_conversions_advertising_medium"),
"conversions",
["advertising_medium"],
unique=False,
)
op.create_index(
op.f("ix_conversions_advertising_partner"),
"conversions",
["advertising_partner"],
unique=False,
)
op.create_index(
op.f("ix_conversions_customer_id"),
"conversions",
["customer_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_hashed_customer_id"),
"conversions",
["hashed_customer_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_hotel_id"),
"conversions",
["hotel_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_guest_id"),
"conversions",
["guest_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_pms_reservation_id"),
"conversions",
["pms_reservation_id"],
unique=False,
)
op.create_index(
op.f("ix_conversions_reservation_id"),
"conversions",
["reservation_id"],
unique=False,
)
# Create conversion_rooms table
op.create_table(
"conversion_rooms",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("conversion_id", sa.Integer(), nullable=False),
sa.Column("pms_hotel_reservation_id", sa.String(), nullable=True),
sa.Column("arrival_date", sa.Date(), nullable=True),
sa.Column("departure_date", sa.Date(), nullable=True),
sa.Column("room_status", sa.String(), nullable=True),
sa.Column("room_type", sa.String(), nullable=True),
sa.Column("room_number", sa.String(), nullable=True),
sa.Column("num_adults", sa.Integer(), nullable=True),
sa.Column("rate_plan_code", sa.String(), nullable=True),
sa.Column("connected_room_type", sa.String(), nullable=True),
sa.Column("daily_sales", sa.JSON(), nullable=True),
sa.Column("total_revenue", sa.Double(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(["conversion_id"], ["conversions.id"]),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_conversion_rooms_arrival_date"),
"conversion_rooms",
["arrival_date"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_conversion_id"),
"conversion_rooms",
["conversion_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_departure_date"),
"conversion_rooms",
["departure_date"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_pms_hotel_reservation_id"),
"conversion_rooms",
["pms_hotel_reservation_id"],
unique=False,
)
op.create_index(
op.f("ix_conversion_rooms_room_number"),
"conversion_rooms",
["room_number"],
unique=False,
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_index(
op.f("ix_conversion_rooms_room_number"), table_name="conversion_rooms"
)
op.drop_index(
op.f("ix_conversion_rooms_pms_hotel_reservation_id"),
table_name="conversion_rooms",
)
op.drop_index(
op.f("ix_conversion_rooms_departure_date"), table_name="conversion_rooms"
)
op.drop_index(
op.f("ix_conversion_rooms_conversion_id"), table_name="conversion_rooms"
)
op.drop_index(
op.f("ix_conversion_rooms_arrival_date"), table_name="conversion_rooms"
)
op.drop_table("conversion_rooms")
op.drop_index(
op.f("ix_conversions_reservation_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_pms_reservation_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_guest_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_hotel_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_hashed_customer_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_customer_id"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_advertising_partner"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_advertising_medium"), table_name="conversions"
)
op.drop_index(
op.f("ix_conversions_advertising_campagne"), table_name="conversions"
)
op.drop_table("conversions")
op.drop_index(
op.f("ix_conversion_guests_hashed_email"), table_name="conversion_guests"
)
op.drop_index(
op.f("ix_conversion_guests_hashed_last_name"), table_name="conversion_guests"
)
op.drop_index(
op.f("ix_conversion_guests_hashed_first_name"), table_name="conversion_guests"
)
op.drop_index(
op.f("ix_conversion_guests_guest_id"), table_name="conversion_guests"
)
op.drop_index(
op.f("ix_conversion_guests_hotel_id"), table_name="conversion_guests"
)
op.drop_table("conversion_guests")

View File

@@ -1,168 +0,0 @@
"""Add ConversionGuest table and link conversions
Revision ID: 70b2579d1d96
Revises: b33fd7a2da6c
Create Date: 2025-11-19 11:56:46.532881
"""
from typing import Sequence, Union
import hashlib
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '70b2579d1d96'
down_revision: Union[str, Sequence[str], None] = 'b33fd7a2da6c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def normalize_and_hash(value):
"""Normalize and hash a value for ConversionGuest hashed fields."""
if not value:
return None
normalized = str(value).lower().strip()
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('conversion_guests',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('hotel_id', sa.String(), nullable=False),
sa.Column('guest_id', sa.String(), nullable=True),
sa.Column('guest_first_name', sa.String(), nullable=True),
sa.Column('guest_last_name', sa.String(), nullable=True),
sa.Column('guest_email', sa.String(), nullable=True),
sa.Column('guest_country_code', sa.String(), nullable=True),
sa.Column('guest_birth_date', sa.Date(), nullable=True),
sa.Column('hashed_first_name', sa.String(length=64), nullable=True),
sa.Column('hashed_last_name', sa.String(length=64), nullable=True),
sa.Column('hashed_email', sa.String(length=64), nullable=True),
sa.Column('hashed_country_code', sa.String(length=64), nullable=True),
sa.Column('hashed_birth_date', sa.String(length=64), nullable=True),
sa.Column('first_seen', sa.DateTime(timezone=True), nullable=True),
sa.Column('last_seen', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_conversion_guests_guest_id'), 'conversion_guests', ['guest_id'], unique=False)
op.create_index(op.f('ix_conversion_guests_hashed_email'), 'conversion_guests', ['hashed_email'], unique=False)
op.create_index(op.f('ix_conversion_guests_hashed_first_name'), 'conversion_guests', ['hashed_first_name'], unique=False)
op.create_index(op.f('ix_conversion_guests_hashed_last_name'), 'conversion_guests', ['hashed_last_name'], unique=False)
op.create_index(op.f('ix_conversion_guests_hotel_id'), 'conversion_guests', ['hotel_id'], unique=False)
op.add_column('conversions', sa.Column('conversion_guest_id', sa.Integer(), nullable=True))
op.create_index(op.f('ix_conversions_conversion_guest_id'), 'conversions', ['conversion_guest_id'], unique=False)
op.create_foreign_key(None, 'conversions', 'conversion_guests', ['conversion_guest_id'], ['id'])
# ### end Alembic commands ###
# Data migration: Migrate existing conversion guest data to ConversionGuest table
connection = op.get_bind()
# Get all conversions grouped by (hotel_id, guest_id), picking the most recent by creation_time
# For guests with NULL guest_id, group by hotel_id only
result = connection.execute(sa.text("""
SELECT
c.hotel_id,
c.guest_id,
c.guest_first_name,
c.guest_last_name,
c.guest_email,
c.guest_country_code,
c.guest_birth_date,
c.creation_time,
ROW_NUMBER() OVER (
PARTITION BY c.hotel_id, c.guest_id
ORDER BY c.creation_time DESC NULLS LAST
) as rn
FROM conversions c
WHERE c.guest_first_name IS NOT NULL
OR c.guest_last_name IS NOT NULL
OR c.guest_email IS NOT NULL
OR c.guest_country_code IS NOT NULL
OR c.guest_birth_date IS NOT NULL
"""))
conversion_guests = {} # Map of (hotel_id, guest_id) -> guest data
for row in result:
hotel_id = row.hotel_id
guest_id = row.guest_id
# Only process the most recent record for each guest
if row.rn != 1:
continue
key = (hotel_id, guest_id)
if key not in conversion_guests:
conversion_guests[key] = {
'hotel_id': hotel_id,
'guest_id': guest_id,
'guest_first_name': row.guest_first_name,
'guest_last_name': row.guest_last_name,
'guest_email': row.guest_email,
'guest_country_code': row.guest_country_code,
'guest_birth_date': row.guest_birth_date,
'first_seen': row.creation_time,
'last_seen': row.creation_time,
}
# Insert conversion guests
if conversion_guests:
for guest_data in conversion_guests.values():
insert_stmt = sa.text("""
INSERT INTO conversion_guests
(hotel_id, guest_id, guest_first_name, guest_last_name, guest_email,
guest_country_code, guest_birth_date, hashed_first_name, hashed_last_name,
hashed_email, hashed_country_code, hashed_birth_date, first_seen, last_seen)
VALUES
(:hotel_id, :guest_id, :guest_first_name, :guest_last_name, :guest_email,
:guest_country_code, :guest_birth_date, :hashed_first_name, :hashed_last_name,
:hashed_email, :hashed_country_code, :hashed_birth_date, :first_seen, :last_seen)
""")
connection.execute(insert_stmt, {
'hotel_id': guest_data['hotel_id'],
'guest_id': guest_data['guest_id'],
'guest_first_name': guest_data['guest_first_name'],
'guest_last_name': guest_data['guest_last_name'],
'guest_email': guest_data['guest_email'],
'guest_country_code': guest_data['guest_country_code'],
'guest_birth_date': guest_data['guest_birth_date'],
'hashed_first_name': normalize_and_hash(guest_data['guest_first_name']),
'hashed_last_name': normalize_and_hash(guest_data['guest_last_name']),
'hashed_email': normalize_and_hash(guest_data['guest_email']),
'hashed_country_code': normalize_and_hash(guest_data['guest_country_code']),
'hashed_birth_date': normalize_and_hash(
guest_data['guest_birth_date'].isoformat() if guest_data['guest_birth_date'] else None
),
'first_seen': guest_data['first_seen'],
'last_seen': guest_data['last_seen'],
})
# Link conversions to conversion_guests based on (hotel_id, guest_id)
update_stmt = sa.text("""
UPDATE conversions c
SET conversion_guest_id = cg.id
FROM conversion_guests cg
WHERE c.hotel_id = cg.hotel_id
AND c.guest_id IS NOT DISTINCT FROM cg.guest_id
""")
connection.execute(update_stmt)
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(None, 'conversions', type_='foreignkey')
op.drop_index(op.f('ix_conversions_conversion_guest_id'), table_name='conversions')
op.drop_column('conversions', 'conversion_guest_id')
op.drop_index(op.f('ix_conversion_guests_hotel_id'), table_name='conversion_guests')
op.drop_index(op.f('ix_conversion_guests_hashed_last_name'), table_name='conversion_guests')
op.drop_index(op.f('ix_conversion_guests_hashed_first_name'), table_name='conversion_guests')
op.drop_index(op.f('ix_conversion_guests_hashed_email'), table_name='conversion_guests')
op.drop_index(op.f('ix_conversion_guests_guest_id'), table_name='conversion_guests')
op.drop_table('conversion_guests')
# ### end Alembic commands ###

View File

@@ -13,7 +13,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '08fe946414d8'
down_revision: Union[str, Sequence[str], None] = '70b2579d1d96'
down_revision: Union[str, Sequence[str], None] = 'a2b3c4d5e6f7'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

File diff suppressed because one or more lines are too long

View File

@@ -558,7 +558,7 @@ class ConversionService:
if existing_conversion:
# Update existing conversion - only update reservation metadata and advertising data
# Don't overwrite guest info (will be handled by matching logic which uses hashed data)
# Guest info is stored in ConversionGuest table, not here
# Don't clear reservation/customer links (matching logic will update if needed)
existing_conversion.reservation_number = reservation_number
existing_conversion.reservation_date = reservation_date
@@ -568,19 +568,6 @@ class ConversionService:
existing_conversion.advertising_medium = advertising_medium
existing_conversion.advertising_partner = advertising_partner
existing_conversion.advertising_campagne = advertising_campagne
# Update guest info only if new data is provided (not None)
if guest_first_name:
existing_conversion.guest_first_name = guest_first_name
if guest_last_name:
existing_conversion.guest_last_name = guest_last_name
if guest_email:
existing_conversion.guest_email = guest_email
if guest_country_code:
existing_conversion.guest_country_code = guest_country_code
if guest_birth_date:
existing_conversion.guest_birth_date = guest_birth_date
existing_conversion.updated_at = datetime.now()
conversion = existing_conversion
_LOGGER.info(
@@ -590,6 +577,7 @@ class ConversionService:
)
else:
# Create new conversion entry (without matching - will be done later)
# Note: Guest information (first_name, last_name, email, etc) is stored in ConversionGuest table
conversion = Conversion(
# Links to existing entities (nullable, will be filled in after matching)
reservation_id=None,
@@ -597,19 +585,13 @@ class ConversionService:
hashed_customer_id=None,
# Reservation metadata
hotel_id=hotel_id,
guest_id=guest_id, # Links to ConversionGuest
pms_reservation_id=pms_reservation_id,
reservation_number=reservation_number,
reservation_date=reservation_date,
creation_time=creation_time,
reservation_type=reservation_type,
booking_channel=booking_channel,
# Guest information
guest_first_name=guest_first_name,
guest_last_name=guest_last_name,
guest_email=guest_email,
guest_country_code=guest_country_code,
guest_birth_date=guest_birth_date,
guest_id=guest_id,
# Advertising data
advertising_medium=advertising_medium,
advertising_partner=advertising_partner,
@@ -628,6 +610,8 @@ class ConversionService:
await session.flush()
# Create or update ConversionGuest and link it to the conversion
# The conversion is linked to ConversionGuest via composite FK (hotel_id, guest_id)
# So we just need to ensure ConversionGuest exists - the FK is already set via hotel_id + guest_id
conversion_guest = await self._get_or_create_conversion_guest(
hotel_id=hotel_id,
guest_id=guest_id,
@@ -638,8 +622,7 @@ class ConversionService:
guest_birth_date=guest_birth_date,
session=session,
)
if conversion_guest:
conversion.conversion_guest_id = conversion_guest.id
# guest_id is already set on conversion, so the composite FK relationship is established
# Batch-load existing room reservations to avoid N+1 queries
room_numbers = [
@@ -863,6 +846,7 @@ class ConversionService:
matched_reservation = match_result["reservation"]
matched_customer = match_result["customer"]
matched_hashed_customer = match_result["hashed_customer"]
match_type = match_result.get("match_type") # "id" or "guest_details"
# Update the conversion with matched entities if found
if matched_reservation or matched_customer or matched_hashed_customer:
@@ -875,6 +859,15 @@ class ConversionService:
conversion.hashed_customer_id = (
matched_hashed_customer.id if matched_hashed_customer else None
)
# Set attribution flags based on match type
if match_type == "id":
conversion.directly_attributable = True
conversion.guest_matched = False
elif match_type == "guest_details":
conversion.directly_attributable = False
conversion.guest_matched = True
conversion.updated_at = datetime.now()
# Update stats
@@ -902,22 +895,23 @@ class ConversionService:
) -> dict[str, Any]:
"""Find matching Reservation, Customer, and HashedCustomer.
Uses two strategies:
1. Advertising data matching (fbclid/gclid/utm_campaign) with guest details fallback
2. If no advertising data match, falls back to email/name-based matching
Uses two strategies with separate attribution:
1. ID-based matching (fbclid/gclid/md5_unique_id) - directly_attributable
2. Guest detail matching (email/name) - guest_matched only
Args:
advertising_campagne: Truncated tracking ID from conversion XML
hotel_id: Hotel ID for additional filtering
reservation_date: Reservation date for additional filtering
guest_first_name: Guest first name for matching
guest_last_name: Guest last name for matching
guest_email: Guest email for matching
guest_first_name: Guest first name (hashed) for matching
guest_last_name: Guest last name (hashed) for matching
guest_email: Guest email (hashed) for matching
advertising_partner: Partner info (matches utm_medium for additional filtering)
session: AsyncSession to use. If None, uses self.session.
Returns:
Dictionary with 'reservation', 'customer', and 'hashed_customer' keys
Dictionary with 'reservation', 'customer', 'hashed_customer', and 'match_type' keys.
match_type is either 'id' (high confidence) or 'guest_details' (lower confidence)
"""
if session is None:
@@ -926,9 +920,10 @@ class ConversionService:
"reservation": None,
"customer": None,
"hashed_customer": None,
"match_type": None, # "id" or "guest_details"
}
# Strategy 1: Try to match by advertising data (fbclid/gclid/utm_campaign)
# Strategy 1: Try to match by advertising data (fbclid/gclid/md5_unique_id) - ID-based, high confidence
if advertising_campagne:
matched_reservation = await self._match_by_advertising(
advertising_campagne,
@@ -942,19 +937,20 @@ class ConversionService:
if matched_reservation:
result["reservation"] = matched_reservation
result["match_type"] = "id" # Matched by ID
_LOGGER.info(
"Matched conversion by advertising data (advertisingCampagne=%s, hotel=%s)",
"Matched conversion by advertising ID data (advertisingCampagne=%s, hotel=%s)",
advertising_campagne,
hotel_id,
)
else:
_LOGGER.debug(
"No match found by advertising data (advertisingCampagne=%s), "
"falling back to email/name matching",
"No match found by advertising ID data (advertisingCampagne=%s), "
"falling back to guest details matching",
advertising_campagne,
)
# Strategy 2: If no advertising match, try email/name-based matching
# Strategy 2: If no ID-based match, try email/name-based matching - guest details, lower confidence
if not result["reservation"] and (
guest_email or guest_first_name or guest_last_name
):
@@ -964,6 +960,7 @@ class ConversionService:
if matched_reservation:
result["reservation"] = matched_reservation
result["match_type"] = "guest_details" # Matched by guest details only
_LOGGER.info(
"Matched conversion by guest details (name=%s %s, email=%s, hotel=%s)",
guest_first_name,
@@ -1484,6 +1481,7 @@ class ConversionService:
matched_reservation = match_result["reservation"]
matched_customer = match_result["customer"]
matched_hashed_customer = match_result["hashed_customer"]
match_type = match_result.get("match_type") # "id" or "guest_details"
# Update the conversion with matched entities if found
if matched_reservation or matched_customer or matched_hashed_customer:
@@ -1496,6 +1494,15 @@ class ConversionService:
conversion.hashed_customer_id = (
matched_hashed_customer.id if matched_hashed_customer else None
)
# Set attribution flags based on match type
if match_type == "id":
conversion.directly_attributable = True
conversion.guest_matched = False
elif match_type == "guest_details":
conversion.directly_attributable = False
conversion.guest_matched = True
conversion.updated_at = datetime.now()
# Update stats if provided

View File

@@ -12,6 +12,7 @@ from sqlalchemy import (
DateTime,
Double,
ForeignKey,
ForeignKeyConstraint,
Integer,
String,
)
@@ -368,19 +369,17 @@ class ConversionGuest(Base):
"""Guest information from hotel PMS conversions, with hashed fields for privacy.
Stores both unhashed (for reference during transition) and hashed (SHA256 per Meta API)
versions of guest PII. Multiple conversions can reference the same guest if they have
the same hotel_id and guest_id (PMS guest identifier).
versions of guest PII. Uses composite primary key (hotel_id, guest_id) from the PMS.
When multiple conversions for the same guest arrive with different guest info,
the most recent (by creation_time) data is kept as the canonical version.
the most recent (by last_seen) data is kept as the canonical version.
"""
__tablename__ = "conversion_guests"
id = Column(Integer, primary_key=True)
# Natural keys from PMS (composite unique constraint)
hotel_id = Column(String, nullable=False, index=True)
guest_id = Column(String, index=True) # PMS guest ID (nullable for unidentified guests)
# Natural keys from PMS - composite primary key
hotel_id = Column(String, nullable=False, primary_key=True, index=True)
guest_id = Column(String, nullable=False, primary_key=True, index=True)
# Unhashed guest information (for reference/transition period)
guest_first_name = Column(String)
@@ -396,6 +395,9 @@ class ConversionGuest(Base):
hashed_country_code = Column(String(64))
hashed_birth_date = Column(String(64))
# Guest classification
is_regular = Column(Boolean, default=False) # True if guest has many prior stays before appearing in our reservations
# Metadata
first_seen = Column(DateTime(timezone=True))
last_seen = Column(DateTime(timezone=True))
@@ -424,6 +426,7 @@ class ConversionGuest(Base):
guest_country_code: str | None,
guest_birth_date: Date | None,
now: DateTime,
is_regular: bool = False,
):
"""Create a ConversionGuest from conversion guest data."""
return cls(
@@ -441,6 +444,7 @@ class ConversionGuest(Base):
hashed_birth_date=cls._normalize_and_hash(
guest_birth_date.isoformat() if guest_birth_date else None
),
is_regular=is_regular,
first_seen=now,
last_seen=now,
)
@@ -544,6 +548,12 @@ class Conversion(Base):
The tracking data transferered by the PMS is however somewhat shorter.
We therefore also need to match on guest name/email and other metadata.
Attribution flags:
- directly_attributable: True if matched by ID (reservation_id is set), meaning
this conversion is directly responsible for this reservation
- guest_matched: True if matched only by guest details (customer_id/hashed_customer_id set),
meaning the same person made this request but the reservation may not be directly attributable
"""
__tablename__ = "conversions"
@@ -557,12 +567,10 @@ class Conversion(Base):
hashed_customer_id = Column(
Integer, ForeignKey("hashed_customers.id"), nullable=True, index=True
)
conversion_guest_id = Column(
Integer, ForeignKey("conversion_guests.id"), nullable=True, index=True
)
# Reservation metadata from XML
hotel_id = Column(String, index=True) # hotelID attribute
guest_id = Column(String, nullable=True, index=True) # PMS guest ID, FK to conversion_guests
pms_reservation_id = Column(String, index=True) # id attribute from reservation
reservation_number = Column(String) # number attribute
reservation_date = Column(Date) # date attribute (when reservation was made)
@@ -570,13 +578,8 @@ class Conversion(Base):
reservation_type = Column(String) # type attribute (e.g., "reservation")
booking_channel = Column(String) # bookingChannel attribute
# Guest information from reservation XML - used for matching
guest_first_name = Column(String, index=True) # firstName from guest element
guest_last_name = Column(String, index=True) # lastName from guest element
guest_email = Column(String, index=True) # email from guest element
guest_country_code = Column(String) # countryCode from guest element
guest_birth_date = Column(Date) # birthDate from guest element
guest_id = Column(String) # id from guest element
# Advertising/tracking data - used for matching to existing reservations
advertising_medium = Column(
@@ -589,10 +592,23 @@ class Conversion(Base):
String, index=True
) # advertisingCampagne (contains fbclid/gclid)
# Attribution flags - track how this conversion was matched
directly_attributable = Column(Boolean, default=False) # Matched by ID (high confidence)
guest_matched = Column(Boolean, default=False) # Matched by guest details only
# Metadata
created_at = Column(DateTime(timezone=True)) # When this record was imported
updated_at = Column(DateTime(timezone=True)) # When this record was last updated
# Composite foreign key constraint for ConversionGuest (hotel_id, guest_id)
__table_args__ = (
ForeignKeyConstraint(
["hotel_id", "guest_id"],
["conversion_guests.hotel_id", "conversion_guests.guest_id"],
ondelete="SET NULL",
),
)
# Relationships
reservation = relationship("Reservation", backref="conversions")
customer = relationship("Customer", backref="conversions")

View File

@@ -454,7 +454,7 @@ class TestHashedMatchingLogic:
xml_content = f"""<?xml version="1.0"?>
<root>
<reservation id="pms_123" hotelID="hotel_1" number="RES001" date="2025-01-15">
<guest firstName="David" lastName="Miller" email="david@example.com"/>
<guest id="guest_001" firstName="David" lastName="Miller" email="david@example.com"/>
<roomReservations>
<roomReservation roomNumber="101" arrival="2025-01-15" departure="2025-01-17" status="confirmed">
<dailySales>
@@ -476,12 +476,20 @@ class TestHashedMatchingLogic:
assert conversion is not None, "Conversion should be created"
assert conversion.hotel_id == "hotel_1"
assert conversion.guest_first_name == "David"
assert conversion.guest_last_name == "Miller"
assert conversion.guest_email == "david@example.com"
assert conversion.guest_id is not None, "ConversionGuest should be linked"
# Verify conversion_guest was created
assert conversion.conversion_guest_id is not None, "ConversionGuest should be created"
# Verify conversion_guest was created with the correct data
from sqlalchemy.orm import selectinload
result_with_guest = await test_db_session.execute(
select(Conversion)
.where(Conversion.pms_reservation_id == "pms_123")
.options(selectinload(Conversion.guest))
)
conversion_with_guest = result_with_guest.scalar_one_or_none()
assert conversion_with_guest.guest is not None, "ConversionGuest relationship should exist"
assert conversion_with_guest.guest.guest_first_name == "David"
assert conversion_with_guest.guest.guest_last_name == "Miller"
assert conversion_with_guest.guest.guest_email == "david@example.com"
# Verify conversion_room was created
room_result = await test_db_session.execute(
@@ -559,24 +567,21 @@ class TestHashedMatchingLogic:
assert matched is None, "Should not match with missing hashed fields"
@pytest.mark.asyncio
async def test_duplicate_conversion_guests_with_same_hotel_and_guest_id(
async def test_conversion_guest_composite_key_prevents_duplicates(
self, test_db_session
):
"""Test handling of duplicate ConversionGuest records with same (hotel_id, guest_id).
"""Test that ConversionGuest composite primary key (hotel_id, guest_id) prevents duplicates.
This test reproduces the production issue where multiple ConversionGuest records
can be created with the same (hotel_id, guest_id) combination, causing
scalar_one_or_none() to fail with "Multiple rows were found when one or none was required".
With the new schema, the composite PK ensures that each (hotel_id, guest_id) combination
is unique. This prevents the production issue where multiple ConversionGuest records
could exist for the same guest, which previously caused scalar_one_or_none() to fail.
This can happen when:
- Multiple conversions arrive for the same hotel and PMS guest within the same batch
- The XML is processed multiple times
- Race conditions in concurrent processing
Now the database itself enforces uniqueness at the PK level.
"""
hotel_id = "test_hotel"
guest_id = "guest_123"
# Simulate the production scenario: multiple conversion guests with same (hotel_id, guest_id)
# Create and commit first conversion guest
guest1 = ConversionGuest.create_from_conversion_data(
hotel_id=hotel_id,
guest_id=guest_id,
@@ -588,10 +593,21 @@ class TestHashedMatchingLogic:
now=None,
)
test_db_session.add(guest1)
await test_db_session.flush()
await test_db_session.commit()
# Create a second guest with the SAME (hotel_id, guest_id)
# This should not happen, but can occur in production
# Verify guest was created
result = await test_db_session.execute(
select(ConversionGuest).where(
(ConversionGuest.hotel_id == hotel_id)
& (ConversionGuest.guest_id == guest_id)
)
)
guests = result.scalars().all()
assert len(guests) == 1, "Should have created one guest"
assert guests[0].guest_first_name == "John"
# Now try to create a second guest with the SAME (hotel_id, guest_id)
# With composite PK, this should raise an IntegrityError
guest2 = ConversionGuest.create_from_conversion_data(
hotel_id=hotel_id,
guest_id=guest_id,
@@ -603,42 +619,12 @@ class TestHashedMatchingLogic:
now=None,
)
test_db_session.add(guest2)
# The composite PK constraint prevents the duplicate insert
from sqlalchemy.exc import IntegrityError
with pytest.raises(IntegrityError):
await test_db_session.commit()
# Now try to query for the guest by (hotel_id, guest_id)
# This should return multiple results
result = await test_db_session.execute(
select(ConversionGuest).where(
(ConversionGuest.hotel_id == hotel_id)
& (ConversionGuest.guest_id == guest_id)
)
)
guests = result.scalars().all()
# Verify we have duplicates (the production bug condition)
assert len(guests) == 2, "Should have created duplicate conversion guests"
# Verify that scalars().first() returns one of the guests (the fixed behavior)
result2 = await test_db_session.execute(
select(ConversionGuest).where(
(ConversionGuest.hotel_id == hotel_id)
& (ConversionGuest.guest_id == guest_id)
)
)
first_guest = result2.scalars().first()
assert first_guest is not None, "Should find at least one guest with scalars().first()"
# The old code would have raised an error here with scalar_one_or_none()
# when finding multiple results. Now it's fixed to use .first() instead.
result3 = await test_db_session.execute(
select(ConversionGuest).where(
(ConversionGuest.hotel_id == hotel_id)
& (ConversionGuest.guest_id == guest_id)
)
)
with pytest.raises(Exception): # MultipleResultsFound from old code path
result3.scalar_one_or_none()
if __name__ == "__main__":
pytest.main([__file__, "-v"])