From 139613520844e769005226ab77229d998cee41cb Mon Sep 17 00:00:00 2001 From: Jonas Linter <{email_address}> Date: Wed, 19 Nov 2025 15:39:33 +0100 Subject: [PATCH] Seems to work now --- src/alpine_bits_python/conversion_service.py | 129 +++++++++++-------- 1 file changed, 76 insertions(+), 53 deletions(-) diff --git a/src/alpine_bits_python/conversion_service.py b/src/alpine_bits_python/conversion_service.py index 75a35e7..18d87c0 100644 --- a/src/alpine_bits_python/conversion_service.py +++ b/src/alpine_bits_python/conversion_service.py @@ -213,21 +213,27 @@ class ConversionService: return stats # Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point) + # Returns list of successfully created pms_reservation_ids if self.supports_concurrent: - await self._process_reservations_concurrent(reservations, stats) + pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats) else: - await self._process_reservations_sequential(reservations, stats) + pms_reservation_ids = await self._process_reservations_sequential(reservations, stats) - # Collect PMS reservation IDs from Phase 1 for Phase 2 - pms_reservation_ids = [res.get("id") for res in reservations] + _LOGGER.debug( + "Phase 2: Found %d successfully created conversions out of %d total reservations", + len(pms_reservation_ids), + len(reservations), + ) # Phase 2: Match all conversions using database data only + # Only match conversions that actually exist in the database # No XML parsing, no re-hashing - complete separation of concerns # This also enables matching historical data that wasn't just created - if self.supports_concurrent: - await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats) - else: - await self._match_conversions_from_db_sequential(pms_reservation_ids, stats) + if pms_reservation_ids: + if self.supports_concurrent: + await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats) + else: + await self._match_conversions_from_db_sequential(pms_reservation_ids, stats) return stats @@ -305,44 +311,72 @@ class ConversionService: async def _process_reservations_sequential( self, reservations: list, stats: dict[str, int] - ) -> None: - """Process reservations one at a time (original behavior).""" + ) -> list[str]: + """Process reservations one at a time (original behavior). + + Returns: + List of pms_reservation_ids that were successfully created/updated + """ semaphore = asyncio.Semaphore(1) # Process one at a time + results = [] async with asyncio.TaskGroup() as tg: + tasks = [] for reservation in reservations: - tg.create_task( + task = tg.create_task( self._process_reservation_safe(reservation, semaphore, stats) ) + tasks.append(task) + + # Collect results from all tasks + for task in tasks: + pms_id = task.result() + if pms_id is not None: + results.append(pms_id) + + return results async def _process_reservations_concurrent( self, reservations: list, stats: dict[str, int] - ) -> None: + ) -> list[str]: """Process reservations concurrently with semaphore limiting. Each concurrent task gets its own independent database session from the SessionMaker. + + Returns: + List of pms_reservation_ids that were successfully created/updated """ if not self.session_maker: _LOGGER.error( "Concurrent processing requested but SessionMaker not available. " "Falling back to sequential processing." ) - await self._process_reservations_sequential(reservations, stats) - return + return await self._process_reservations_sequential(reservations, stats) semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS) + results = [] async with asyncio.TaskGroup() as tg: + tasks = [] for reservation in reservations: - tg.create_task( + task = tg.create_task( self._process_reservation_safe(reservation, semaphore, stats) ) + tasks.append(task) + + # Collect results from all tasks + for task in tasks: + pms_id = task.result() + if pms_id is not None: + results.append(pms_id) + + return results async def _process_reservation_safe( self, reservation_elem: Any, semaphore: asyncio.Semaphore, stats: dict[str, int], - ) -> None: + ) -> str | None: """Safely process a single reservation with semaphore and transaction management. Phase 1: Creation/update only (no matching). @@ -354,6 +388,9 @@ class ConversionService: semaphore: Semaphore to limit concurrent operations stats: Shared stats dictionary (thread-safe due to GIL) + Returns: + pms_reservation_id if successfully created/updated, None if error occurred + """ pms_reservation_id = reservation_elem.get("id") @@ -377,6 +414,7 @@ class ConversionService: "Successfully created/updated conversion for reservation %s", pms_reservation_id, ) + return pms_reservation_id except Exception as e: # Rollback this task's transaction @@ -387,6 +425,7 @@ class ConversionService: e, ) stats["errors"] += 1 + return None finally: # Close the session if it was created by SessionMaker if self.session_maker: @@ -1349,19 +1388,9 @@ class ConversionService: try: # Phase 2: Match conversion using only database data - match_stats = await self._match_conversion_using_db_data( - pms_reservation_id, session + await self._match_conversion_using_db_data( + pms_reservation_id, session, stats ) - stats["matched_to_reservation"] += match_stats.get( - "matched_to_reservation", 0 - ) - stats["matched_to_customer"] += match_stats.get( - "matched_to_customer", 0 - ) - stats["matched_to_hashed_customer"] += match_stats.get( - "matched_to_hashed_customer", 0 - ) - stats["unmatched"] += match_stats.get("unmatched", 0) # Commit this task's transaction await session.commit() @@ -1388,7 +1417,8 @@ class ConversionService: self, pms_reservation_id: str, session: AsyncSession | None = None, - ) -> dict[str, int]: + stats: dict[str, int] | None = None, + ) -> None: """Phase 2: Match a conversion using only persisted database data. This method reads both the conversion and conversion_guest from the database @@ -1400,23 +1430,16 @@ class ConversionService: - Re-running matching logic independently - Consistent hashing (using already-hashed data from DB) + Updates stats dictionary in-place if provided. + Args: pms_reservation_id: PMS reservation ID to match session: AsyncSession to use - - Returns: - Dictionary with match statistics + stats: Shared stats dictionary to update (optional) """ if session is None: session = self.session - stats = { - "matched_to_reservation": 0, - "matched_to_customer": 0, - "matched_to_hashed_customer": 0, - "unmatched": 0, - } - # Get the conversion from the database with related data result = await session.execute( select(Conversion) @@ -1426,11 +1449,12 @@ class ConversionService: conversion = result.scalar_one_or_none() if not conversion: - _LOGGER.warning( - "Conversion not found for pms_reservation_id=%s during matching phase", + # This should be extremely rare since we filtered in process_conversion_xml + _LOGGER.debug( + "Conversion not found for pms_reservation_id=%s (may have been deleted)", pms_reservation_id, ) - return stats + return # Get conversion_guest if it exists (has the hashed data) conversion_guest = conversion.guest @@ -1474,14 +1498,13 @@ class ConversionService: ) conversion.updated_at = datetime.now() - # Update stats - if matched_reservation: - stats["matched_to_reservation"] = 1 - if matched_customer: - stats["matched_to_customer"] = 1 - if matched_hashed_customer: - stats["matched_to_hashed_customer"] = 1 - if not any([matched_reservation, matched_customer, matched_hashed_customer]): - stats["unmatched"] = 1 - - return stats + # Update stats if provided + if stats is not None: + if matched_reservation: + stats["matched_to_reservation"] += 1 + elif matched_customer: + stats["matched_to_customer"] += 1 + elif matched_hashed_customer: + stats["matched_to_hashed_customer"] += 1 + else: + stats["unmatched"] += 1