Seems to work now

This commit is contained in:
Jonas Linter
2025-11-19 15:39:33 +01:00
parent d27e31b0c1
commit 8547326ffa

View File

@@ -213,21 +213,27 @@ class ConversionService:
return stats
# Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point)
# Returns list of successfully created pms_reservation_ids
if self.supports_concurrent:
await self._process_reservations_concurrent(reservations, stats)
pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats)
else:
await self._process_reservations_sequential(reservations, stats)
pms_reservation_ids = await self._process_reservations_sequential(reservations, stats)
# Collect PMS reservation IDs from Phase 1 for Phase 2
pms_reservation_ids = [res.get("id") for res in reservations]
_LOGGER.debug(
"Phase 2: Found %d successfully created conversions out of %d total reservations",
len(pms_reservation_ids),
len(reservations),
)
# Phase 2: Match all conversions using database data only
# Only match conversions that actually exist in the database
# No XML parsing, no re-hashing - complete separation of concerns
# This also enables matching historical data that wasn't just created
if self.supports_concurrent:
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
else:
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
if pms_reservation_ids:
if self.supports_concurrent:
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
else:
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
return stats
@@ -305,44 +311,72 @@ class ConversionService:
async def _process_reservations_sequential(
self, reservations: list, stats: dict[str, int]
) -> None:
"""Process reservations one at a time (original behavior)."""
) -> list[str]:
"""Process reservations one at a time (original behavior).
Returns:
List of pms_reservation_ids that were successfully created/updated
"""
semaphore = asyncio.Semaphore(1) # Process one at a time
results = []
async with asyncio.TaskGroup() as tg:
tasks = []
for reservation in reservations:
tg.create_task(
task = tg.create_task(
self._process_reservation_safe(reservation, semaphore, stats)
)
tasks.append(task)
# Collect results from all tasks
for task in tasks:
pms_id = task.result()
if pms_id is not None:
results.append(pms_id)
return results
async def _process_reservations_concurrent(
self, reservations: list, stats: dict[str, int]
) -> None:
) -> list[str]:
"""Process reservations concurrently with semaphore limiting.
Each concurrent task gets its own independent database session
from the SessionMaker.
Returns:
List of pms_reservation_ids that were successfully created/updated
"""
if not self.session_maker:
_LOGGER.error(
"Concurrent processing requested but SessionMaker not available. "
"Falling back to sequential processing."
)
await self._process_reservations_sequential(reservations, stats)
return
return await self._process_reservations_sequential(reservations, stats)
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
results = []
async with asyncio.TaskGroup() as tg:
tasks = []
for reservation in reservations:
tg.create_task(
task = tg.create_task(
self._process_reservation_safe(reservation, semaphore, stats)
)
tasks.append(task)
# Collect results from all tasks
for task in tasks:
pms_id = task.result()
if pms_id is not None:
results.append(pms_id)
return results
async def _process_reservation_safe(
self,
reservation_elem: Any,
semaphore: asyncio.Semaphore,
stats: dict[str, int],
) -> None:
) -> str | None:
"""Safely process a single reservation with semaphore and transaction management.
Phase 1: Creation/update only (no matching).
@@ -354,6 +388,9 @@ class ConversionService:
semaphore: Semaphore to limit concurrent operations
stats: Shared stats dictionary (thread-safe due to GIL)
Returns:
pms_reservation_id if successfully created/updated, None if error occurred
"""
pms_reservation_id = reservation_elem.get("id")
@@ -377,6 +414,7 @@ class ConversionService:
"Successfully created/updated conversion for reservation %s",
pms_reservation_id,
)
return pms_reservation_id
except Exception as e:
# Rollback this task's transaction
@@ -387,6 +425,7 @@ class ConversionService:
e,
)
stats["errors"] += 1
return None
finally:
# Close the session if it was created by SessionMaker
if self.session_maker:
@@ -1349,19 +1388,9 @@ class ConversionService:
try:
# Phase 2: Match conversion using only database data
match_stats = await self._match_conversion_using_db_data(
pms_reservation_id, session
await self._match_conversion_using_db_data(
pms_reservation_id, session, stats
)
stats["matched_to_reservation"] += match_stats.get(
"matched_to_reservation", 0
)
stats["matched_to_customer"] += match_stats.get(
"matched_to_customer", 0
)
stats["matched_to_hashed_customer"] += match_stats.get(
"matched_to_hashed_customer", 0
)
stats["unmatched"] += match_stats.get("unmatched", 0)
# Commit this task's transaction
await session.commit()
@@ -1388,7 +1417,8 @@ class ConversionService:
self,
pms_reservation_id: str,
session: AsyncSession | None = None,
) -> dict[str, int]:
stats: dict[str, int] | None = None,
) -> None:
"""Phase 2: Match a conversion using only persisted database data.
This method reads both the conversion and conversion_guest from the database
@@ -1400,23 +1430,16 @@ class ConversionService:
- Re-running matching logic independently
- Consistent hashing (using already-hashed data from DB)
Updates stats dictionary in-place if provided.
Args:
pms_reservation_id: PMS reservation ID to match
session: AsyncSession to use
Returns:
Dictionary with match statistics
stats: Shared stats dictionary to update (optional)
"""
if session is None:
session = self.session
stats = {
"matched_to_reservation": 0,
"matched_to_customer": 0,
"matched_to_hashed_customer": 0,
"unmatched": 0,
}
# Get the conversion from the database with related data
result = await session.execute(
select(Conversion)
@@ -1426,11 +1449,12 @@ class ConversionService:
conversion = result.scalar_one_or_none()
if not conversion:
_LOGGER.warning(
"Conversion not found for pms_reservation_id=%s during matching phase",
# This should be extremely rare since we filtered in process_conversion_xml
_LOGGER.debug(
"Conversion not found for pms_reservation_id=%s (may have been deleted)",
pms_reservation_id,
)
return stats
return
# Get conversion_guest if it exists (has the hashed data)
conversion_guest = conversion.guest
@@ -1474,14 +1498,13 @@ class ConversionService:
)
conversion.updated_at = datetime.now()
# Update stats
if matched_reservation:
stats["matched_to_reservation"] = 1
if matched_customer:
stats["matched_to_customer"] = 1
if matched_hashed_customer:
stats["matched_to_hashed_customer"] = 1
if not any([matched_reservation, matched_customer, matched_hashed_customer]):
stats["unmatched"] = 1
return stats
# Update stats if provided
if stats is not None:
if matched_reservation:
stats["matched_to_reservation"] += 1
elif matched_customer:
stats["matched_to_customer"] += 1
elif matched_hashed_customer:
stats["matched_to_hashed_customer"] += 1
else:
stats["unmatched"] += 1