Hashed conversion matching and more. #12

Merged
jonas merged 13 commits from hashed_conversion_matching into main 2025-11-19 19:40:07 +00:00
Showing only changes of commit 1396135208 - Show all commits

View File

@@ -213,17 +213,23 @@ class ConversionService:
return stats return stats
# Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point) # Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point)
# Returns list of successfully created pms_reservation_ids
if self.supports_concurrent: if self.supports_concurrent:
await self._process_reservations_concurrent(reservations, stats) pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats)
else: else:
await self._process_reservations_sequential(reservations, stats) pms_reservation_ids = await self._process_reservations_sequential(reservations, stats)
# Collect PMS reservation IDs from Phase 1 for Phase 2 _LOGGER.debug(
pms_reservation_ids = [res.get("id") for res in reservations] "Phase 2: Found %d successfully created conversions out of %d total reservations",
len(pms_reservation_ids),
len(reservations),
)
# Phase 2: Match all conversions using database data only # Phase 2: Match all conversions using database data only
# Only match conversions that actually exist in the database
# No XML parsing, no re-hashing - complete separation of concerns # No XML parsing, no re-hashing - complete separation of concerns
# This also enables matching historical data that wasn't just created # This also enables matching historical data that wasn't just created
if pms_reservation_ids:
if self.supports_concurrent: if self.supports_concurrent:
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats) await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
else: else:
@@ -305,44 +311,72 @@ class ConversionService:
async def _process_reservations_sequential( async def _process_reservations_sequential(
self, reservations: list, stats: dict[str, int] self, reservations: list, stats: dict[str, int]
) -> None: ) -> list[str]:
"""Process reservations one at a time (original behavior).""" """Process reservations one at a time (original behavior).
Returns:
List of pms_reservation_ids that were successfully created/updated
"""
semaphore = asyncio.Semaphore(1) # Process one at a time semaphore = asyncio.Semaphore(1) # Process one at a time
results = []
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
tasks = []
for reservation in reservations: for reservation in reservations:
tg.create_task( task = tg.create_task(
self._process_reservation_safe(reservation, semaphore, stats) self._process_reservation_safe(reservation, semaphore, stats)
) )
tasks.append(task)
# Collect results from all tasks
for task in tasks:
pms_id = task.result()
if pms_id is not None:
results.append(pms_id)
return results
async def _process_reservations_concurrent( async def _process_reservations_concurrent(
self, reservations: list, stats: dict[str, int] self, reservations: list, stats: dict[str, int]
) -> None: ) -> list[str]:
"""Process reservations concurrently with semaphore limiting. """Process reservations concurrently with semaphore limiting.
Each concurrent task gets its own independent database session Each concurrent task gets its own independent database session
from the SessionMaker. from the SessionMaker.
Returns:
List of pms_reservation_ids that were successfully created/updated
""" """
if not self.session_maker: if not self.session_maker:
_LOGGER.error( _LOGGER.error(
"Concurrent processing requested but SessionMaker not available. " "Concurrent processing requested but SessionMaker not available. "
"Falling back to sequential processing." "Falling back to sequential processing."
) )
await self._process_reservations_sequential(reservations, stats) return await self._process_reservations_sequential(reservations, stats)
return
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS) semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
results = []
async with asyncio.TaskGroup() as tg: async with asyncio.TaskGroup() as tg:
tasks = []
for reservation in reservations: for reservation in reservations:
tg.create_task( task = tg.create_task(
self._process_reservation_safe(reservation, semaphore, stats) self._process_reservation_safe(reservation, semaphore, stats)
) )
tasks.append(task)
# Collect results from all tasks
for task in tasks:
pms_id = task.result()
if pms_id is not None:
results.append(pms_id)
return results
async def _process_reservation_safe( async def _process_reservation_safe(
self, self,
reservation_elem: Any, reservation_elem: Any,
semaphore: asyncio.Semaphore, semaphore: asyncio.Semaphore,
stats: dict[str, int], stats: dict[str, int],
) -> None: ) -> str | None:
"""Safely process a single reservation with semaphore and transaction management. """Safely process a single reservation with semaphore and transaction management.
Phase 1: Creation/update only (no matching). Phase 1: Creation/update only (no matching).
@@ -354,6 +388,9 @@ class ConversionService:
semaphore: Semaphore to limit concurrent operations semaphore: Semaphore to limit concurrent operations
stats: Shared stats dictionary (safe without locks: tasks run cooperatively on one event loop thread) stats: Shared stats dictionary (safe without locks: tasks run cooperatively on one event loop thread)
Returns:
pms_reservation_id if successfully created/updated, None if error occurred
""" """
pms_reservation_id = reservation_elem.get("id") pms_reservation_id = reservation_elem.get("id")
@@ -377,6 +414,7 @@ class ConversionService:
"Successfully created/updated conversion for reservation %s", "Successfully created/updated conversion for reservation %s",
pms_reservation_id, pms_reservation_id,
) )
return pms_reservation_id
except Exception as e: except Exception as e:
# Rollback this task's transaction # Rollback this task's transaction
@@ -387,6 +425,7 @@ class ConversionService:
e, e,
) )
stats["errors"] += 1 stats["errors"] += 1
return None
finally: finally:
# Close the session if it was created by SessionMaker # Close the session if it was created by SessionMaker
if self.session_maker: if self.session_maker:
@@ -1349,19 +1388,9 @@ class ConversionService:
try: try:
# Phase 2: Match conversion using only database data # Phase 2: Match conversion using only database data
match_stats = await self._match_conversion_using_db_data( await self._match_conversion_using_db_data(
pms_reservation_id, session pms_reservation_id, session, stats
) )
stats["matched_to_reservation"] += match_stats.get(
"matched_to_reservation", 0
)
stats["matched_to_customer"] += match_stats.get(
"matched_to_customer", 0
)
stats["matched_to_hashed_customer"] += match_stats.get(
"matched_to_hashed_customer", 0
)
stats["unmatched"] += match_stats.get("unmatched", 0)
# Commit this task's transaction # Commit this task's transaction
await session.commit() await session.commit()
@@ -1388,7 +1417,8 @@ class ConversionService:
self, self,
pms_reservation_id: str, pms_reservation_id: str,
session: AsyncSession | None = None, session: AsyncSession | None = None,
) -> dict[str, int]: stats: dict[str, int] | None = None,
) -> None:
"""Phase 2: Match a conversion using only persisted database data. """Phase 2: Match a conversion using only persisted database data.
This method reads both the conversion and conversion_guest from the database This method reads both the conversion and conversion_guest from the database
@@ -1400,23 +1430,16 @@ class ConversionService:
- Re-running matching logic independently - Re-running matching logic independently
- Consistent hashing (using already-hashed data from DB) - Consistent hashing (using already-hashed data from DB)
Updates stats dictionary in-place if provided.
Args: Args:
pms_reservation_id: PMS reservation ID to match pms_reservation_id: PMS reservation ID to match
session: AsyncSession to use session: AsyncSession to use
stats: Shared stats dictionary to update (optional)
Returns:
Dictionary with match statistics
""" """
if session is None: if session is None:
session = self.session session = self.session
stats = {
"matched_to_reservation": 0,
"matched_to_customer": 0,
"matched_to_hashed_customer": 0,
"unmatched": 0,
}
# Get the conversion from the database with related data # Get the conversion from the database with related data
result = await session.execute( result = await session.execute(
select(Conversion) select(Conversion)
@@ -1426,11 +1449,12 @@ class ConversionService:
conversion = result.scalar_one_or_none() conversion = result.scalar_one_or_none()
if not conversion: if not conversion:
_LOGGER.warning( # This should be extremely rare since we filtered in process_conversion_xml
"Conversion not found for pms_reservation_id=%s during matching phase", _LOGGER.debug(
"Conversion not found for pms_reservation_id=%s (may have been deleted)",
pms_reservation_id, pms_reservation_id,
) )
return stats return
# Get conversion_guest if it exists (has the hashed data) # Get conversion_guest if it exists (has the hashed data)
conversion_guest = conversion.guest conversion_guest = conversion.guest
@@ -1474,14 +1498,13 @@ class ConversionService:
) )
conversion.updated_at = datetime.now() conversion.updated_at = datetime.now()
# Update stats # Update stats if provided
if stats is not None:
if matched_reservation: if matched_reservation:
stats["matched_to_reservation"] = 1 stats["matched_to_reservation"] += 1
if matched_customer: elif matched_customer:
stats["matched_to_customer"] = 1 stats["matched_to_customer"] += 1
if matched_hashed_customer: elif matched_hashed_customer:
stats["matched_to_hashed_customer"] = 1 stats["matched_to_hashed_customer"] += 1
if not any([matched_reservation, matched_customer, matched_hashed_customer]): else:
stats["unmatched"] = 1 stats["unmatched"] += 1
return stats