Hashed conversion matching and more. #12

Merged
jonas merged 13 commits from hashed_conversion_matching into main 2025-11-19 19:40:07 +00:00
2 changed files with 291 additions and 76 deletions
Showing only changes of commit 35c29d80bb - Show all commits

View File

@@ -940,6 +940,8 @@ class ConversionService:
if matched_reservation: if matched_reservation:
result["reservation"] = matched_reservation result["reservation"] = matched_reservation
result["match_type"] = "id" # Matched by ID result["match_type"] = "id" # Matched by ID
_LOGGER.info( _LOGGER.info(
"Matched conversion by advertising ID data (advertisingCampagne=%s, hotel=%s)", "Matched conversion by advertising ID data (advertisingCampagne=%s, hotel=%s)",
advertising_campagne, advertising_campagne,
@@ -1323,10 +1325,156 @@ class ConversionService:
# No single clear match found # No single clear match found
return None return None
async def _extract_unmatched_guests(
self, session: AsyncSession
) -> dict[str, HashedCustomer]:
"""Phase 3b: Extract unique guests from unmatched conversions and match them to customers.
Returns a mapping of guest_id -> HashedCustomer for all unique guests found in
unmatched conversions. Only processes each guest once.
Args:
session: AsyncSession for database queries
Returns:
Dictionary mapping guest_id to matched HashedCustomer (or None if no match)
"""
# Find all conversions that have no reservation/customer match yet
result = await session.execute(
select(Conversion)
.where(
(Conversion.reservation_id.is_(None))
& (Conversion.customer_id.is_(None))
& (Conversion.guest_id.isnot(None))
)
.options(selectinload(Conversion.guest))
)
unmatched_conversions = result.scalars().all()
if not unmatched_conversions:
_LOGGER.debug("Phase 3b: No unmatched conversions with guests")
return {}
# Extract unique guests (by guest_id)
unique_guests: dict[str, Conversion] = {}
for conversion in unmatched_conversions:
if conversion.guest_id not in unique_guests:
unique_guests[conversion.guest_id] = conversion
_LOGGER.info(
"Phase 3b: Found %d unique guests from %d unmatched conversions",
len(unique_guests),
len(unmatched_conversions),
)
# Match each unique guest to a hashed customer
guest_to_hashed_customer: dict[str, HashedCustomer] = {}
for guest_id, conversion in unique_guests.items():
conversion_guest = conversion.guest
if not conversion_guest:
continue
# Try to match by guest details
matched_hashed_customer = await self._match_by_guest_details_hashed(
conversion_guest.hashed_first_name,
conversion_guest.hashed_last_name,
conversion_guest.hashed_email,
session,
)
if matched_hashed_customer:
guest_to_hashed_customer[guest_id] = matched_hashed_customer
_LOGGER.debug(
"Phase 3b: Matched guest %s to hashed_customer %d",
guest_id,
matched_hashed_customer.id,
)
else:
guest_to_hashed_customer[guest_id] = None
_LOGGER.debug("Phase 3b: No match found for guest %s", guest_id)
return guest_to_hashed_customer
async def _link_matched_guests_to_reservations(
self,
guest_to_hashed_customer: dict[str, HashedCustomer],
session: AsyncSession,
stats: dict[str, int],
) -> None:
"""Phase 3c: Link conversions from matched guests to reservations based on dates.
For each guest matched to a hashed_customer, find all conversions from that guest
that don't have a reservation yet, and try to link them to reservations based on
date matching.
Args:
guest_to_hashed_customer: Mapping from guest_id to matched HashedCustomer
session: AsyncSession for database queries
stats: Shared stats dictionary to update
"""
for guest_id, matched_hashed_customer in guest_to_hashed_customer.items():
if not matched_hashed_customer or not matched_hashed_customer.customer_id:
continue
# Find all unmatched conversions from this guest
result = await session.execute(
select(Conversion)
.where(
(Conversion.guest_id == guest_id)
& (Conversion.reservation_id.is_(None))
& (Conversion.customer_id.is_(None))
)
.options(selectinload(Conversion.conversion_rooms))
)
conversions = result.scalars().all()
if not conversions:
continue
_LOGGER.debug(
"Phase 3c: Processing %d conversions for guest %s (customer_id=%d)",
len(conversions),
guest_id,
matched_hashed_customer.customer_id,
)
# Try to link each conversion to a reservation for this customer
for conversion in conversions:
matched_reservation, is_attributable = await self._check_if_attributable(
matched_hashed_customer.customer_id, conversion, session
)
if matched_reservation and is_attributable:
conversion.reservation_id = matched_reservation.id
conversion.customer_id = matched_hashed_customer.customer_id
conversion.hashed_customer_id = matched_hashed_customer.id
conversion.directly_attributable = False
conversion.guest_matched = True
conversion.updated_at = datetime.now()
stats["matched_to_reservation"] += 1
_LOGGER.info(
"Phase 3c: Linked conversion (pms_id=%s) to reservation %d via guest matching",
conversion.pms_reservation_id,
matched_reservation.id,
)
elif matched_hashed_customer:
# No attributable reservation found, but link to customer/hashed customer
conversion.customer_id = matched_hashed_customer.customer_id
conversion.hashed_customer_id = matched_hashed_customer.id
conversion.directly_attributable = False
conversion.guest_matched = True
conversion.updated_at = datetime.now()
stats["matched_to_customer"] += 1
async def _match_conversions_from_db_sequential( async def _match_conversions_from_db_sequential(
self, pms_reservation_ids: list[str], stats: dict[str, int] self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None: ) -> None:
"""Phase 2: Match conversions sequentially using database data only.""" """Phase 3a: Match conversions sequentially using database data only.
Performs ID-based matching for all conversions. Then extracts unique guests
and performs guest detail matching in phases 3b and 3c.
"""
semaphore = asyncio.Semaphore(1) # Process one at a time semaphore = asyncio.Semaphore(1) # Process one at a time
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
for pms_id in pms_reservation_ids: for pms_id in pms_reservation_ids:
@@ -1334,10 +1482,36 @@ class ConversionService:
self._match_conversion_from_db_safe(pms_id, semaphore, stats) self._match_conversion_from_db_safe(pms_id, semaphore, stats)
) )
# After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching)
if self.session_maker:
session = await self.session_maker.create_session()
else:
session = self.session
try:
# Phase 3b: Extract and cache unique guests from unmatched conversions
guest_to_hashed_customer = await self._extract_unmatched_guests(session)
# Phase 3c: Link matched guests to reservations
if guest_to_hashed_customer:
await self._link_matched_guests_to_reservations(
guest_to_hashed_customer, session, stats
)
await session.commit()
except Exception as e:
await session.rollback()
_LOGGER.exception("Error in Phase 3b/3c guest matching: %s", e)
finally:
if self.session_maker:
await session.close()
async def _match_conversions_from_db_concurrent( async def _match_conversions_from_db_concurrent(
self, pms_reservation_ids: list[str], stats: dict[str, int] self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None: ) -> None:
"""Phase 2: Match conversions concurrently using database data only. """Phase 3a: Match conversions concurrently using database data only.
Performs ID-based matching for all conversions concurrently. Then extracts unique guests
and performs guest detail matching sequentially in phases 3b and 3c.
Each concurrent task gets its own independent database session Each concurrent task gets its own independent database session
from the SessionMaker. from the SessionMaker.
@@ -1357,6 +1531,26 @@ class ConversionService:
self._match_conversion_from_db_safe(pms_id, semaphore, stats) self._match_conversion_from_db_safe(pms_id, semaphore, stats)
) )
# After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching)
# Use sequential processing for guest matching to avoid duplicating work
session = await self.session_maker.create_session()
try:
# Phase 3b: Extract and cache unique guests from unmatched conversions
guest_to_hashed_customer = await self._extract_unmatched_guests(session)
# Phase 3c: Link matched guests to reservations
if guest_to_hashed_customer:
await self._link_matched_guests_to_reservations(
guest_to_hashed_customer, session, stats
)
await session.commit()
except Exception as e:
await session.rollback()
_LOGGER.exception("Error in Phase 3b/3c guest matching: %s", e)
finally:
await session.close()
async def _match_conversion_from_db_safe( async def _match_conversion_from_db_safe(
self, self,
pms_reservation_id: str, pms_reservation_id: str,
@@ -1414,18 +1608,14 @@ class ConversionService:
session: AsyncSession | None = None, session: AsyncSession | None = None,
stats: dict[str, int] | None = None, stats: dict[str, int] | None = None,
) -> None: ) -> None:
"""Phase 2: Match a conversion using only persisted database data. """Phase 3a: Match a conversion using ID-based matching only.
This method reads both the conversion and conversion_guest from the database This method reads both the conversion and conversion_guest from the database
and uses their stored hashed data to match to existing reservations/customers. and uses their stored hashed data to match to existing reservations via
No XML parsing, no re-hashing - complete separation of concerns. advertising data (fbclid/gclid/md5_unique_id).
Matching rules: Guest detail matching is deferred to Phase 3b/3c to avoid reprocessing the same
1. Guest detail matches: Record in conversion_guests table with hashed_customer reference guest multiple times.
2. ID-based matches (md5_hash/click_id): Can also record directly in conversions-reservations
with directly_attributable=True
3. Regular guest detection: Check if conversions exist with dates before all reservations,
or if conversion_room dates match reservation dates
Updates stats dictionary in-place if provided. Updates stats dictionary in-place if provided.
@@ -1466,22 +1656,33 @@ class ConversionService:
hashed_last_name = conversion_guest.hashed_last_name hashed_last_name = conversion_guest.hashed_last_name
hashed_email = conversion_guest.hashed_email hashed_email = conversion_guest.hashed_email
# Perform matching using already-hashed data from database # Phase 3a: Only try ID-based matching (fbclid/gclid/md5_unique_id)
match_result = await self._find_matching_entities( # Guest detail matching is deferred to Phase 3b/3c
conversion.advertising_campagne, matched_reservation = None
conversion.hotel_id, matched_customer = None
conversion.reservation_date, matched_hashed_customer = None
hashed_first_name,
hashed_last_name,
hashed_email,
conversion.advertising_partner,
session,
)
matched_reservation = match_result["reservation"] if conversion.advertising_campagne:
matched_customer = match_result["customer"] matched_reservation = await self._match_by_advertising(
matched_hashed_customer = match_result["hashed_customer"] conversion.advertising_campagne,
match_type = match_result.get("match_type") # "id" or "guest_details" conversion.hotel_id,
hashed_first_name,
hashed_last_name,
hashed_email,
conversion.advertising_partner,
session,
)
if matched_reservation:
matched_customer = matched_reservation.customer
if matched_customer and matched_customer.hashed_version:
matched_hashed_customer = matched_customer.hashed_version
_LOGGER.info(
"Phase 3a: Matched conversion by advertising ID (pms_id=%s, reservation_id=%d)",
pms_reservation_id,
matched_reservation.id,
)
# Update the conversion with matched entities if found # Update the conversion with matched entities if found
if matched_reservation or matched_customer or matched_hashed_customer: if matched_reservation or matched_customer or matched_hashed_customer:
@@ -1495,22 +1696,11 @@ class ConversionService:
matched_hashed_customer.id if matched_hashed_customer else None matched_hashed_customer.id if matched_hashed_customer else None
) )
# Set attribution flags based on match type and check for regular guest # ID-based matches are always directly attributable
is_attributable = False conversion.directly_attributable = True
if match_type == "id": conversion.guest_matched = False
# ID-based matches (fbclid/gclid/md5_unique_id) are always directly attributable
conversion.directly_attributable = True
conversion.guest_matched = False
is_attributable = True
elif match_type == "guest_details":
# Guest detail matches: check if guest is regular or dates match
is_attributable = await self._check_if_attributable(
matched_reservation, conversion, session
)
conversion.directly_attributable = is_attributable
conversion.guest_matched = True
# Check if guest is regular for both ID and guest detail matches # Check if guest is regular
if matched_reservation: if matched_reservation:
await self._check_if_regular(conversion, matched_reservation, session) await self._check_if_regular(conversion, matched_reservation, session)
@@ -1597,49 +1787,71 @@ class ConversionService:
async def _check_if_attributable( async def _check_if_attributable(
self, self,
matched_reservation: Reservation, matched_customer_id: int,
conversion: Conversion, conversion: Conversion,
session: AsyncSession, session: AsyncSession,
) -> bool: exclude_reservation_id: int | None = None,
"""Check if a guest detail matched conversion should be marked as attributable. ) -> tuple[Reservation | None, bool]:
"""Search for and check if a customer has an attributable reservation for this conversion.
A conversion is attributable ONLY if the conversion_room dates match the reservation dates closely. Finds reservations linked to the customer and checks if any have dates matching this conversion.
A conversion is attributable ONLY if the conversion_room dates match a reservation's dates closely.
Args: Args:
matched_reservation: The matched Reservation record matched_customer_id: The customer ID to search reservations for
conversion: The Conversion record being evaluated conversion: The Conversion record being evaluated
session: AsyncSession for database queries session: AsyncSession for database queries
exclude_reservation_id: Reservation ID to exclude from search (if already matched)
Returns: Returns:
True if the conversion should be marked as attributable (based on date matching), False otherwise Tuple of (matched_reservation, is_attributable) where:
- matched_reservation: The Reservation that matches (if any)
- is_attributable: True if the reservation's dates match this conversion
""" """
# Check if conversion_room dates match reservation dates (criterion for attributability) # Check if conversion_room dates exist (criterion for attributability)
if not conversion.conversion_rooms or not matched_reservation: if not conversion.conversion_rooms:
return False return None, False
for room in conversion.conversion_rooms: # Find reservations for this customer
if ( query = select(Reservation).where(
room.arrival_date Reservation.customer_id == matched_customer_id
and room.departure_date )
and matched_reservation.start_date if exclude_reservation_id:
and matched_reservation.end_date query = query.where(Reservation.id != exclude_reservation_id)
):
# Check if dates match or mostly match (within 1 day tolerance)
arrival_match = abs(
(room.arrival_date - matched_reservation.start_date).days
) <= 7
departure_match = abs(
(room.departure_date - matched_reservation.end_date).days
) <= 7
if arrival_match and departure_match: db_result = await session.execute(query)
_LOGGER.info( reservations = db_result.scalars().all()
"Marking conversion as attributable: room dates %s-%s match reservation dates %s-%s",
room.arrival_date,
room.departure_date,
matched_reservation.start_date,
matched_reservation.end_date,
)
return True
return False if not reservations:
return None, False
# Check each reservation for date match
for reservation in reservations:
for room in conversion.conversion_rooms:
if (
room.arrival_date
and room.departure_date
and reservation.start_date
and reservation.end_date
):
# Check if dates match or mostly match (within 7 day tolerance)
arrival_match = abs(
(room.arrival_date - reservation.start_date).days
) <= 7
departure_match = abs(
(room.departure_date - reservation.end_date).days
) <= 7
if arrival_match and departure_match:
_LOGGER.info(
"Found attributable reservation for customer %d: "
"room dates %s-%s match reservation dates %s-%s",
matched_customer_id,
room.arrival_date,
room.departure_date,
reservation.start_date,
reservation.end_date,
)
return reservation, True
return None, False

View File

@@ -159,9 +159,12 @@ class TestConversionServiceWithImportedData:
EXPECTED_TOTAL_ROOMS = 539 EXPECTED_TOTAL_ROOMS = 539
# Note: Currently no matches by tracking ID because XML data uses different formats # Note: Currently no matches by tracking ID because XML data uses different formats
# This is expected with the test data. Real PMS data would have higher match rates. # This is expected with the test data. Real PMS data would have higher match rates.
EXPECTED_MATCHED_TO_RESERVATION = 0 # With the refactored Phase 3b/3c matching logic, we now properly link guest-matched
# conversions to reservations when dates match, so we get 19 matched to reservation
# instead of just matched to customer.
EXPECTED_MATCHED_TO_RESERVATION = 19
EXPECTED_MATCHED_TO_CUSTOMER = 19 EXPECTED_MATCHED_TO_CUSTOMER = 0
print(f"\nBaseline Match Counts:") print(f"\nBaseline Match Counts:")
print(f" Total reservations in XML: {EXPECTED_TOTAL_RESERVATIONS}") print(f" Total reservations in XML: {EXPECTED_TOTAL_RESERVATIONS}")