diff --git a/src/alpine_bits_python/conversion_service.py b/src/alpine_bits_python/conversion_service.py index 9d10153..3b494de 100644 --- a/src/alpine_bits_python/conversion_service.py +++ b/src/alpine_bits_python/conversion_service.py @@ -940,6 +940,8 @@ class ConversionService: if matched_reservation: result["reservation"] = matched_reservation result["match_type"] = "id" # Matched by ID + + _LOGGER.info( "Matched conversion by advertising ID data (advertisingCampagne=%s, hotel=%s)", advertising_campagne, @@ -1323,10 +1325,156 @@ class ConversionService: # No single clear match found return None + async def _extract_unmatched_guests( + self, session: AsyncSession + ) -> dict[str, HashedCustomer]: + """Phase 3b: Extract unique guests from unmatched conversions and match them to customers. + + Returns a mapping of guest_id -> HashedCustomer for all unique guests found in + unmatched conversions. Only processes each guest once. + + Args: + session: AsyncSession for database queries + + Returns: + Dictionary mapping guest_id to matched HashedCustomer (or None if no match) + """ + # Find all conversions that have no reservation/customer match yet + result = await session.execute( + select(Conversion) + .where( + (Conversion.reservation_id.is_(None)) + & (Conversion.customer_id.is_(None)) + & (Conversion.guest_id.isnot(None)) + ) + .options(selectinload(Conversion.guest)) + ) + unmatched_conversions = result.scalars().all() + + if not unmatched_conversions: + _LOGGER.debug("Phase 3b: No unmatched conversions with guests") + return {} + + # Extract unique guests (by guest_id) + unique_guests: dict[str, Conversion] = {} + for conversion in unmatched_conversions: + if conversion.guest_id not in unique_guests: + unique_guests[conversion.guest_id] = conversion + + _LOGGER.info( + "Phase 3b: Found %d unique guests from %d unmatched conversions", + len(unique_guests), + len(unmatched_conversions), + ) + + # Match each unique guest to a hashed customer + guest_to_hashed_customer: dict[str, HashedCustomer] = {} + for guest_id, conversion in unique_guests.items(): + conversion_guest = conversion.guest + if not conversion_guest: + continue + + # Try to match by guest details + matched_hashed_customer = await self._match_by_guest_details_hashed( + conversion_guest.hashed_first_name, + conversion_guest.hashed_last_name, + conversion_guest.hashed_email, + session, + ) + + if matched_hashed_customer: + guest_to_hashed_customer[guest_id] = matched_hashed_customer + _LOGGER.debug( + "Phase 3b: Matched guest %s to hashed_customer %d", + guest_id, + matched_hashed_customer.id, + ) + else: + guest_to_hashed_customer[guest_id] = None + _LOGGER.debug("Phase 3b: No match found for guest %s", guest_id) + + return guest_to_hashed_customer + + async def _link_matched_guests_to_reservations( + self, + guest_to_hashed_customer: dict[str, HashedCustomer], + session: AsyncSession, + stats: dict[str, int], + ) -> None: + """Phase 3c: Link conversions from matched guests to reservations based on dates. + + For each guest matched to a hashed_customer, find all conversions from that guest + that don't have a reservation yet, and try to link them to reservations based on + date matching. + + Args: + guest_to_hashed_customer: Mapping from guest_id to matched HashedCustomer + session: AsyncSession for database queries + stats: Shared stats dictionary to update + """ + for guest_id, matched_hashed_customer in guest_to_hashed_customer.items(): + if not matched_hashed_customer or not matched_hashed_customer.customer_id: + continue + + # Find all unmatched conversions from this guest + result = await session.execute( + select(Conversion) + .where( + (Conversion.guest_id == guest_id) + & (Conversion.reservation_id.is_(None)) + & (Conversion.customer_id.is_(None)) + ) + .options(selectinload(Conversion.conversion_rooms)) + ) + conversions = result.scalars().all() + + if not conversions: + continue + + _LOGGER.debug( + "Phase 3c: Processing %d conversions for guest %s (customer_id=%d)", + len(conversions), + guest_id, + matched_hashed_customer.customer_id, + ) + + # Try to link each conversion to a reservation for this customer + for conversion in conversions: + matched_reservation, is_attributable = await self._check_if_attributable( + matched_hashed_customer.customer_id, conversion, session + ) + + if matched_reservation and is_attributable: + conversion.reservation_id = matched_reservation.id + conversion.customer_id = matched_hashed_customer.customer_id + conversion.hashed_customer_id = matched_hashed_customer.id + conversion.directly_attributable = False + conversion.guest_matched = True + conversion.updated_at = datetime.now() + stats["matched_to_reservation"] += 1 + + _LOGGER.info( + "Phase 3c: Linked conversion (pms_id=%s) to reservation %d via guest matching", + conversion.pms_reservation_id, + matched_reservation.id, + ) + elif matched_hashed_customer: + # No attributable reservation found, but link to customer/hashed customer + conversion.customer_id = matched_hashed_customer.customer_id + conversion.hashed_customer_id = matched_hashed_customer.id + conversion.directly_attributable = False + conversion.guest_matched = True + conversion.updated_at = datetime.now() + stats["matched_to_customer"] += 1 + async def _match_conversions_from_db_sequential( self, pms_reservation_ids: list[str], stats: dict[str, int] ) -> None: - """Phase 2: Match conversions sequentially using database data only.""" + """Phase 3a: Match conversions sequentially using database data only. + + Performs ID-based matching for all conversions. Then extracts unique guests + and performs guest detail matching in phases 3b and 3c. + """ semaphore = asyncio.Semaphore(1) # Process one at a time async with asyncio.TaskGroup() as tg: for pms_id in pms_reservation_ids: @@ -1334,10 +1482,36 @@ class ConversionService: self._match_conversion_from_db_safe(pms_id, semaphore, stats) ) + # After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching) + if self.session_maker: + session = await self.session_maker.create_session() + else: + session = self.session + + try: + # Phase 3b: Extract and cache unique guests from unmatched conversions + guest_to_hashed_customer = await self._extract_unmatched_guests(session) + + # Phase 3c: Link matched guests to reservations + if guest_to_hashed_customer: + await self._link_matched_guests_to_reservations( + guest_to_hashed_customer, session, stats + ) + await session.commit() + except Exception as e: + await session.rollback() + _LOGGER.exception("Error in Phase 3b/3c guest matching: %s", e) + finally: + if self.session_maker: + await session.close() + async def _match_conversions_from_db_concurrent( self, pms_reservation_ids: list[str], stats: dict[str, int] ) -> None: - """Phase 2: Match conversions concurrently using database data only. + """Phase 3a: Match conversions concurrently using database data only. + + Performs ID-based matching for all conversions concurrently. Then extracts unique guests + and performs guest detail matching sequentially in phases 3b and 3c. Each concurrent task gets its own independent database session from the SessionMaker. @@ -1357,6 +1531,26 @@ class ConversionService: self._match_conversion_from_db_safe(pms_id, semaphore, stats) ) + # After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching) + # Use sequential processing for guest matching to avoid duplicating work + session = await self.session_maker.create_session() + + try: + # Phase 3b: Extract and cache unique guests from unmatched conversions + guest_to_hashed_customer = await self._extract_unmatched_guests(session) + + # Phase 3c: Link matched guests to reservations + if guest_to_hashed_customer: + await self._link_matched_guests_to_reservations( + guest_to_hashed_customer, session, stats + ) + await session.commit() + except Exception as e: + await session.rollback() + _LOGGER.exception("Error in Phase 3b/3c guest matching: %s", e) + finally: + await session.close() + async def _match_conversion_from_db_safe( self, pms_reservation_id: str, @@ -1414,18 +1608,14 @@ class ConversionService: session: AsyncSession | None = None, stats: dict[str, int] | None = None, ) -> None: - """Phase 2: Match a conversion using only persisted database data. + """Phase 3a: Match a conversion using ID-based matching only. This method reads both the conversion and conversion_guest from the database - and uses their stored hashed data to match to existing reservations/customers. - No XML parsing, no re-hashing - complete separation of concerns. + and uses their stored hashed data to match to existing reservations via + advertising data (fbclid/gclid/md5_unique_id). - Matching rules: - 1. Guest detail matches: Record in conversion_guests table with hashed_customer reference - 2. ID-based matches (md5_hash/click_id): Can also record directly in conversions-reservations - with directly_attributable=True - 3. Regular guest detection: Check if conversions exist with dates before all reservations, - or if conversion_room dates match reservation dates + Guest detail matching is deferred to Phase 3b/3c to avoid reprocessing the same + guest multiple times. Updates stats dictionary in-place if provided. @@ -1466,22 +1656,33 @@ class ConversionService: hashed_last_name = conversion_guest.hashed_last_name hashed_email = conversion_guest.hashed_email - # Perform matching using already-hashed data from database - match_result = await self._find_matching_entities( - conversion.advertising_campagne, - conversion.hotel_id, - conversion.reservation_date, - hashed_first_name, - hashed_last_name, - hashed_email, - conversion.advertising_partner, - session, - ) + # Phase 3a: Only try ID-based matching (fbclid/gclid/md5_unique_id) + # Guest detail matching is deferred to Phase 3b/3c + matched_reservation = None + matched_customer = None + matched_hashed_customer = None - matched_reservation = match_result["reservation"] - matched_customer = match_result["customer"] - matched_hashed_customer = match_result["hashed_customer"] - match_type = match_result.get("match_type") # "id" or "guest_details" + if conversion.advertising_campagne: + matched_reservation = await self._match_by_advertising( + conversion.advertising_campagne, + conversion.hotel_id, + hashed_first_name, + hashed_last_name, + hashed_email, + conversion.advertising_partner, + session, + ) + + if matched_reservation: + matched_customer = matched_reservation.customer + if matched_customer and matched_customer.hashed_version: + matched_hashed_customer = matched_customer.hashed_version + + _LOGGER.info( + "Phase 3a: Matched conversion by advertising ID (pms_id=%s, reservation_id=%d)", + pms_reservation_id, + matched_reservation.id, + ) # Update the conversion with matched entities if found if matched_reservation or matched_customer or matched_hashed_customer: @@ -1495,22 +1696,11 @@ class ConversionService: matched_hashed_customer.id if matched_hashed_customer else None ) - # Set attribution flags based on match type and check for regular guest - is_attributable = False - if match_type == "id": - # ID-based matches (fbclid/gclid/md5_unique_id) are always directly attributable - conversion.directly_attributable = True - conversion.guest_matched = False - is_attributable = True - elif match_type == "guest_details": - # Guest detail matches: check if guest is regular or dates match - is_attributable = await self._check_if_attributable( - matched_reservation, conversion, session - ) - conversion.directly_attributable = is_attributable - conversion.guest_matched = True + # ID-based matches are always directly attributable + conversion.directly_attributable = True + conversion.guest_matched = False - # Check if guest is regular for both ID and guest detail matches + # Check if guest is regular if matched_reservation: await self._check_if_regular(conversion, matched_reservation, session) @@ -1597,49 +1787,71 @@ class ConversionService: async def _check_if_attributable( self, - matched_reservation: Reservation, + matched_customer_id: int, conversion: Conversion, session: AsyncSession, - ) -> bool: - """Check if a guest detail matched conversion should be marked as attributable. + exclude_reservation_id: int | None = None, + ) -> tuple[Reservation | None, bool]: + """Search for and check if a customer has an attributable reservation for this conversion. - A conversion is attributable ONLY if the conversion_room dates match the reservation dates closely. + Finds reservations linked to the customer and checks if any have dates matching this conversion. + A conversion is attributable ONLY if the conversion_room dates match a reservation's dates closely. Args: - matched_reservation: The matched Reservation record + matched_customer_id: The customer ID to search reservations for conversion: The Conversion record being evaluated session: AsyncSession for database queries + exclude_reservation_id: Reservation ID to exclude from search (if already matched) Returns: - True if the conversion should be marked as attributable (based on date matching), False otherwise + Tuple of (matched_reservation, is_attributable) where: + - matched_reservation: The Reservation that matches (if any) + - is_attributable: True if the reservation's dates match this conversion """ - # Check if conversion_room dates match reservation dates (criterion for attributability) - if not conversion.conversion_rooms or not matched_reservation: - return False + # Check if conversion_room dates exist (criterion for attributability) + if not conversion.conversion_rooms: + return None, False - for room in conversion.conversion_rooms: - if ( - room.arrival_date - and room.departure_date - and matched_reservation.start_date - and matched_reservation.end_date - ): - # Check if dates match or mostly match (within 1 day tolerance) - arrival_match = abs( - (room.arrival_date - matched_reservation.start_date).days - ) <= 7 - departure_match = abs( - (room.departure_date - matched_reservation.end_date).days - ) <= 7 + # Find reservations for this customer + query = select(Reservation).where( + Reservation.customer_id == matched_customer_id + ) + if exclude_reservation_id: + query = query.where(Reservation.id != exclude_reservation_id) - if arrival_match and departure_match: - _LOGGER.info( - "Marking conversion as attributable: room dates %s-%s match reservation dates %s-%s", - room.arrival_date, - room.departure_date, - matched_reservation.start_date, - matched_reservation.end_date, - ) - return True + db_result = await session.execute(query) + reservations = db_result.scalars().all() - return False + if not reservations: + return None, False + + # Check each reservation for date match + for reservation in reservations: + for room in conversion.conversion_rooms: + if ( + room.arrival_date + and room.departure_date + and reservation.start_date + and reservation.end_date + ): + # Check if dates match or mostly match (within 7 day tolerance) + arrival_match = abs( + (room.arrival_date - reservation.start_date).days + ) <= 7 + departure_match = abs( + (room.departure_date - reservation.end_date).days + ) <= 7 + + if arrival_match and departure_match: + _LOGGER.info( + "Found attributable reservation for customer %d: " + "room dates %s-%s match reservation dates %s-%s", + matched_customer_id, + room.arrival_date, + room.departure_date, + reservation.start_date, + reservation.end_date, + ) + return reservation, True + + return None, False diff --git a/tests/test_conversion_service.py b/tests/test_conversion_service.py index 2b40440..8ee22ba 100644 --- a/tests/test_conversion_service.py +++ b/tests/test_conversion_service.py @@ -159,9 +159,12 @@ class TestConversionServiceWithImportedData: EXPECTED_TOTAL_ROOMS = 539 # Note: Currently no matches by tracking ID because XML data uses different formats # This is expected with the test data. Real PMS data would have higher match rates. - EXPECTED_MATCHED_TO_RESERVATION = 0 + # With the refactored Phase 3b/3c matching logic, we now properly link guest-matched + # conversions to reservations when dates match, so we get 19 matched to reservation + # instead of just matched to customer. + EXPECTED_MATCHED_TO_RESERVATION = 19 - EXPECTED_MATCHED_TO_CUSTOMER = 19 + EXPECTED_MATCHED_TO_CUSTOMER = 0 print(f"\nBaseline Match Counts:") print(f" Total reservations in XML: {EXPECTED_TOTAL_RESERVATIONS}")