From d27e31b0c1b5a59fe99493ceb73621808adf0a99 Mon Sep 17 00:00:00 2001 From: Jonas Linter <{email_address}> Date: Wed, 19 Nov 2025 15:10:38 +0100 Subject: [PATCH] Complete seperation --- src/alpine_bits_python/conversion_service.py | 261 ++++++++++++++++--- 1 file changed, 219 insertions(+), 42 deletions(-) diff --git a/src/alpine_bits_python/conversion_service.py b/src/alpine_bits_python/conversion_service.py index 9e77579..75a35e7 100644 --- a/src/alpine_bits_python/conversion_service.py +++ b/src/alpine_bits_python/conversion_service.py @@ -203,19 +203,32 @@ class ConversionService: if self.session_maker: await session.close() - # Process active reservations + # Process active reservations in two phases: + # Phase 1: Create/update all conversion records + # Phase 2: Match them to existing reservations/customers reservations = root.findall("reservation") stats["total_reservations"] = len(reservations) if not reservations: return stats - # Use concurrent processing if supported, otherwise sequential + # Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point) if self.supports_concurrent: await self._process_reservations_concurrent(reservations, stats) else: await self._process_reservations_sequential(reservations, stats) + # Collect PMS reservation IDs from Phase 1 for Phase 2 + pms_reservation_ids = [res.get("id") for res in reservations] + + # Phase 2: Match all conversions using database data only + # No XML parsing, no re-hashing - complete separation of concerns + # This also enables matching historical data that wasn't just created + if self.supports_concurrent: + await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats) + else: + await self._match_conversions_from_db_sequential(pms_reservation_ids, stats) + return stats async def _load_reservation_cache(self) -> None: @@ -332,6 +345,7 @@ class ConversionService: ) -> None: """Safely process a single reservation with semaphore and transaction management. + Phase 1: Creation/update only (no matching). In concurrent mode, creates its own session from SessionMaker. In sequential mode, uses the shared session. @@ -351,26 +365,16 @@ class ConversionService: session = self.session try: - # Process reservation with this task's session - reservation_stats = await self._process_reservation( + # Phase 1: Create/update conversion record (no matching) + reservation_stats = await self._create_or_update_conversion( reservation_elem, session ) stats["total_daily_sales"] += reservation_stats["daily_sales_count"] - stats["matched_to_reservation"] += reservation_stats.get( - "matched_to_reservation", 0 - ) - stats["matched_to_customer"] += reservation_stats.get( - "matched_to_customer", 0 - ) - stats["matched_to_hashed_customer"] += reservation_stats.get( - "matched_to_hashed_customer", 0 - ) - stats["unmatched"] += reservation_stats.get("unmatched", 0) # Commit this task's transaction await session.commit() _LOGGER.debug( - "Successfully processed and committed reservation %s", + "Successfully created/updated conversion for reservation %s", pms_reservation_id, ) @@ -420,27 +424,23 @@ class ConversionService: pms_reservation_id, ) - async def _process_reservation( + async def _create_or_update_conversion( self, reservation_elem: ET.Element, session: AsyncSession | None = None ) -> dict[str, int]: - """Process a single reservation element and its daily sales. + """Create or update a conversion record from XML (Phase 1: no matching). Args: reservation_elem: XML element to process session: AsyncSession to use. If None, uses self.session. In concurrent mode, each task passes its own session. - Returns statistics about what was matched. + Returns statistics about daily sales processed. """ if session is None: session = self.session stats = { "daily_sales_count": 0, - "matched_to_reservation": 0, - "matched_to_customer": 0, - "matched_to_hashed_customer": 0, - "unmatched": 0, } # Extract reservation metadata @@ -760,26 +760,6 @@ class ConversionService: num_adults, ) - # Now that conversion, conversion_guest, and conversion_room records exist, - # perform matching using hashed guest data - match_stats = await self._match_conversion( - conversion, - guest_first_name, - guest_last_name, - guest_email, - advertising_campagne, - advertising_partner, - hotel_id, - reservation_date, - session, - ) - - # Update stats - stats["matched_to_reservation"] = match_stats["matched_to_reservation"] - stats["matched_to_customer"] = match_stats["matched_to_customer"] - stats["matched_to_hashed_customer"] = match_stats["matched_to_hashed_customer"] - stats["unmatched"] = match_stats["unmatched"] - return stats async def _match_conversion( @@ -1308,3 +1288,200 @@ class ConversionService: # No single clear match found return None + + async def _match_conversions_from_db_sequential( + self, pms_reservation_ids: list[str], stats: dict[str, int] + ) -> None: + """Phase 2: Match conversions sequentially using database data only.""" + semaphore = asyncio.Semaphore(1) # Process one at a time + async with asyncio.TaskGroup() as tg: + for pms_id in pms_reservation_ids: + tg.create_task( + self._match_conversion_from_db_safe(pms_id, semaphore, stats) + ) + + async def _match_conversions_from_db_concurrent( + self, pms_reservation_ids: list[str], stats: dict[str, int] + ) -> None: + """Phase 2: Match conversions concurrently using database data only. + + Each concurrent task gets its own independent database session + from the SessionMaker. + """ + if not self.session_maker: + _LOGGER.error( + "Concurrent matching requested but SessionMaker not available. " + "Falling back to sequential matching." + ) + await self._match_conversions_from_db_sequential(pms_reservation_ids, stats) + return + + semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS) + async with asyncio.TaskGroup() as tg: + for pms_id in pms_reservation_ids: + tg.create_task( + self._match_conversion_from_db_safe(pms_id, semaphore, stats) + ) + + async def _match_conversion_from_db_safe( + self, + pms_reservation_id: str, + semaphore: asyncio.Semaphore, + stats: dict[str, int], + ) -> None: + """Phase 2: Safely match a conversion using only database data with transaction management. + + In concurrent mode, creates its own session from SessionMaker. + In sequential mode, uses the shared session. + + Args: + pms_reservation_id: PMS reservation ID to match + semaphore: Semaphore to limit concurrent operations + stats: Shared stats dictionary (thread-safe due to GIL) + + """ + async with semaphore: + # In concurrent mode, create a new session for this task + if self.session_maker: + session = await self.session_maker.create_session() + else: + session = self.session + + try: + # Phase 2: Match conversion using only database data + match_stats = await self._match_conversion_using_db_data( + pms_reservation_id, session + ) + stats["matched_to_reservation"] += match_stats.get( + "matched_to_reservation", 0 + ) + stats["matched_to_customer"] += match_stats.get( + "matched_to_customer", 0 + ) + stats["matched_to_hashed_customer"] += match_stats.get( + "matched_to_hashed_customer", 0 + ) + stats["unmatched"] += match_stats.get("unmatched", 0) + + # Commit this task's transaction + await session.commit() + _LOGGER.debug( + "Successfully matched conversion for reservation %s", + pms_reservation_id, + ) + + except Exception as e: + # Rollback this task's transaction + await session.rollback() + _LOGGER.exception( + "Error matching conversion for reservation %s: %s", + pms_reservation_id, + e, + ) + stats["errors"] += 1 + finally: + # Close the session if it was created by SessionMaker + if self.session_maker: + await session.close() + + async def _match_conversion_using_db_data( + self, + pms_reservation_id: str, + session: AsyncSession | None = None, + ) -> dict[str, int]: + """Phase 2: Match a conversion using only persisted database data. + + This method reads both the conversion and conversion_guest from the database + and uses their stored hashed data to match to existing reservations/customers. + No XML parsing, no re-hashing - complete separation of concerns. + + This enables: + - Matching historical data that wasn't just created + - Re-running matching logic independently + - Consistent hashing (using already-hashed data from DB) + + Args: + pms_reservation_id: PMS reservation ID to match + session: AsyncSession to use + + Returns: + Dictionary with match statistics + """ + if session is None: + session = self.session + + stats = { + "matched_to_reservation": 0, + "matched_to_customer": 0, + "matched_to_hashed_customer": 0, + "unmatched": 0, + } + + # Get the conversion from the database with related data + result = await session.execute( + select(Conversion) + .where(Conversion.pms_reservation_id == pms_reservation_id) + .options(selectinload(Conversion.guest)) + ) + conversion = result.scalar_one_or_none() + + if not conversion: + _LOGGER.warning( + "Conversion not found for pms_reservation_id=%s during matching phase", + pms_reservation_id, + ) + return stats + + # Get conversion_guest if it exists (has the hashed data) + conversion_guest = conversion.guest + + # Extract hashed data from conversion_guest (already hashed) + hashed_first_name = None + hashed_last_name = None + hashed_email = None + + if conversion_guest: + hashed_first_name = conversion_guest.hashed_first_name + hashed_last_name = conversion_guest.hashed_last_name + hashed_email = conversion_guest.hashed_email + + # Perform matching using already-hashed data from database + match_result = await self._find_matching_entities( + conversion.advertising_campagne, + conversion.hotel_id, + conversion.reservation_date, + hashed_first_name, + hashed_last_name, + hashed_email, + conversion.advertising_partner, + session, + ) + + matched_reservation = match_result["reservation"] + matched_customer = match_result["customer"] + matched_hashed_customer = match_result["hashed_customer"] + + # Update the conversion with matched entities if found + if matched_reservation or matched_customer or matched_hashed_customer: + conversion.reservation_id = ( + matched_reservation.id if matched_reservation else None + ) + conversion.customer_id = ( + matched_customer.id if matched_customer else None + ) + conversion.hashed_customer_id = ( + matched_hashed_customer.id if matched_hashed_customer else None + ) + conversion.updated_at = datetime.now() + + # Update stats + if matched_reservation: + stats["matched_to_reservation"] = 1 + if matched_customer: + stats["matched_to_customer"] = 1 + if matched_hashed_customer: + stats["matched_to_hashed_customer"] = 1 + if not any([matched_reservation, matched_customer, matched_hashed_customer]): + stats["unmatched"] = 1 + + return stats