Hashed conversion matching and more. #12
@@ -203,19 +203,32 @@ class ConversionService:
|
||||
if self.session_maker:
|
||||
await session.close()
|
||||
|
||||
# Process active reservations
|
||||
# Process active reservations in two phases:
|
||||
# Phase 1: Create/update all conversion records
|
||||
# Phase 2: Match them to existing reservations/customers
|
||||
reservations = root.findall("reservation")
|
||||
stats["total_reservations"] = len(reservations)
|
||||
|
||||
if not reservations:
|
||||
return stats
|
||||
|
||||
# Use concurrent processing if supported, otherwise sequential
|
||||
# Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point)
|
||||
if self.supports_concurrent:
|
||||
await self._process_reservations_concurrent(reservations, stats)
|
||||
else:
|
||||
await self._process_reservations_sequential(reservations, stats)
|
||||
|
||||
# Collect PMS reservation IDs from Phase 1 for Phase 2
|
||||
pms_reservation_ids = [res.get("id") for res in reservations]
|
||||
|
||||
# Phase 2: Match all conversions using database data only
|
||||
# No XML parsing, no re-hashing - complete separation of concerns
|
||||
# This also enables matching historical data that wasn't just created
|
||||
if self.supports_concurrent:
|
||||
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
|
||||
else:
|
||||
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
|
||||
|
||||
return stats
|
||||
|
||||
async def _load_reservation_cache(self) -> None:
|
||||
@@ -332,6 +345,7 @@ class ConversionService:
|
||||
) -> None:
|
||||
"""Safely process a single reservation with semaphore and transaction management.
|
||||
|
||||
Phase 1: Creation/update only (no matching).
|
||||
In concurrent mode, creates its own session from SessionMaker.
|
||||
In sequential mode, uses the shared session.
|
||||
|
||||
@@ -351,26 +365,16 @@ class ConversionService:
|
||||
session = self.session
|
||||
|
||||
try:
|
||||
# Process reservation with this task's session
|
||||
reservation_stats = await self._process_reservation(
|
||||
# Phase 1: Create/update conversion record (no matching)
|
||||
reservation_stats = await self._create_or_update_conversion(
|
||||
reservation_elem, session
|
||||
)
|
||||
stats["total_daily_sales"] += reservation_stats["daily_sales_count"]
|
||||
stats["matched_to_reservation"] += reservation_stats.get(
|
||||
"matched_to_reservation", 0
|
||||
)
|
||||
stats["matched_to_customer"] += reservation_stats.get(
|
||||
"matched_to_customer", 0
|
||||
)
|
||||
stats["matched_to_hashed_customer"] += reservation_stats.get(
|
||||
"matched_to_hashed_customer", 0
|
||||
)
|
||||
stats["unmatched"] += reservation_stats.get("unmatched", 0)
|
||||
|
||||
# Commit this task's transaction
|
||||
await session.commit()
|
||||
_LOGGER.debug(
|
||||
"Successfully processed and committed reservation %s",
|
||||
"Successfully created/updated conversion for reservation %s",
|
||||
pms_reservation_id,
|
||||
)
|
||||
|
||||
@@ -420,27 +424,23 @@ class ConversionService:
|
||||
pms_reservation_id,
|
||||
)
|
||||
|
||||
async def _process_reservation(
|
||||
async def _create_or_update_conversion(
|
||||
self, reservation_elem: ET.Element, session: AsyncSession | None = None
|
||||
) -> dict[str, int]:
|
||||
"""Process a single reservation element and its daily sales.
|
||||
"""Create or update a conversion record from XML (Phase 1: no matching).
|
||||
|
||||
Args:
|
||||
reservation_elem: XML element to process
|
||||
session: AsyncSession to use. If None, uses self.session.
|
||||
In concurrent mode, each task passes its own session.
|
||||
|
||||
Returns statistics about what was matched.
|
||||
Returns statistics about daily sales processed.
|
||||
|
||||
"""
|
||||
if session is None:
|
||||
session = self.session
|
||||
stats = {
|
||||
"daily_sales_count": 0,
|
||||
"matched_to_reservation": 0,
|
||||
"matched_to_customer": 0,
|
||||
"matched_to_hashed_customer": 0,
|
||||
"unmatched": 0,
|
||||
}
|
||||
|
||||
# Extract reservation metadata
|
||||
@@ -760,26 +760,6 @@ class ConversionService:
|
||||
num_adults,
|
||||
)
|
||||
|
||||
# Now that conversion, conversion_guest, and conversion_room records exist,
|
||||
# perform matching using hashed guest data
|
||||
match_stats = await self._match_conversion(
|
||||
conversion,
|
||||
guest_first_name,
|
||||
guest_last_name,
|
||||
guest_email,
|
||||
advertising_campagne,
|
||||
advertising_partner,
|
||||
hotel_id,
|
||||
reservation_date,
|
||||
session,
|
||||
)
|
||||
|
||||
# Update stats
|
||||
stats["matched_to_reservation"] = match_stats["matched_to_reservation"]
|
||||
stats["matched_to_customer"] = match_stats["matched_to_customer"]
|
||||
stats["matched_to_hashed_customer"] = match_stats["matched_to_hashed_customer"]
|
||||
stats["unmatched"] = match_stats["unmatched"]
|
||||
|
||||
return stats
|
||||
|
||||
async def _match_conversion(
|
||||
@@ -1308,3 +1288,200 @@ class ConversionService:
|
||||
|
||||
# No single clear match found
|
||||
return None
|
||||
|
||||
async def _match_conversions_from_db_sequential(
|
||||
self, pms_reservation_ids: list[str], stats: dict[str, int]
|
||||
) -> None:
|
||||
"""Phase 2: Match conversions sequentially using database data only."""
|
||||
semaphore = asyncio.Semaphore(1) # Process one at a time
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
for pms_id in pms_reservation_ids:
|
||||
tg.create_task(
|
||||
self._match_conversion_from_db_safe(pms_id, semaphore, stats)
|
||||
)
|
||||
|
||||
async def _match_conversions_from_db_concurrent(
    self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None:
    """Phase 2: Match conversions concurrently using database data only.

    Each concurrent task gets its own independent database session
    from the SessionMaker. When no SessionMaker is configured, the error
    is logged and the work is delegated to the sequential variant.

    Args:
        pms_reservation_ids: PMS reservation IDs whose conversions should
            be matched.
        stats: Stats dictionary passed through to every matching task.

    """
    if not self.session_maker:
        _LOGGER.error(
            "Concurrent matching requested but SessionMaker not available. "
            "Falling back to sequential matching."
        )
        await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
        return

    # The semaphore caps how many matching tasks run at the same time.
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
    async with asyncio.TaskGroup() as tg:
        for pms_id in pms_reservation_ids:
            tg.create_task(
                self._match_conversion_from_db_safe(pms_id, semaphore, stats)
            )
|
||||
|
||||
async def _match_conversion_from_db_safe(
    self,
    pms_reservation_id: str,
    semaphore: asyncio.Semaphore,
    stats: dict[str, int],
) -> None:
    """Phase 2: Safely match a conversion using only database data.

    Runs the match inside its own transaction: commit on success,
    rollback plus an ``errors`` count on any exception.

    When a SessionMaker is configured, a new session is created for this
    task and closed afterwards; otherwise the shared ``self.session`` is
    used and left open.

    Args:
        pms_reservation_id: PMS reservation ID to match
        semaphore: Semaphore to limit concurrent operations
        stats: Shared stats dictionary. Each update below is a plain
            read-modify-write with no ``await`` in between, so tasks on
            the same event loop cannot interleave inside one update.

    """
    async with semaphore:
        # With a SessionMaker every task works on its own session.
        if self.session_maker:
            session = await self.session_maker.create_session()
        else:
            session = self.session

        try:
            # Phase 2: Match conversion using only database data
            match_stats = await self._match_conversion_using_db_data(
                pms_reservation_id, session
            )

            # Commit this task's transaction
            await session.commit()

            # Count the outcome only once it is durable: if the commit
            # fails, the match is rolled back and must not be reported.
            for key in (
                "matched_to_reservation",
                "matched_to_customer",
                "matched_to_hashed_customer",
                "unmatched",
            ):
                stats[key] = stats.get(key, 0) + match_stats.get(key, 0)

            _LOGGER.debug(
                "Successfully matched conversion for reservation %s",
                pms_reservation_id,
            )

        except Exception as e:
            # Rollback this task's transaction
            await session.rollback()
            _LOGGER.exception(
                "Error matching conversion for reservation %s: %s",
                pms_reservation_id,
                e,
            )
            # .get() keeps the error handler itself from raising KeyError.
            stats["errors"] = stats.get("errors", 0) + 1
        finally:
            # Close the session if it was created by SessionMaker
            if self.session_maker:
                await session.close()
|
||||
|
||||
async def _match_conversion_using_db_data(
    self,
    pms_reservation_id: str,
    session: AsyncSession | None = None,
) -> dict[str, int]:
    """Phase 2: Match a conversion using only persisted database data.

    Loads the conversion (with its guest) for the given PMS reservation
    ID, passes the stored hashed guest fields and the conversion's
    advertising/hotel/date fields to ``_find_matching_entities``, and
    writes the matched IDs back onto the conversion. No XML parsing and
    no re-hashing happens here. The session is not committed by this
    method.

    Args:
        pms_reservation_id: PMS reservation ID to match
        session: AsyncSession to use. If None, uses self.session.

    Returns:
        Dictionary with the keys ``matched_to_reservation``,
        ``matched_to_customer``, ``matched_to_hashed_customer`` and
        ``unmatched``, each 0 or 1. All values are 0 when no conversion
        exists for the given ID.

    """
    if session is None:
        session = self.session

    stats = {
        "matched_to_reservation": 0,
        "matched_to_customer": 0,
        "matched_to_hashed_customer": 0,
        "unmatched": 0,
    }

    # Get the conversion from the database with related data
    result = await session.execute(
        select(Conversion)
        .where(Conversion.pms_reservation_id == pms_reservation_id)
        .options(selectinload(Conversion.guest))
    )
    conversion = result.scalar_one_or_none()

    if not conversion:
        _LOGGER.warning(
            "Conversion not found for pms_reservation_id=%s during matching phase",
            pms_reservation_id,
        )
        return stats

    # The guest row, when present, carries the already-hashed fields.
    conversion_guest = conversion.guest
    if conversion_guest:
        hashed_first_name = conversion_guest.hashed_first_name
        hashed_last_name = conversion_guest.hashed_last_name
        hashed_email = conversion_guest.hashed_email
    else:
        hashed_first_name = None
        hashed_last_name = None
        hashed_email = None

    # Perform matching using already-hashed data from database
    match_result = await self._find_matching_entities(
        conversion.advertising_campagne,
        conversion.hotel_id,
        conversion.reservation_date,
        hashed_first_name,
        hashed_last_name,
        hashed_email,
        conversion.advertising_partner,
        session,
    )

    matched_reservation = match_result["reservation"]
    matched_customer = match_result["customer"]
    matched_hashed_customer = match_result["hashed_customer"]

    has_match = bool(
        matched_reservation or matched_customer or matched_hashed_customer
    )

    # The conversion is only touched when at least one entity matched;
    # in that case all three links are rewritten (missing ones to None).
    if has_match:
        conversion.reservation_id = (
            matched_reservation.id if matched_reservation else None
        )
        conversion.customer_id = matched_customer.id if matched_customer else None
        conversion.hashed_customer_id = (
            matched_hashed_customer.id if matched_hashed_customer else None
        )
        conversion.updated_at = datetime.now()

    # Update stats
    if matched_reservation:
        stats["matched_to_reservation"] = 1
    if matched_customer:
        stats["matched_to_customer"] = 1
    if matched_hashed_customer:
        stats["matched_to_hashed_customer"] = 1
    if not has_match:
        stats["unmatched"] = 1

    return stats
|
||||
|
||||
Reference in New Issue
Block a user