From f033abf76e4bc8096597b8fb527738f40471e601 Mon Sep 17 00:00:00 2001 From: Jonas Linter <{email_address}> Date: Wed, 3 Dec 2025 17:05:58 +0100 Subject: [PATCH] Seems to mostly work now. Regular matching is still wrong --- sql_analysis.md | 2 +- src/alpine_bits_python/api.py | 2 +- src/alpine_bits_python/conversion_service.py | 1245 ++++-------------- tests/test_conversion_service.py | 20 +- 4 files changed, 301 insertions(+), 968 deletions(-) diff --git a/sql_analysis.md b/sql_analysis.md index 786f848..61a572e 100644 --- a/sql_analysis.md +++ b/sql_analysis.md @@ -42,7 +42,7 @@ select res.id, res.created_at, con.created_at as "Con Created at", con.updated_a left join alpinebits.conversions as con on con.reservation_id = res.id left join alpinebits.conversion_guests as g on g.guest_id = con.guest_id - where hotel_code = '39054_001' + where hotel_id = '39054_001' order by res.created_at desc limit 400 diff --git a/src/alpine_bits_python/api.py b/src/alpine_bits_python/api.py index 20a3b98..c6ba9e2 100644 --- a/src/alpine_bits_python/api.py +++ b/src/alpine_bits_python/api.py @@ -1197,7 +1197,7 @@ async def _process_conversion_xml_background( # Now process the conversion XML _LOGGER.info("Starting database processing of %s", filename) conversion_service = ConversionService(session_maker, hotel.hotel_id) - processing_stats = await conversion_service.process_conversion_xml(xml_content) + processing_stats = await conversion_service.process_conversion_xml(xml_content, run_full_guest_matching=True) _LOGGER.info( "Conversion processing complete for %s: %s", filename, processing_stats diff --git a/src/alpine_bits_python/conversion_service.py b/src/alpine_bits_python/conversion_service.py index 7b53558..4130ebc 100644 --- a/src/alpine_bits_python/conversion_service.py +++ b/src/alpine_bits_python/conversion_service.py @@ -446,11 +446,16 @@ class ConversionService: len(items), ) - async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]: + async def process_conversion_xml( + self, xml_content: str, *, run_full_guest_matching: bool = False + ) -> dict[str, Any]: """Parse conversion XML and save daily sales data to database. Args: xml_content: XML string containing reservation and daily sales data + run_full_guest_matching: If True, run guest-based matching for all unmatched + conversions of this hotel after processing the XML. When False (default), + guest matching only runs for the conversions touched in this XML. Returns: Dictionary with processing statistics @@ -575,10 +580,7 @@ class ConversionService: len(reservations), ) - # Phase 3: Match all conversions using database data only - # Only match conversions that actually exist in the database - # No XML parsing, no re-hashing - complete separation of concerns - # This also enables matching historical data that wasn't just created + # Phase 3: Match all conversions using database data only via advertising IDs if pms_reservation_ids: _LOGGER.debug("Phase 3: Matching conversions to reservations/customers") if self.supports_concurrent: @@ -590,6 +592,11 @@ class ConversionService: pms_reservation_ids, stats ) + # Guest matching (optional full scan or limited to current XML) + await self._run_guest_matching( + pms_reservation_ids, stats, run_full_guest_matching + ) + return stats async def _load_reservation_cache(self) -> None: @@ -667,7 +674,7 @@ class ConversionService: async def _process_reservations_sequential( self, reservations: list, stats: dict[str, int] - ) -> list[str]: + ) -> list[int]: """Process reservations one at a time (original behavior). Returns: @@ -705,7 +712,7 @@ class ConversionService: async def _process_reservations_concurrent( self, reservations: list, stats: dict[str, int] - ) -> list[str]: + ) -> list[int]: """Process reservations concurrently with semaphore limiting. Each concurrent task gets its own independent database session @@ -1055,25 +1062,17 @@ class ConversionService: hashed_first_name: str | None, hashed_last_name: str | None, hashed_email: str | None, - advertising_partner: str | None, session: AsyncSession | None = None, - raw_first_name: str | None = None, - raw_last_name: str | None = None, - raw_email: str | None = None, ) -> Reservation | None: """Match reservation by advertising tracking data (fbclid/gclid/md5_unique_id). Args: - advertising_campagne: Tracking ID from PMS (could be truncated click_id or md5_unique_id) + advertising_campagne: Tracking ID from PMS (typically md5_unique_id) hotel_id: Hotel ID for filtering hashed_first_name: Guest first name (hashed) for disambiguation hashed_last_name: Guest last name (hashed) for disambiguation hashed_email: Guest email (hashed) for disambiguation - advertising_partner: Partner info (matches utm_medium) session: AsyncSession to use. If None, uses self.session. - raw_first_name: Plain guest first name (optional fallback) - raw_last_name: Plain guest last name (optional fallback) - raw_email: Plain guest email (optional fallback) Returns: Matched Reservation or None @@ -1081,548 +1080,117 @@ class ConversionService: """ if session is None: session = self.session - # Find reservations where: - # - fbclid/gclid starts with the advertising_campagne value, OR - # - md5_unique_id matches exactly (for direct ID matching) - query = select(Reservation).where( + + if not advertising_campagne: + return None + + base_query = select(Reservation).options(selectinload(Reservation.customer)) + if hotel_id: + base_query = base_query.where(Reservation.hotel_id == hotel_id) + + # Normal flow: md5 hash matches exactly + is_md5_lookup = len(advertising_campagne or "") == 32 + if is_md5_lookup: + md5_query = base_query.where( + Reservation.md5_unique_id == advertising_campagne + ) + md5_result = await session.execute(md5_query) + md5_matches = md5_result.scalars().all() + if md5_matches: + return self._select_reservation_by_guest_hashes( + md5_matches, + hashed_first_name, + hashed_last_name, + hashed_email, + ) + + # Fallback: advertising ids (fbclid/gclid) are truncated, so only match prefix + like_pattern = f"{advertising_campagne}%" + advertising_query = base_query.where( or_( - Reservation.fbclid.like(f"{advertising_campagne}%"), - Reservation.gclid.like(f"{advertising_campagne}%"), - Reservation.md5_unique_id == advertising_campagne, + Reservation.fbclid.like(like_pattern), + Reservation.gclid.like(like_pattern), ) ) - - # Eagerly load the customer relationship - query = query.options(selectinload(Reservation.customer)) - - # Add hotel filter if available - if hotel_id: - query = query.where(Reservation.hotel_id == hotel_id) - - # Execute query - db_result = await session.execute(query) - reservations = db_result.scalars().all() + advertising_result = await session.execute(advertising_query) + reservations = advertising_result.scalars().all() if not reservations: return None - # Determine if this looks like a md5_unique_id (32 hex characters) or a click_id - is_md5_lookup = len(advertising_campagne or "") == 32 - - needs_filtering = len(reservations) > 1 or not is_md5_lookup - - if not needs_filtering: - # Confident single match via md5_unique_id - return reservations[0] - - # If multiple matches or click-id matches, try to narrow down using hashed guest details - _LOGGER.info( - ( - "Ambiguous advertising match for %s (hotel=%s, candidates=%d, md5_lookup=%s). " - "Applying guest detail filtering." - ), - advertising_campagne, - hotel_id, - len(reservations), - is_md5_lookup, - ) - - matched_reservation = self._filter_reservations_by_guest_details( - reservations, - raw_first_name, - raw_last_name, - raw_email, - advertising_partner, - hashed_first_name=hashed_first_name, - hashed_last_name=hashed_last_name, - hashed_email=hashed_email, - ) - - if matched_reservation is None: - # If we still can't narrow it down, use the first match and log warning - _LOGGER.warning( - "Could not narrow down multiple reservations for advertisingCampagne %s " - "(hotel=%s, guest=%s %s, email=%s). Using first match.", + if len(reservations) > 1: + _LOGGER.info( + ( + "Ambiguous advertising match for %s (hotel=%s, candidates=%d). " + "Using hashed guest data to deduplicate." + ), advertising_campagne, hotel_id, - raw_first_name, - raw_last_name, - raw_email, - ) - matched_reservation = reservations[0] - - return matched_reservation - - async def _match_by_guest_details_hashed( - self, - guest_first_name: str | None, - guest_last_name: str | None, - guest_email: str | None, - session: AsyncSession | None = None, - *, - conversion_guest: ConversionGuest | None = None, - ) -> Customer | None: - """Match guest by name and email directly to Customer (no Reservation needed). - - This method bypasses the Reservation table entirely and matches directly against - hashed customer data. Used for guest-detail matching where we don't need to link - to a specific reservation. - - Args: - guest_first_name: Guest first name (pre-hashed) - guest_last_name: Guest last name (pre-hashed) - guest_email: Guest email (pre-hashed) - session: AsyncSession to use. If None, uses self.session. - - Returns: - Matched Customer or None - - """ - if session is None: - session = self.session - - # Query all hashed customers that match the guest details - query = select(Customer) - - # Build filter conditions - conditions = [] - if guest_email: - conditions.append(Customer.hashed_email == guest_email) - if guest_first_name and guest_last_name: - conditions.append( - (Customer.hashed_given_name == guest_first_name) - & (Customer.hashed_surname == guest_last_name) + len(reservations), ) - if not conditions: - return None - - # Combine conditions with OR (match if email matches OR name matches) - query = query.where(or_(*conditions)) - - db_result = await session.execute(query) - matches = db_result.scalars().all() - - if not matches: - return None - - # If single match, return it - if len(matches) == 1: - return matches[0] - - best_customer: Customer | None = None - best_score = -1 - tie = False - - for candidate in matches: - candidate_score = self._score_guest_customer_match( - conversion_guest, - candidate, - hashed_first_name=guest_first_name, - hashed_last_name=guest_last_name, - hashed_email=guest_email, - ) - if candidate_score > best_score: - best_score = candidate_score - best_customer = candidate - tie = False - elif candidate_score == best_score: - tie = True - - if best_customer and best_score > 0 and not tie: - _LOGGER.debug( - "Multiple hashed customer matches; selected candidate %s via score %s", - best_customer.id, - best_score, - ) - return best_customer - - _LOGGER.warning( - "Multiple hashed customer matches found for guest details, using first match" + return self._select_reservation_by_guest_hashes( + reservations, + hashed_first_name, + hashed_last_name, + hashed_email, ) - return matches[0] - def _filter_reservations_by_guest_details( + def _select_reservation_by_guest_hashes( self, reservations: list[Reservation], - guest_first_name: str | None, - guest_last_name: str | None, - guest_email: str | None, - advertising_partner: str | None, - *, - hashed_first_name: str | None = None, - hashed_last_name: str | None = None, - hashed_email: str | None = None, + hashed_first_name: str | None, + hashed_last_name: str | None, + hashed_email: str | None, ) -> Reservation | None: - """Filter reservations using guest details to find a single match. + """Select the best matching reservation using hashed guest info.""" + if not reservations: + return None - Prefers hashed comparisons (exact match on hashed email or hashed name pair) and - falls back to plaintext comparison if hashes are unavailable. Finally tries - advertising partner vs utm_medium. - - Args: - reservations: List of candidate reservations - guest_first_name: Guest first name (plaintext, optional) - guest_last_name: Guest last name (plaintext, optional) - guest_email: Guest email (plaintext, optional) - advertising_partner: Partner info (e.g., "Facebook_Mobile_Feed") - hashed_first_name: Hashed first name for cross-checking - hashed_last_name: Hashed last name for cross-checking - hashed_email: Hashed email for cross-checking - - Returns: - Single best-match Reservation, or None if no good match found - - """ - candidates = reservations - - # Attempt hashed email match first - if hashed_email: - email_matches = [ - reservation - for reservation in candidates - if reservation.customer - and reservation.customer.hashed_email + def _matches_email(reservation: Reservation) -> bool: + return ( + hashed_email is not None + and reservation.customer is not None and reservation.customer.hashed_email == hashed_email - ] - if len(email_matches) == 1: - _LOGGER.debug("Found unique match via hashed email") - return email_matches[0] - if email_matches: - candidates = email_matches + ) - # Attempt hashed name match (first + last) - if hashed_first_name and hashed_last_name: - name_matches = [ - reservation - for reservation in candidates - if reservation.customer + def _matches_full_name(reservation: Reservation) -> bool: + return ( + hashed_first_name is not None + and hashed_last_name is not None + and reservation.customer is not None and reservation.customer.hashed_given_name == hashed_first_name and reservation.customer.hashed_surname == hashed_last_name - ] - if len(name_matches) == 1: - _LOGGER.debug("Found unique match via hashed names") - return name_matches[0] - if name_matches: - candidates = name_matches - - # Fallback to plaintext comparison if provided - if guest_first_name or guest_last_name or guest_email: - for reservation in candidates: - customer = reservation.customer - if not customer: - continue - - name_match = True - email_match = True - - if guest_first_name: - name_match = name_match and ( - customer.given_name - and customer.given_name.lower() == guest_first_name.lower() - ) - - if guest_last_name: - name_match = name_match and ( - customer.surname - and customer.surname.lower() == guest_last_name.lower() - ) - - if guest_email: - email_match = email_match and ( - customer.email_address - and customer.email_address.lower() == guest_email.lower() - ) - - if name_match and email_match: - _LOGGER.debug( - "Found exact plaintext match on guest details for %s %s", - guest_first_name, - guest_last_name, - ) - return reservation - - # Try to narrow down by advertising_partner matching utm_medium - if advertising_partner: - for reservation in candidates: - if ( - reservation.utm_medium - and reservation.utm_medium.lower() == advertising_partner.lower() - ): - _LOGGER.debug( - "Found match on advertising_partner=%s matching utm_medium", - advertising_partner, - ) - return reservation - - # No single clear match found - return None - - async def _extract_unmatched_guests( - self, session: AsyncSession - ) -> dict[str, Customer]: - """Phase 3b: Extract unique guests from unmatched conversions and match them to customers. - - Returns a mapping of guest_id -> Customer for all unique guests found in - unmatched conversions. Only processes each guest once. - - This includes: - 1. Conversions with no customer match at all (reservation_id IS NULL AND customer_id IS NULL) - 2. Conversions matched to a customer but not a reservation (reservation_id IS NULL AND customer_id IS NOT NULL) - - These may have been matched in a previous run and need re-evaluation for reservation linking - - Args: - session: AsyncSession for database queries - - Returns: - Dictionary mapping guest_id to matched Customer (or None if no match) - - """ - # Find all conversions that either: - # - Have no match at all (reservation_id IS NULL AND customer_id IS NULL), OR - # - Have a customer but no reservation (for re-linking in case new reservations were added) - result = await session.execute( - select(Conversion) - .where( - (Conversion.reservation_id.is_(None)) - & (Conversion.guest_id.isnot(None)) - ) - .options(selectinload(Conversion.guest)) - ) - unmatched_conversions = result.scalars().all() - - if not unmatched_conversions: - _LOGGER.debug("Phase 3b: No unmatched conversions with guests") - return {} - - # Extract unique guests (by guest_id) - unique_guests: dict[str, Conversion] = {} - for conversion in unmatched_conversions: - if conversion.guest_id not in unique_guests: - unique_guests[conversion.guest_id] = conversion - - _LOGGER.info( - "Phase 3b: Found %d unique guests from %d unmatched conversions", - len(unique_guests), - len(unmatched_conversions), - ) - - # Match each unique guest to a hashed customer - guest_to_hashed_customer: dict[str, Customer] = {} - for guest_id, conversion in unique_guests.items(): - conversion_guest = conversion.guest - if not conversion_guest: - continue - - # Try to match by guest details - matched_hashed_customer = await self._match_by_guest_details_hashed( - conversion_guest.hashed_first_name, - conversion_guest.hashed_last_name, - conversion_guest.hashed_email, - session, - conversion_guest=conversion_guest, ) - if matched_hashed_customer: - guest_to_hashed_customer[guest_id] = matched_hashed_customer - _LOGGER.debug( - "Phase 3b: Matched guest %s to hashed_customer %d", - guest_id, - matched_hashed_customer.id, + email_matches = [res for res in reservations if _matches_email(res)] + if email_matches: + if len(email_matches) > 1: + _LOGGER.warning( + "Multiple reservations matched hashed email; using first candidate" ) - else: - guest_to_hashed_customer[guest_id] = None - _LOGGER.debug("Phase 3b: No match found for guest %s", guest_id) + return email_matches[0] - return guest_to_hashed_customer - - async def _link_matched_guests_to_reservations( - self, - guest_to_customer_dict: dict[str, Customer], - session: AsyncSession, - stats: dict[str, int], - ) -> None: - """Phase 3c: Link conversions from matched guests to reservations based on dates. - - For each guest matched to a hashed_customer, find all conversions from that guest - that don't have a reservation yet, and try to link them to reservations based on - date matching. - - This includes: - 1. Conversions with no customer match (will link customer first) - 2. Conversions already linked to a customer from a previous run (will try to link to reservation) - - After all conversions for a guest are processed, check if the guest is a regular - by looking at whether they have paying conversions that predate any reservations. - - Args: - guest_to_customer: Mapping from guest_id to matched Customer - session: AsyncSession for database queries - stats: Shared stats dictionary to update - - """ - for guest_id, matched_hashed_customer in guest_to_customer_dict.items(): - if not matched_hashed_customer or not matched_hashed_customer.id: - continue - - # Find all conversions from this guest that don't have a reservation - # (whether or not they have a customer match - we might be re-running after new reservations added) - result = await session.execute( - select(Conversion) - .where( - (Conversion.guest_id == guest_id) - & (Conversion.reservation_id.is_(None)) + name_matches = [res for res in reservations if _matches_full_name(res)] + if name_matches: + if len(name_matches) > 1: + _LOGGER.warning( + "Multiple reservations matched hashed name; using first candidate" ) - .options( - selectinload(Conversion.conversion_rooms), - selectinload(Conversion.guest), - ) - ) - conversions = result.scalars().all() + return name_matches[0] - if not conversions: - continue - - _LOGGER.debug( - "Phase 3c: Processing %d conversions for guest %s (customer_id=%d)", - len(conversions), - guest_id, - matched_hashed_customer.id, + if len(reservations) > 1: + _LOGGER.warning( + "Unable to disambiguate %d reservations without hashed guest data; using first match", + len(reservations), ) - # Try to link each conversion to a reservation for this customer - for conversion in conversions: - ( - matched_reservation, - is_attributable, - ) = await self._check_if_attributable( - matched_hashed_customer.id, conversion, session - ) - - if matched_reservation and is_attributable: - # Only update stats if this is a NEW match (wasn't matched before) - was_previously_matched = conversion.customer_id is not None - - conversion.reservation_id = matched_reservation.id - conversion.customer_id = matched_hashed_customer.id - conversion.directly_attributable = True - conversion.guest_matched = True - conversion.updated_at = datetime.now() - - if not was_previously_matched: - stats["matched_to_reservation"] += 1 - - _LOGGER.info( - "Phase 3c: Linked conversion (pms_id=%s) to reservation %d via guest matching", - conversion.pms_reservation_id, - matched_reservation.id, - ) - elif matched_hashed_customer and conversion.customer_id is None: - # Only count new customer matches (conversions that didn't have a customer before) - conversion.customer_id = matched_hashed_customer.id - conversion.directly_attributable = False - conversion.guest_matched = True - conversion.updated_at = datetime.now() - stats["matched_to_customer"] += 1 - - # After all conversions for this guest are processed, check if guest is regular - # Look at ALL conversions from this guest to see if there are pre-dated payments - if conversions and conversions[0].guest: - await self._check_if_guest_is_regular( - guest_id, matched_hashed_customer.id, session - ) - - async def _check_regularity_for_all_matched_guests( - self, session: AsyncSession - ) -> None: - """Phase 3d: Check regularity for ALL matched guests (both ID-matched and guest-detail-matched). - - This is called after all matching is complete to evaluate every guest that has been - matched to a customer, regardless of match type. This ensures consistent regularity - evaluation across all matched conversions. - - This is run on ALL matched guests, not just newly matched ones, to ensure that if - the regularity logic changes, it gets re-applied to all guests on the next run. - This maintains idempotency of the matching process. - - Args: - session: AsyncSession for database queries - - """ - # Collect every (hotel, guest) -> customer pair derived from conversions. - result = await session.execute( - select(Conversion.hotel_id, Conversion.guest_id, Conversion.customer_id).where( - Conversion.guest_id.isnot(None), Conversion.customer_id.isnot(None) - ) - ) - guest_customer_rows = result.all() - - if not guest_customer_rows: - _LOGGER.debug("Phase 3d: No matched guests to check for regularity") - return - - # Group by guest and by customer to detect conflicts in both directions. - guest_customer_sets: dict[tuple[str | None, int], set[int]] = {} - customer_guest_sets: dict[int, set[tuple[str | None, int]]] = {} - for hotel_id, guest_id, customer_id in guest_customer_rows: - if hotel_id is None or guest_id is None or customer_id is None: - continue - guest_key = (hotel_id, guest_id) - guest_customer_sets.setdefault(guest_key, set()).add(customer_id) - customer_guest_sets.setdefault(customer_id, set()).add(guest_key) - - if not guest_customer_sets: - _LOGGER.debug("Phase 3d: No matched guests to check for regularity") - return - - guest_duplicates = { - key: customer_ids - for key, customer_ids in guest_customer_sets.items() - if len(customer_ids) > 1 - } - if guest_duplicates: - await self._deduplicate_guest_customer_links(guest_duplicates, session) - - customer_duplicates = { - customer_id: guest_keys - for customer_id, guest_keys in customer_guest_sets.items() - if len(guest_keys) > 1 - } - if customer_duplicates: - await self._deduplicate_customer_guest_links(customer_duplicates, session) - - refreshed = await session.execute( - select( - Conversion.hotel_id, Conversion.guest_id, Conversion.customer_id - ).where(Conversion.guest_id.isnot(None), Conversion.customer_id.isnot(None)) - ) - guest_to_customer: dict[tuple[str | None, int], int] = {} - for hotel_id, guest_id, customer_id in refreshed.all(): - if hotel_id is None or guest_id is None or customer_id is None: - continue - guest_to_customer[(hotel_id, guest_id)] = customer_id - - if not guest_to_customer: - _LOGGER.debug( - "Phase 3d: No guests remained linked to customers after deduplication" - ) - return - - _LOGGER.debug( - "Phase 3d: Checking regularity for %d matched guests", len(guest_to_customer) - ) - - for (hotel_id, guest_id), customer_id in guest_to_customer.items(): - await self._check_if_guest_is_regular(guest_id, customer_id, session) + return reservations[0] async def _match_conversions_from_db_sequential( - self, pms_reservation_ids: list[str], stats: dict[str, int] + self, pms_reservation_ids: list[int], stats: dict[str, int] ) -> None: - """Phase 3a: Match conversions sequentially using database data only. - - Performs ID-based matching for all conversions. Then extracts unique guests - and performs guest detail matching in phases 3b and 3c. - """ + """Match conversions sequentially using database data only.""" semaphore = asyncio.Semaphore(1) # Process one at a time async with asyncio.TaskGroup() as tg: for pms_id in pms_reservation_ids: @@ -1630,44 +1198,10 @@ class ConversionService: self._match_conversion_from_db_safe(pms_id, semaphore, stats) ) - # After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching) - if self.session_maker: - session = await self.session_maker.create_session() - else: - session = self.session - - try: - # Phase 3b: Extract and cache unique guests from unmatched conversions - guest_to_hashed_customer = await self._extract_unmatched_guests(session) - - # Phase 3c: Link matched guests to reservations - if guest_to_hashed_customer: - await self._link_matched_guests_to_reservations( - guest_to_hashed_customer, session, stats - ) - - # Phase 3d: Check regularity for all matched guests (both ID and guest-detail matched) - await self._check_regularity_for_all_matched_guests(session) - - await session.commit() - except Exception as e: - await session.rollback() - _LOGGER.exception("Error in Phase 3b/3c/3d guest matching: %s", e) - finally: - if self.session_maker: - await session.close() - async def _match_conversions_from_db_concurrent( - self, pms_reservation_ids: list[str], stats: dict[str, int] + self, pms_reservation_ids: list[int], stats: dict[str, int] ) -> None: - """Phase 3a: Match conversions concurrently using database data only. - - Performs ID-based matching for all conversions concurrently. Then extracts unique guests - and performs guest detail matching sequentially in phases 3b and 3c. - - Each concurrent task gets its own independent database session - from the SessionMaker. - """ + """Match conversions concurrently using database data only.""" if not self.session_maker: _LOGGER.error( "Concurrent matching requested but SessionMaker not available. " @@ -1683,30 +1217,6 @@ class ConversionService: self._match_conversion_from_db_safe(pms_id, semaphore, stats) ) - # After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching) - # Use sequential processing for guest matching to avoid duplicating work - session = await self.session_maker.create_session() - - try: - # Phase 3b: Extract and cache unique guests from unmatched conversions - guest_to_hashed_customer = await self._extract_unmatched_guests(session) - - # Phase 3c: Link matched guests to reservations - if guest_to_hashed_customer: - await self._link_matched_guests_to_reservations( - guest_to_hashed_customer, session, stats - ) - - # Phase 3d: Check regularity for all matched guests (both ID and guest-detail matched) - await self._check_regularity_for_all_matched_guests(session) - - await session.commit() - except Exception as e: - await session.rollback() - _LOGGER.exception("Error in Phase 3b/3c/3d guest matching: %s", e) - finally: - await session.close() - async def _match_conversion_from_db_safe( self, pms_reservation_id: int, @@ -1764,16 +1274,10 @@ class ConversionService: session: AsyncSession | None = None, stats: dict[str, int] | None = None, ) -> None: - """Phase 3a: Match a conversion using ID-based matching only. + """Match a conversion to an existing reservation using tracking IDs. - This method reads both the conversion and conversion_guest from the database - and uses their stored hashed data to match to existing reservations via - advertising data (fbclid/gclid/md5_unique_id). - - Guest detail matching is deferred to Phase 3b/3c to avoid reprocessing the same - guest multiple times. - - Updates stats dictionary in-place if provided. + Uses the stored advertising tracking ID (md5_unique_id/fbclid/gclid) plus hashed + guest details for deduplication when multiple reservations share the same tracking ID. Args: pms_reservation_id: PMS reservation ID to match @@ -1819,20 +1323,12 @@ class ConversionService: hashed_first_name = None hashed_last_name = None hashed_email = None - raw_first_name = None - raw_last_name = None - raw_email = None if conversion_guest: hashed_first_name = conversion_guest.hashed_first_name hashed_last_name = conversion_guest.hashed_last_name hashed_email = conversion_guest.hashed_email - raw_first_name = conversion_guest.guest_first_name - raw_last_name = conversion_guest.guest_last_name - raw_email = conversion_guest.guest_email - # Phase 3a: Only try ID-based matching (fbclid/gclid/md5_unique_id) - # Guest detail matching is deferred to Phase 3b/3c matched_reservation = None matched_customer = None @@ -1843,30 +1339,22 @@ class ConversionService: hashed_first_name, hashed_last_name, hashed_email, - conversion.advertising_partner, session, - raw_first_name=raw_first_name, - raw_last_name=raw_last_name, - raw_email=raw_email, ) if matched_reservation: matched_customer = matched_reservation.customer _LOGGER.info( - "Phase 3a: Matched conversion by advertising ID (pms_id=%s, reservation_id=%d)", + "Matched conversion by advertising ID (pms_id=%s, reservation_id=%d)", pms_reservation_id, matched_reservation.id, ) # Update the conversion with matched entities if found - if matched_reservation or matched_customer: - conversion.reservation_id = ( - matched_reservation.id if matched_reservation else None - ) + if matched_reservation: + conversion.reservation_id = matched_reservation.id conversion.customer_id = matched_customer.id if matched_customer else None - - # ID-based matches are always directly attributable conversion.directly_attributable = True conversion.guest_matched = False @@ -1877,383 +1365,239 @@ class ConversionService: if stats is not None: if matched_reservation: stats["matched_to_reservation"] += 1 - elif matched_customer: - stats["matched_to_customer"] += 1 else: stats["unmatched"] += 1 - async def _check_if_guest_is_regular( + async def _run_guest_matching( self, - guest_id: str, - customer_id: int, - session: AsyncSession, + pms_reservation_ids: list[int], + stats: dict[str, int], + run_full_guest_matching: bool, ) -> None: - """Check if a guest is a regular customer based on conversion and reservation history. - - A guest is regular if they have conversions with paying bookings that predate their first - reservation sent by us. This indicates they were already a customer before we started tracking. - - This check is done AFTER all conversions for a guest have been processed and matched, - so it can evaluate the complete picture of their payment history vs our reservation history. + """Run guest-detail matching for conversions. Args: - guest_id: The guest ID to evaluate - customer_id: The matched customer ID - session: AsyncSession for database queries + pms_reservation_ids: Reservation IDs processed in the current XML run + stats: Shared stats dictionary + run_full_guest_matching: If True, scan all guest conversions (including those already matched) """ - # Get the ConversionGuest record - guest_result = await session.execute( - select(ConversionGuest).where(ConversionGuest.guest_id == guest_id) - ) - conversion_guest = guest_result.scalar_one_or_none() - - if not conversion_guest: + if not self.hotel_id: + _LOGGER.warning("Guest matching skipped: hotel_id not set") return - # Find the earliest paying conversion for this guest (across all hotels) - # Look for conversions with actual revenue - earliest_paying_conversion_result = await session.execute( - select(Conversion) - .join(ConversionRoom, Conversion.id == ConversionRoom.conversion_id) - .where( - Conversion.guest_id == guest_id, - ConversionRoom.total_revenue.isnot(None), - ConversionRoom.total_revenue > Decimal(0), - ) - .order_by(Conversion.reservation_date.asc()) - .limit(1) - ) - earliest_paying_conversion = ( - earliest_paying_conversion_result.scalar_one_or_none() - ) - - if not earliest_paying_conversion: - # No paying conversions found for this guest - conversion_guest.is_regular = False + if not run_full_guest_matching and not pms_reservation_ids: return - # Find the earliest reservation sent to this customer - earliest_reservation_result = await session.execute( - select(Reservation) - .where(Reservation.customer_id == customer_id) - .order_by(Reservation.start_date.asc()) - .limit(1) - ) - earliest_reservation = earliest_reservation_result.scalar_one_or_none() + if self.session_maker: + session = await self.session_maker.create_session() + close_session = True + else: + session = self.session + close_session = False - if not earliest_reservation: - # No reservations for this customer yet, can't determine regularity - conversion_guest.is_regular = False + if not session: + _LOGGER.warning("Guest matching skipped: no active session") return - # Guest is regular if their earliest paying conversion predates our first reservation - # (meaning they were already a customer before we sent them a reservation) - # Compare against the reservation's creation date (when WE created/sent it), not check-in date - # Convert created_at to date for comparison with reservation_date (both are dates) - is_regular = ( - earliest_paying_conversion.reservation_date - < earliest_reservation.created_at.date() - ) - conversion_guest.is_regular = is_regular + try: + base_conditions = [ + Conversion.hotel_id == self.hotel_id, + Conversion.guest_id.isnot(None), + ] + mode_label = "full-scan" if run_full_guest_matching else "new-only" - async def _deduplicate_guest_customer_links( - self, - duplicates: dict[tuple[str | None, int], set[int]], - session: AsyncSession, - ) -> None: - """Resolve guest/customer conflicts by comparing hashed details and severing bad links.""" - for (hotel_id, guest_id), customer_ids in duplicates.items(): - guest_result = await session.execute( - select(ConversionGuest).where( - ConversionGuest.hotel_id == hotel_id, - ConversionGuest.guest_id == guest_id, - ) - ) - conversion_guest = guest_result.scalar_one_or_none() - - if not conversion_guest: - _LOGGER.warning( - "Guest %s (hotel=%s) missing when resolving duplicates; removing links to customers %s", - guest_id, - hotel_id, - sorted(customer_ids), - ) - for customer_id in customer_ids: - await self._sever_guest_customer_link( - hotel_id, guest_id, customer_id, session - ) - continue - - preferred_customer_id = await self._choose_best_customer_for_guest( - conversion_guest, customer_ids, session + query = select(Conversion).where(*base_conditions).options( + selectinload(Conversion.guest), + selectinload(Conversion.conversion_rooms), + selectinload(Conversion.customer), ) - if preferred_customer_id: - _LOGGER.warning( - "Guest %s (hotel=%s) linked to multiple customers %s; keeping %s based on hashed data", - guest_id, - hotel_id, - sorted(customer_ids), - preferred_customer_id, + target_ids: set[int] = set() + if run_full_guest_matching: + _LOGGER.info( + "Guest matching (%s): scanning all conversions with guests for hotel %s", + mode_label, + self.hotel_id, ) else: - _LOGGER.warning( - "Guest %s (hotel=%s) linked to multiple customers %s but none matched hashed data. Removing all links.", - guest_id, - hotel_id, - sorted(customer_ids), + query = query.where(Conversion.reservation_id.is_(None)) + if pms_reservation_ids: + target_ids = { + int(pms_id) + for pms_id in pms_reservation_ids + if pms_id is not None + } + if not target_ids: + return + query = query.where(Conversion.pms_reservation_id.in_(target_ids)) + _LOGGER.debug( + "Guest matching (%s): scanning %d recent conversions", + mode_label, + len(target_ids), ) - for customer_id in customer_ids: - if customer_id != preferred_customer_id: - await self._sever_guest_customer_link( - hotel_id, guest_id, customer_id, session - ) + result = await session.execute(query) + conversions = result.scalars().all() - def _score_guest_customer_match( + if not conversions: + return + + for conversion in conversions: + had_reservation = conversion.reservation_id is not None + had_customer = conversion.customer_id is not None + + matched_reservation, matched_customer = await self._apply_guest_matching( + conversion, session, rematch_existing=run_full_guest_matching + ) + + if matched_reservation and not had_reservation: + stats["matched_to_reservation"] += 1 + if stats["unmatched"] > 0: + stats["unmatched"] -= 1 + elif matched_customer and not had_customer: + stats["matched_to_customer"] += 1 + if stats["unmatched"] > 0: + stats["unmatched"] -= 1 + + await session.commit() + except Exception as exc: + await session.rollback() + _LOGGER.exception("Guest matching failed: %s", exc) + finally: + if close_session: + await session.close() + + async def _apply_guest_matching( self, - conversion_guest: ConversionGuest | None, - customer: Customer | None, - *, - hashed_first_name: str | None = None, - hashed_last_name: str | None = None, - hashed_email: str | None = None, - ) -> int: - """Score how well a guest matches a customer using hashed data.""" - if not customer: - return -1 - - score = 0 - guest_email_hash = ( - hashed_email or (conversion_guest.hashed_email if conversion_guest else None) - ) - guest_first_hash = ( - hashed_first_name - or (conversion_guest.hashed_first_name if conversion_guest else None) - ) - guest_last_hash = ( - hashed_last_name - or (conversion_guest.hashed_last_name if conversion_guest else None) - ) - - if guest_email_hash and customer.hashed_email == guest_email_hash: - score += 200 - if guest_first_hash and guest_last_hash: - if ( - customer.hashed_given_name == guest_first_hash - and customer.hashed_surname == guest_last_hash - ): - score += 50 - elif guest_first_hash and customer.hashed_given_name == guest_first_hash: - score += 10 - elif guest_last_hash and customer.hashed_surname == guest_last_hash: - score += 10 - - if conversion_guest: - if ( - conversion_guest.hashed_country_code - and customer.hashed_country_code - == conversion_guest.hashed_country_code - ): - score += 5 - if ( - conversion_guest.hashed_birth_date - and customer.hashed_birth_date == conversion_guest.hashed_birth_date - ): - score += 2 - - return score - - async def _choose_best_customer_for_guest( - self, - conversion_guest: ConversionGuest, - candidate_customer_ids: set[int], + conversion: Conversion, session: AsyncSession, - ) -> int | None: - """Pick the most likely customer based on hashed data.""" - if not candidate_customer_ids: + *, + rematch_existing: bool, + ) -> tuple[Reservation | None, Customer | None]: + """Apply guest-detail matching to a single conversion.""" + guest = conversion.guest + if not guest: + return None, None + + existing_customer = conversion.customer + matched_customer = existing_customer + + if rematch_existing or matched_customer is None: + candidate = await self._find_customer_for_guest(guest, session) + if candidate is not None: + matched_customer = candidate + elif matched_customer is None: + return None, None + + matched_reservation = None + if matched_customer: + matched_reservation = await self._find_reservation_for_customer( + matched_customer.id, conversion, session + ) + + if matched_customer: + conversion.customer_id = matched_customer.id + conversion.guest_matched = True + + if matched_reservation: + conversion.reservation_id = matched_reservation.id + conversion.directly_attributable = True + elif conversion.reservation_id is None: + conversion.directly_attributable = False + + conversion.updated_at = datetime.now() + + return matched_reservation, matched_customer + + async def _find_customer_for_guest( + self, guest: ConversionGuest, session: AsyncSession + ) -> Customer | None: + """Locate the best matching customer for the given guest hashes.""" + conditions = [] + if guest.hashed_email: + conditions.append(Customer.hashed_email == guest.hashed_email) + if guest.hashed_first_name and guest.hashed_last_name: + conditions.append( + (Customer.hashed_given_name == guest.hashed_first_name) + & (Customer.hashed_surname == guest.hashed_last_name) + ) + + if not conditions: return None - result = await session.execute( - select(Customer).where(Customer.id.in_(candidate_customer_ids)) - ) + query = select(Customer).where(or_(*conditions)) + result = await session.execute(query) candidates = result.scalars().all() if not candidates: return None - best_customer_id = None - best_score = -1 - is_tied = False + if len(candidates) == 1: + return candidates[0] - for customer in candidates: - score = self._score_guest_customer_match(conversion_guest, customer) + best_customer: Customer | None = None + best_score = -1 + tie = False + for candidate in candidates: + score = self._score_guest_customer_match(guest, candidate) if score > best_score: best_score = score - best_customer_id = customer.id - is_tied = False - elif score == best_score and score != -1: - is_tied = True + best_customer = candidate + tie = False + elif score == best_score: + tie = True - if best_score <= 0 or is_tied: - return None + if best_customer and best_score > 0 and not tie: + return best_customer - return best_customer_id + return candidates[0] - async def _deduplicate_customer_guest_links( + def _score_guest_customer_match( + self, guest: ConversionGuest, customer: Customer + ) -> int: + """Score how well a guest matches a given customer using hashed data.""" + score = 0 + + if guest.hashed_email and customer.hashed_email == guest.hashed_email: + score += 200 + + if guest.hashed_first_name and guest.hashed_last_name: + if ( + customer.hashed_given_name == guest.hashed_first_name + and customer.hashed_surname == guest.hashed_last_name + ): + score += 100 + else: + if guest.hashed_first_name and customer.hashed_given_name == guest.hashed_first_name: + score += 40 + if guest.hashed_last_name and customer.hashed_surname == guest.hashed_last_name: + score += 40 + + if guest.hashed_country_code and customer.hashed_country_code == guest.hashed_country_code: + score += 5 + if guest.hashed_birth_date and customer.hashed_birth_date == guest.hashed_birth_date: + score += 2 + + return score + + async def _find_reservation_for_customer( self, - duplicates: dict[int, set[tuple[str | None, int]]], - session: AsyncSession, - ) -> None: - """Ensure each customer is linked to at most one guest.""" - for customer_id, guest_keys in duplicates.items(): - customer_result = await session.execute( - select(Customer).where(Customer.id == customer_id) - ) - customer = customer_result.scalar_one_or_none() - - guest_records: list[tuple[str | None, int, ConversionGuest | None]] = [] - for hotel_id, guest_id in guest_keys: - guest_result = await session.execute( - select(ConversionGuest).where( - ConversionGuest.hotel_id == hotel_id, - ConversionGuest.guest_id == guest_id, - ) - ) - guest_records.append((hotel_id, guest_id, guest_result.scalar_one_or_none())) - - if not customer: - _LOGGER.warning( - "Customer %s missing while deduplicating guests; severing links %s", - customer_id, - guest_keys, - ) - for hotel_id, guest_id, _ in guest_records: - await self._sever_guest_customer_link( - hotel_id, guest_id, customer_id, session - ) - continue - - best_key: tuple[str | None, int] | None = None - best_score = -1 - is_tied = False - for hotel_id, guest_id, guest in guest_records: - score = self._score_guest_customer_match(guest, customer) - if score > best_score: - best_score = score - best_key = (hotel_id, guest_id) - is_tied = False - elif score == best_score: - is_tied = True - - if not best_key or best_score <= 0 or is_tied: - _LOGGER.warning( - "Customer %s linked to guests %s but no clear match; removing all links", - customer_id, - guest_keys, - ) - for hotel_id, guest_id, _ in guest_records: - await self._sever_guest_customer_link( - hotel_id, guest_id, customer_id, session - ) - continue - - _LOGGER.warning( - "Customer %s linked to multiple guests %s; keeping guest %s (hotel=%s, score=%s)", - customer_id, - guest_keys, - best_key[1], - best_key[0], - best_score, - ) - for hotel_id, guest_id, _ in guest_records: - if (hotel_id, guest_id) != best_key: - await self._sever_guest_customer_link( - hotel_id, guest_id, customer_id, session - ) - - async def _sever_guest_customer_link( - self, - hotel_id: str | None, - guest_id: int, customer_id: int, - session: AsyncSession, - ) -> None: - """Remove incorrect guest/customer links from conversions.""" - result = await session.execute( - select(Conversion) - .where( - Conversion.hotel_id == hotel_id, - Conversion.guest_id == guest_id, - Conversion.customer_id == customer_id, - ) - .options(selectinload(Conversion.conversion_rooms)) - ) - conversions = result.scalars().all() - - if not conversions: - return - - for conversion in conversions: - conversion.customer_id = None - conversion.reservation_id = None - conversion.directly_attributable = False - conversion.guest_matched = False - conversion.updated_at = datetime.now() - - _LOGGER.warning( - "Removed %d conversion links for guest %s (hotel=%s) customer %s", - len(conversions), - guest_id, - hotel_id, - customer_id, - ) - - - - async def _check_if_attributable( - self, - matched_customer_id: int, conversion: Conversion, session: AsyncSession, - exclude_reservation_id: int | None = None, - ) -> tuple[Reservation | None, bool]: - """Search for and check if a customer has an attributable reservation for this conversion. - - Finds reservations linked to the customer and checks if any have dates matching this conversion. - A conversion is attributable ONLY if the conversion_room dates match a reservation's dates closely. - - Args: - matched_customer_id: The customer ID to search reservations for - conversion: The Conversion record being evaluated - session: AsyncSession for database queries - exclude_reservation_id: Reservation ID to exclude from search (if already matched) - - Returns: - Tuple of (matched_reservation, is_attributable) where: - - matched_reservation: The Reservation that matches (if any) - - is_attributable: True if the reservation's dates match this conversion - - """ - # Check if conversion_room dates exist (criterion for attributability) + ) -> Reservation | None: + """Find a reservation for the customer that matches this conversion's stay dates.""" if not conversion.conversion_rooms: - return None, False + return None - # Find reservations for this customer - query = select(Reservation).where( - Reservation.customer_id == matched_customer_id - ) - if exclude_reservation_id: - query = query.where(Reservation.id != exclude_reservation_id) - - db_result = await session.execute(query) - reservations = db_result.scalars().all() + query = select(Reservation).where(Reservation.customer_id == customer_id) + result = await session.execute(query) + reservations = result.scalars().all() if not reservations: - return None, False + return None - # Check each reservation for date match for reservation in reservations: for room in conversion.conversion_rooms: if ( @@ -2262,24 +1606,13 @@ class ConversionService: and reservation.start_date and reservation.end_date ): - # Check if dates match or mostly match (within 7 day tolerance) arrival_match = ( abs((room.arrival_date - reservation.start_date).days) <= 7 ) departure_match = ( abs((room.departure_date - reservation.end_date).days) <= 7 ) - if arrival_match and departure_match: - _LOGGER.info( - "Found attributable reservation for customer %d: " - "room dates %s-%s match reservation dates %s-%s", - matched_customer_id, - room.arrival_date, - room.departure_date, - reservation.start_date, - reservation.end_date, - ) - return reservation, True + return reservation - return None, False + return None diff --git a/tests/test_conversion_service.py b/tests/test_conversion_service.py index c180198..b1b5ace 100644 --- a/tests/test_conversion_service.py +++ b/tests/test_conversion_service.py @@ -142,7 +142,7 @@ class TestConversionServiceWithImportedData: ## Need to check if reservations and customers are now actually available in the db before proceeding - conversion_service = ConversionService(test_db_session) + conversion_service = ConversionService(test_db_session, hotel_id="39054_001") stats = await conversion_service.process_conversion_xml(xml_content) # BASELINE ASSERTIONS: @@ -224,7 +224,7 @@ class TestConversionServiceWithImportedData: # File already has proper XML structure, just use it as-is xml_content = xml_content.strip() - conversion_service = ConversionService(test_db_session) + conversion_service = ConversionService(test_db_session, hotel_id="39054_001") stats = await conversion_service.process_conversion_xml(xml_content) # Verify conversions were created @@ -300,7 +300,7 @@ class TestConversionServiceWithImportedData: # File already has proper XML structure, just use it as-is xml_content = xml_content.strip() - conversion_service = ConversionService(test_db_session) + conversion_service = ConversionService(test_db_session, hotel_id="39054_001") stats = await conversion_service.process_conversion_xml(xml_content) # Verify conversions were processed @@ -332,7 +332,7 @@ class TestConversionServiceWithImportedData: """Test ConversionService handles invalid XML gracefully.""" invalid_xml = "unclosed tag" - conversion_service = ConversionService(test_db_session) + conversion_service = ConversionService(test_db_session, hotel_id="39054_001") with pytest.raises(ValueError, match="Invalid XML"): await conversion_service.process_conversion_xml(invalid_xml) @@ -342,7 +342,7 @@ class TestConversionServiceWithImportedData: """Test ConversionService handles empty/minimal XML.""" minimal_xml = '' - conversion_service = ConversionService(test_db_session) + conversion_service = ConversionService(test_db_session, hotel_id="39054_001") stats = await conversion_service.process_conversion_xml(minimal_xml) assert stats["total_reservations"] == 0 @@ -421,7 +421,7 @@ class TestConversionServiceWithImportedData: xml_content1 = multi_builder1.build_xml() # Process first batch - service = ConversionService(test_db_session) + service = ConversionService(test_db_session, hotel_id="39054_001") stats1 = await service.process_conversion_xml(xml_content1) assert stats1["total_reservations"] == 2 @@ -577,7 +577,7 @@ class TestXMLBuilderUsage: ) # Process the XML - service = ConversionService(test_db_session) + service = ConversionService(test_db_session, hotel_id="39054_001") stats = await service.process_conversion_xml(xml_content) assert stats["total_reservations"] == 1 @@ -616,7 +616,7 @@ class TestXMLBuilderUsage: .build_xml() ) - service = ConversionService(test_db_session) + service = ConversionService(test_db_session, hotel_id="39054_001") stats = await service.process_conversion_xml(xml_content) assert stats["total_reservations"] == 1 @@ -677,7 +677,7 @@ class TestXMLBuilderUsage: xml_content = multi_builder.build_xml() # Process the XML - service = ConversionService(test_db_session) + service = ConversionService(test_db_session, hotel_id="39054_001") stats = await service.process_conversion_xml(xml_content) assert stats["total_reservations"] == 2 @@ -768,7 +768,7 @@ class TestHashedMatchingLogic: """ - service = ConversionService(test_db_session, hotel_id="hotel_1") + service = ConversionService(test_db_session, hotel_id="39054_001") stats = await service.process_conversion_xml(xml_content) # Verify conversion was created