Hashed conversion matching and more. #12
@@ -7,7 +7,8 @@ from datetime import UTC, datetime
|
|||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from sqlalchemy import or_, select
|
from sqlalchemy import insert, or_, select
|
||||||
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
from sqlalchemy.orm import selectinload
|
from sqlalchemy.orm import selectinload
|
||||||
|
|
||||||
@@ -76,76 +77,138 @@ class ConversionService:
|
|||||||
f"session must be AsyncSession or SessionMaker, got {type(session)}"
|
f"session must be AsyncSession or SessionMaker, got {type(session)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _get_or_create_conversion_guest(
|
async def _extract_unique_guests_from_xml(
|
||||||
self,
|
self, reservations: list
|
||||||
hotel_id: str,
|
) -> dict[tuple[str, str | None], dict]:
|
||||||
guest_id: str | None,
|
"""Extract and deduplicate all guest data from XML reservations.
|
||||||
guest_first_name: str | None,
|
|
||||||
guest_last_name: str | None,
|
|
||||||
guest_email: str | None,
|
|
||||||
guest_country_code: str | None,
|
|
||||||
guest_birth_date,
|
|
||||||
session: AsyncSession,
|
|
||||||
) -> ConversionGuest | None:
|
|
||||||
"""Get or create a ConversionGuest record for the given guest data.
|
|
||||||
|
|
||||||
Uses (hotel_id, guest_id) as the natural key to identify a guest.
|
Phase 0: Single pass through XML to collect all unique guests.
|
||||||
If a guest with this key exists, updates it with new data.
|
Uses (hotel_id, guest_id) as the key for deduplication.
|
||||||
If not, creates a new guest record.
|
|
||||||
|
|
||||||
NOTE: There is no database-level unique constraint on (hotel_id, guest_id),
|
Args:
|
||||||
so multiple ConversionGuest records can exist with the same key. This method
|
reservations: List of XML reservation elements
|
||||||
uses first() instead of scalar_one_or_none() to handle this gracefully and
|
|
||||||
update the most recently created record when duplicates exist.
|
|
||||||
|
|
||||||
Returns the ConversionGuest record, or None if no guest data provided.
|
Returns:
|
||||||
|
Dictionary mapping (hotel_id, guest_id) to guest data dict
|
||||||
"""
|
"""
|
||||||
# Don't create a ConversionGuest if we have no guest information
|
guest_data_by_key = {}
|
||||||
if not any(
|
|
||||||
[guest_first_name, guest_last_name, guest_email, guest_country_code, guest_birth_date]
|
|
||||||
):
|
|
||||||
return None
|
|
||||||
|
|
||||||
now = datetime.now(UTC)
|
now = datetime.now(UTC)
|
||||||
|
|
||||||
# Try to find existing guest by (hotel_id, guest_id)
|
for reservation_elem in reservations:
|
||||||
if guest_id:
|
hotel_id = reservation_elem.get("hotelID")
|
||||||
result = await session.execute(
|
guest_elem = reservation_elem.find("guest")
|
||||||
select(ConversionGuest)
|
|
||||||
.where(
|
if guest_elem is None:
|
||||||
(ConversionGuest.hotel_id == hotel_id)
|
_LOGGER.debug("No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)", reservation_elem.get("id"))
|
||||||
& (ConversionGuest.guest_id == guest_id)
|
continue
|
||||||
)
|
|
||||||
.order_by(ConversionGuest.last_seen.desc()) # Get most recently updated
|
guest_id = guest_elem.get("id")
|
||||||
|
guest_first_name = guest_elem.get("firstName")
|
||||||
|
guest_last_name = guest_elem.get("lastName")
|
||||||
|
guest_email = guest_elem.get("email")
|
||||||
|
guest_country_code = guest_elem.get("countryCode")
|
||||||
|
guest_birth_date_str = guest_elem.get("dateOfBirth")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
guest_birth_date = None
|
||||||
|
if guest_birth_date_str:
|
||||||
|
try:
|
||||||
|
guest_birth_date = datetime.strptime(guest_birth_date_str, "%Y-%m-%d").date()
|
||||||
|
except ValueError:
|
||||||
|
_LOGGER.warning("Invalid birth date format: %s", guest_birth_date_str)
|
||||||
|
|
||||||
|
key = (hotel_id, guest_id)
|
||||||
|
|
||||||
|
# Store guest data by key (will keep the last occurrence from XML)
|
||||||
|
guest_data_by_key[key] = {
|
||||||
|
"hotel_id": hotel_id,
|
||||||
|
"guest_id": guest_id,
|
||||||
|
"guest_first_name": guest_first_name,
|
||||||
|
"guest_last_name": guest_last_name,
|
||||||
|
"guest_email": guest_email,
|
||||||
|
"guest_country_code": guest_country_code,
|
||||||
|
"guest_birth_date": guest_birth_date,
|
||||||
|
"now": now,
|
||||||
|
}
|
||||||
|
|
||||||
|
return guest_data_by_key
|
||||||
|
|
||||||
|
async def _bulk_upsert_guests(
|
||||||
|
self, session: AsyncSession, guest_data_by_key: dict[tuple[str, str | None], dict]
|
||||||
|
) -> None:
|
||||||
|
"""Bulk upsert all unique guests to database using PostgreSQL ON CONFLICT.
|
||||||
|
|
||||||
|
Phase 1: Atomic upsert of all guests extracted in Phase 0.
|
||||||
|
Uses ON CONFLICT DO UPDATE to handle duplicates safely without race conditions.
|
||||||
|
Processes in batches to avoid overwhelming the database with large XML files.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session: AsyncSession to use
|
||||||
|
guest_data_by_key: Dictionary mapping (hotel_id, guest_id) to guest data
|
||||||
|
"""
|
||||||
|
if not guest_data_by_key:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Process in batches to avoid overwhelming the database
|
||||||
|
batch_size = 1000
|
||||||
|
items = list(guest_data_by_key.items())
|
||||||
|
|
||||||
|
for batch_start in range(0, len(items), batch_size):
|
||||||
|
batch_end = min(batch_start + batch_size, len(items))
|
||||||
|
batch_items = items[batch_start:batch_end]
|
||||||
|
|
||||||
|
# Prepare list of values for this batch
|
||||||
|
values_list = []
|
||||||
|
for (hotel_id, guest_id), guest_data in batch_items:
|
||||||
|
now = guest_data["now"]
|
||||||
|
values_list.append({
|
||||||
|
"hotel_id": guest_data["hotel_id"],
|
||||||
|
"guest_id": guest_data["guest_id"],
|
||||||
|
"guest_first_name": guest_data["guest_first_name"],
|
||||||
|
"guest_last_name": guest_data["guest_last_name"],
|
||||||
|
"guest_email": guest_data["guest_email"],
|
||||||
|
"guest_country_code": guest_data["guest_country_code"],
|
||||||
|
"guest_birth_date": guest_data["guest_birth_date"],
|
||||||
|
"hashed_first_name": ConversionGuest._normalize_and_hash(guest_data["guest_first_name"]),
|
||||||
|
"hashed_last_name": ConversionGuest._normalize_and_hash(guest_data["guest_last_name"]),
|
||||||
|
"hashed_email": ConversionGuest._normalize_and_hash(guest_data["guest_email"]),
|
||||||
|
"hashed_country_code": ConversionGuest._normalize_and_hash(guest_data["guest_country_code"]),
|
||||||
|
"hashed_birth_date": ConversionGuest._normalize_and_hash(
|
||||||
|
guest_data["guest_birth_date"].isoformat() if guest_data["guest_birth_date"] else None
|
||||||
|
),
|
||||||
|
"is_regular": False,
|
||||||
|
"first_seen": now,
|
||||||
|
"last_seen": now,
|
||||||
|
})
|
||||||
|
|
||||||
|
# Use PostgreSQL ON CONFLICT DO UPDATE for atomic upsert
|
||||||
|
stmt = pg_insert(ConversionGuest).values(values_list)
|
||||||
|
stmt = stmt.on_conflict_do_update(
|
||||||
|
index_elements=["hotel_id", "guest_id"],
|
||||||
|
set_={
|
||||||
|
"guest_first_name": stmt.excluded.guest_first_name,
|
||||||
|
"guest_last_name": stmt.excluded.guest_last_name,
|
||||||
|
"guest_email": stmt.excluded.guest_email,
|
||||||
|
"guest_country_code": stmt.excluded.guest_country_code,
|
||||||
|
"guest_birth_date": stmt.excluded.guest_birth_date,
|
||||||
|
"hashed_first_name": stmt.excluded.hashed_first_name,
|
||||||
|
"hashed_last_name": stmt.excluded.hashed_last_name,
|
||||||
|
"hashed_email": stmt.excluded.hashed_email,
|
||||||
|
"hashed_country_code": stmt.excluded.hashed_country_code,
|
||||||
|
"hashed_birth_date": stmt.excluded.hashed_birth_date,
|
||||||
|
"last_seen": stmt.excluded.last_seen,
|
||||||
|
},
|
||||||
)
|
)
|
||||||
existing_guest = result.scalars().first()
|
|
||||||
|
|
||||||
if existing_guest:
|
await session.execute(stmt)
|
||||||
# Update with new data
|
|
||||||
existing_guest.update_from_conversion_data(
|
|
||||||
guest_first_name,
|
|
||||||
guest_last_name,
|
|
||||||
guest_email,
|
|
||||||
guest_country_code,
|
|
||||||
guest_birth_date,
|
|
||||||
now,
|
|
||||||
)
|
|
||||||
return existing_guest
|
|
||||||
|
|
||||||
# Create new ConversionGuest
|
_LOGGER.info(
|
||||||
new_guest = ConversionGuest.create_from_conversion_data(
|
"Phase 1: Upserted batch %d-%d of %d guests",
|
||||||
hotel_id=hotel_id,
|
batch_start + 1,
|
||||||
guest_id=guest_id,
|
batch_end,
|
||||||
guest_first_name=guest_first_name,
|
len(items),
|
||||||
guest_last_name=guest_last_name,
|
)
|
||||||
guest_email=guest_email,
|
|
||||||
guest_country_code=guest_country_code,
|
|
||||||
guest_birth_date=guest_birth_date,
|
|
||||||
now=now,
|
|
||||||
)
|
|
||||||
session.add(new_guest)
|
|
||||||
await session.flush() # Ensure the guest has an ID
|
|
||||||
return new_guest
|
|
||||||
|
|
||||||
async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]:
|
async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]:
|
||||||
"""Parse conversion XML and save daily sales data to database.
|
"""Parse conversion XML and save daily sales data to database.
|
||||||
@@ -203,33 +266,69 @@ class ConversionService:
|
|||||||
if self.session_maker:
|
if self.session_maker:
|
||||||
await session.close()
|
await session.close()
|
||||||
|
|
||||||
# Process active reservations in two phases:
|
# Process active reservations in four phases:
|
||||||
# Phase 1: Create/update all conversion records
|
# Phase 0: Extract and deduplicate all guest data from XML
|
||||||
# Phase 2: Match them to existing reservations/customers
|
# Phase 1: Bulk upsert all unique guests to DB (no race conditions)
|
||||||
|
# Phase 2: Create/update all conversion records (ConversionGuests already exist)
|
||||||
|
# Phase 3: Match them to existing reservations/customers
|
||||||
reservations = root.findall("reservation")
|
reservations = root.findall("reservation")
|
||||||
stats["total_reservations"] = len(reservations)
|
stats["total_reservations"] = len(reservations)
|
||||||
|
|
||||||
if not reservations:
|
if not reservations:
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
# Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point)
|
_LOGGER.info("Processing %d reservations in xml", len(reservations))
|
||||||
|
|
||||||
|
# Phase 0: Extract and deduplicate all guest data from XML
|
||||||
|
_LOGGER.debug("Phase 0: Extracting and deduplicating guest data from XML")
|
||||||
|
guest_data_by_key = await self._extract_unique_guests_from_xml(reservations)
|
||||||
|
_LOGGER.info(
|
||||||
|
"Phase 0: Extracted %d unique guests from %d reservations",
|
||||||
|
len(guest_data_by_key),
|
||||||
|
len(reservations),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Phase 1: Bulk upsert all unique guests to database
|
||||||
|
if guest_data_by_key:
|
||||||
|
_LOGGER.debug("Phase 1: Bulk upserting %d unique guests to database", len(guest_data_by_key))
|
||||||
|
if self.session_maker:
|
||||||
|
session = await self.session_maker.create_session()
|
||||||
|
else:
|
||||||
|
session = self.session
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._bulk_upsert_guests(session, guest_data_by_key)
|
||||||
|
await session.commit()
|
||||||
|
_LOGGER.info("Phase 1: Successfully upserted %d guests", len(guest_data_by_key))
|
||||||
|
except Exception as e:
|
||||||
|
await session.rollback()
|
||||||
|
_LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e)
|
||||||
|
stats["errors"] += len(guest_data_by_key)
|
||||||
|
return stats
|
||||||
|
finally:
|
||||||
|
if self.session_maker:
|
||||||
|
await session.close()
|
||||||
|
|
||||||
|
# Phase 2: Create/update all conversions (no matching, ConversionGuests already exist)
|
||||||
# Returns list of successfully created pms_reservation_ids
|
# Returns list of successfully created pms_reservation_ids
|
||||||
|
_LOGGER.debug("Phase 2: Creating/updating conversions")
|
||||||
if self.supports_concurrent:
|
if self.supports_concurrent:
|
||||||
pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats)
|
pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats)
|
||||||
else:
|
else:
|
||||||
pms_reservation_ids = await self._process_reservations_sequential(reservations, stats)
|
pms_reservation_ids = await self._process_reservations_sequential(reservations, stats)
|
||||||
|
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
"Phase 2: Found %d successfully created conversions out of %d total reservations",
|
"Phase 3: Found %d successfully created conversions out of %d total reservations",
|
||||||
len(pms_reservation_ids),
|
len(pms_reservation_ids),
|
||||||
len(reservations),
|
len(reservations),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Phase 2: Match all conversions using database data only
|
# Phase 3: Match all conversions using database data only
|
||||||
# Only match conversions that actually exist in the database
|
# Only match conversions that actually exist in the database
|
||||||
# No XML parsing, no re-hashing - complete separation of concerns
|
# No XML parsing, no re-hashing - complete separation of concerns
|
||||||
# This also enables matching historical data that wasn't just created
|
# This also enables matching historical data that wasn't just created
|
||||||
if pms_reservation_ids:
|
if pms_reservation_ids:
|
||||||
|
_LOGGER.debug("Phase 3: Matching conversions to reservations/customers")
|
||||||
if self.supports_concurrent:
|
if self.supports_concurrent:
|
||||||
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
|
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
|
||||||
else:
|
else:
|
||||||
@@ -319,13 +418,24 @@ class ConversionService:
|
|||||||
"""
|
"""
|
||||||
semaphore = asyncio.Semaphore(1) # Process one at a time
|
semaphore = asyncio.Semaphore(1) # Process one at a time
|
||||||
results = []
|
results = []
|
||||||
async with asyncio.TaskGroup() as tg:
|
tasks = []
|
||||||
tasks = []
|
|
||||||
for reservation in reservations:
|
try:
|
||||||
task = tg.create_task(
|
async with asyncio.TaskGroup() as tg:
|
||||||
self._process_reservation_safe(reservation, semaphore, stats)
|
for reservation in reservations:
|
||||||
)
|
task = tg.create_task(
|
||||||
tasks.append(task)
|
self._process_reservation_safe(reservation, semaphore, stats)
|
||||||
|
)
|
||||||
|
tasks.append(task)
|
||||||
|
except ExceptionGroup as eg:
|
||||||
|
# Fail fast: cancel all remaining tasks and re-raise the first exception
|
||||||
|
for task in tasks:
|
||||||
|
if not task.done():
|
||||||
|
task.cancel()
|
||||||
|
# Re-raise the first exception from the group
|
||||||
|
if eg.exceptions:
|
||||||
|
raise eg.exceptions[0] from eg
|
||||||
|
raise
|
||||||
|
|
||||||
# Collect results from all tasks
|
# Collect results from all tasks
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
@@ -355,13 +465,24 @@ class ConversionService:
|
|||||||
|
|
||||||
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
|
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
|
||||||
results = []
|
results = []
|
||||||
async with asyncio.TaskGroup() as tg:
|
tasks = []
|
||||||
tasks = []
|
|
||||||
for reservation in reservations:
|
try:
|
||||||
task = tg.create_task(
|
async with asyncio.TaskGroup() as tg:
|
||||||
self._process_reservation_safe(reservation, semaphore, stats)
|
for reservation in reservations:
|
||||||
)
|
task = tg.create_task(
|
||||||
tasks.append(task)
|
self._process_reservation_safe(reservation, semaphore, stats)
|
||||||
|
)
|
||||||
|
tasks.append(task)
|
||||||
|
except ExceptionGroup as eg:
|
||||||
|
# Fail fast: cancel all remaining tasks and re-raise the first exception
|
||||||
|
for task in tasks:
|
||||||
|
if not task.done():
|
||||||
|
task.cancel()
|
||||||
|
# Re-raise the first exception from the group
|
||||||
|
if eg.exceptions:
|
||||||
|
raise eg.exceptions[0] from eg
|
||||||
|
raise
|
||||||
|
|
||||||
# Collect results from all tasks
|
# Collect results from all tasks
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
@@ -548,6 +669,8 @@ class ConversionService:
|
|||||||
)
|
)
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
|
# ConversionGuests have already been bulk-upserted in Phase 1,
|
||||||
|
# so we can safely create/update conversions now
|
||||||
# Check if conversion already exists (upsert logic)
|
# Check if conversion already exists (upsert logic)
|
||||||
existing_result = await session.execute(
|
existing_result = await session.execute(
|
||||||
select(Conversion).where(
|
select(Conversion).where(
|
||||||
@@ -609,21 +732,6 @@ class ConversionService:
|
|||||||
# Flush to ensure conversion has an ID before creating room reservations
|
# Flush to ensure conversion has an ID before creating room reservations
|
||||||
await session.flush()
|
await session.flush()
|
||||||
|
|
||||||
# Create or update ConversionGuest and link it to the conversion
|
|
||||||
# The conversion is linked to ConversionGuest via composite FK (hotel_id, guest_id)
|
|
||||||
# So we just need to ensure ConversionGuest exists - the FK is already set via hotel_id + guest_id
|
|
||||||
conversion_guest = await self._get_or_create_conversion_guest(
|
|
||||||
hotel_id=hotel_id,
|
|
||||||
guest_id=guest_id,
|
|
||||||
guest_first_name=guest_first_name,
|
|
||||||
guest_last_name=guest_last_name,
|
|
||||||
guest_email=guest_email,
|
|
||||||
guest_country_code=guest_country_code,
|
|
||||||
guest_birth_date=guest_birth_date,
|
|
||||||
session=session,
|
|
||||||
)
|
|
||||||
# guest_id is already set on conversion, so the composite FK relationship is established
|
|
||||||
|
|
||||||
# Batch-load existing room reservations to avoid N+1 queries
|
# Batch-load existing room reservations to avoid N+1 queries
|
||||||
room_numbers = [
|
room_numbers = [
|
||||||
rm.get("roomNumber") for rm in room_reservations.findall("roomReservation")
|
rm.get("roomNumber") for rm in room_reservations.findall("roomReservation")
|
||||||
@@ -724,15 +832,6 @@ class ConversionService:
|
|||||||
# Check if room reservation already exists using batch-loaded data
|
# Check if room reservation already exists using batch-loaded data
|
||||||
existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id)
|
existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id)
|
||||||
|
|
||||||
if total_revenue > 0 and (
|
|
||||||
guest_first_name is None
|
|
||||||
and guest_last_name is None
|
|
||||||
and guest_email is None
|
|
||||||
):
|
|
||||||
_LOGGER.info(
|
|
||||||
"Guest info missing but total revenue > 0 for PMS ID %s",
|
|
||||||
pms_reservation_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
if existing_room_reservation:
|
if existing_room_reservation:
|
||||||
# Update existing room reservation with all fields
|
# Update existing room reservation with all fields
|
||||||
|
|||||||
Reference in New Issue
Block a user