Finally it works

This commit is contained in:
Jonas Linter
2025-11-19 17:27:47 +01:00
parent 0854352726
commit 93207c3877

View File

@@ -7,7 +7,8 @@ from datetime import UTC, datetime
from decimal import Decimal from decimal import Decimal
from typing import Any from typing import Any
from sqlalchemy import or_, select from sqlalchemy import insert, or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
@@ -76,76 +77,138 @@ class ConversionService:
f"session must be AsyncSession or SessionMaker, got {type(session)}" f"session must be AsyncSession or SessionMaker, got {type(session)}"
) )
async def _get_or_create_conversion_guest( async def _extract_unique_guests_from_xml(
self, self, reservations: list
hotel_id: str, ) -> dict[tuple[str, str | None], dict]:
guest_id: str | None, """Extract and deduplicate all guest data from XML reservations.
guest_first_name: str | None,
guest_last_name: str | None,
guest_email: str | None,
guest_country_code: str | None,
guest_birth_date,
session: AsyncSession,
) -> ConversionGuest | None:
"""Get or create a ConversionGuest record for the given guest data.
Uses (hotel_id, guest_id) as the natural key to identify a guest. Phase 0: Single pass through XML to collect all unique guests.
If a guest with this key exists, updates it with new data. Uses (hotel_id, guest_id) as the key for deduplication.
If not, creates a new guest record.
NOTE: There is no database-level unique constraint on (hotel_id, guest_id), Args:
so multiple ConversionGuest records can exist with the same key. This method reservations: List of XML reservation elements
uses first() instead of scalar_one_or_none() to handle this gracefully and
update the most recently created record when duplicates exist.
Returns the ConversionGuest record, or None if no guest data provided. Returns:
Dictionary mapping (hotel_id, guest_id) to guest data dict
""" """
# Don't create a ConversionGuest if we have no guest information guest_data_by_key = {}
if not any(
[guest_first_name, guest_last_name, guest_email, guest_country_code, guest_birth_date]
):
return None
now = datetime.now(UTC) now = datetime.now(UTC)
# Try to find existing guest by (hotel_id, guest_id) for reservation_elem in reservations:
if guest_id: hotel_id = reservation_elem.get("hotelID")
result = await session.execute( guest_elem = reservation_elem.find("guest")
select(ConversionGuest)
.where( if guest_elem is None:
(ConversionGuest.hotel_id == hotel_id) _LOGGER.debug("No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)", reservation_elem.get("id"))
& (ConversionGuest.guest_id == guest_id) continue
)
.order_by(ConversionGuest.last_seen.desc()) # Get most recently updated guest_id = guest_elem.get("id")
guest_first_name = guest_elem.get("firstName")
guest_last_name = guest_elem.get("lastName")
guest_email = guest_elem.get("email")
guest_country_code = guest_elem.get("countryCode")
guest_birth_date_str = guest_elem.get("dateOfBirth")
guest_birth_date = None
if guest_birth_date_str:
try:
guest_birth_date = datetime.strptime(guest_birth_date_str, "%Y-%m-%d").date()
except ValueError:
_LOGGER.warning("Invalid birth date format: %s", guest_birth_date_str)
key = (hotel_id, guest_id)
# Store guest data by key (will keep the last occurrence from XML)
guest_data_by_key[key] = {
"hotel_id": hotel_id,
"guest_id": guest_id,
"guest_first_name": guest_first_name,
"guest_last_name": guest_last_name,
"guest_email": guest_email,
"guest_country_code": guest_country_code,
"guest_birth_date": guest_birth_date,
"now": now,
}
return guest_data_by_key
async def _bulk_upsert_guests(
    self, session: AsyncSession, guest_data_by_key: dict[tuple[str, str | None], dict]
) -> None:
    """Bulk upsert all unique guests to database using PostgreSQL ON CONFLICT.

    Phase 1: Atomic upsert of all guests extracted in Phase 0.
    Uses ON CONFLICT DO UPDATE to handle duplicates safely without race conditions.
    Processes in batches to avoid overwhelming the database with large XML files.

    Args:
        session: AsyncSession to use
        guest_data_by_key: Dictionary mapping (hotel_id, guest_id) to guest data

    NOTE(review): on_conflict_do_update(index_elements=["hotel_id", "guest_id"])
    requires a matching unique index/constraint in PostgreSQL; earlier code in
    this file stated no such constraint exists — confirm the migration that
    adds it ships with this change, otherwise execute() raises ProgrammingError.
    NOTE(review): rows with guest_id=None never conflict (NULLs compare
    distinct in a default unique index), so duplicates can accumulate for
    them — verify that is acceptable.
    """
    if not guest_data_by_key:
        return

    # Batch the statement to keep each INSERT bounded for very large files.
    batch_size = 1000
    entries = list(guest_data_by_key.items())
    total = len(entries)

    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)

        # Build the VALUES rows for this batch, hashing PII columns so
        # matching can run against hashed data only.
        rows = []
        for _key, data in entries[start:end]:
            seen_at = data["now"]
            birth = data["guest_birth_date"]
            rows.append(
                {
                    "hotel_id": data["hotel_id"],
                    "guest_id": data["guest_id"],
                    "guest_first_name": data["guest_first_name"],
                    "guest_last_name": data["guest_last_name"],
                    "guest_email": data["guest_email"],
                    "guest_country_code": data["guest_country_code"],
                    "guest_birth_date": birth,
                    "hashed_first_name": ConversionGuest._normalize_and_hash(data["guest_first_name"]),
                    "hashed_last_name": ConversionGuest._normalize_and_hash(data["guest_last_name"]),
                    "hashed_email": ConversionGuest._normalize_and_hash(data["guest_email"]),
                    "hashed_country_code": ConversionGuest._normalize_and_hash(data["guest_country_code"]),
                    "hashed_birth_date": ConversionGuest._normalize_and_hash(
                        birth.isoformat() if birth else None
                    ),
                    "is_regular": False,
                    "first_seen": seen_at,
                    "last_seen": seen_at,
                }
            )

        # Atomic upsert: on key collision refresh every data column and
        # last_seen, leaving first_seen and is_regular untouched.
        stmt = pg_insert(ConversionGuest).values(rows)
        stmt = stmt.on_conflict_do_update(
            index_elements=["hotel_id", "guest_id"],
            set_={
                "guest_first_name": stmt.excluded.guest_first_name,
                "guest_last_name": stmt.excluded.guest_last_name,
                "guest_email": stmt.excluded.guest_email,
                "guest_country_code": stmt.excluded.guest_country_code,
                "guest_birth_date": stmt.excluded.guest_birth_date,
                "hashed_first_name": stmt.excluded.hashed_first_name,
                "hashed_last_name": stmt.excluded.hashed_last_name,
                "hashed_email": stmt.excluded.hashed_email,
                "hashed_country_code": stmt.excluded.hashed_country_code,
                "hashed_birth_date": stmt.excluded.hashed_birth_date,
                "last_seen": stmt.excluded.last_seen,
            },
        )

        await session.execute(stmt)

        _LOGGER.info(
            "Phase 1: Upserted batch %d-%d of %d guests",
            start + 1,
            end,
            total,
        )
async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]: async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]:
"""Parse conversion XML and save daily sales data to database. """Parse conversion XML and save daily sales data to database.
@@ -203,33 +266,69 @@ class ConversionService:
if self.session_maker: if self.session_maker:
await session.close() await session.close()
# Process active reservations in two phases: # Process active reservations in four phases:
# Phase 1: Create/update all conversion records # Phase 0: Extract and deduplicate all guest data from XML
# Phase 2: Match them to existing reservations/customers # Phase 1: Bulk upsert all unique guests to DB (no race conditions)
# Phase 2: Create/update all conversion records (ConversionGuests already exist)
# Phase 3: Match them to existing reservations/customers
reservations = root.findall("reservation") reservations = root.findall("reservation")
stats["total_reservations"] = len(reservations) stats["total_reservations"] = len(reservations)
if not reservations: if not reservations:
return stats return stats
# Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point) _LOGGER.info("Processing %d reservations in xml", len(reservations))
# Phase 0: Extract and deduplicate all guest data from XML
_LOGGER.debug("Phase 0: Extracting and deduplicating guest data from XML")
guest_data_by_key = await self._extract_unique_guests_from_xml(reservations)
_LOGGER.info(
"Phase 0: Extracted %d unique guests from %d reservations",
len(guest_data_by_key),
len(reservations),
)
# Phase 1: Bulk upsert all unique guests to database
if guest_data_by_key:
_LOGGER.debug("Phase 1: Bulk upserting %d unique guests to database", len(guest_data_by_key))
if self.session_maker:
session = await self.session_maker.create_session()
else:
session = self.session
try:
await self._bulk_upsert_guests(session, guest_data_by_key)
await session.commit()
_LOGGER.info("Phase 1: Successfully upserted %d guests", len(guest_data_by_key))
except Exception as e:
await session.rollback()
_LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e)
stats["errors"] += len(guest_data_by_key)
return stats
finally:
if self.session_maker:
await session.close()
# Phase 2: Create/update all conversions (no matching, ConversionGuests already exist)
# Returns list of successfully created pms_reservation_ids # Returns list of successfully created pms_reservation_ids
_LOGGER.debug("Phase 2: Creating/updating conversions")
if self.supports_concurrent: if self.supports_concurrent:
pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats) pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats)
else: else:
pms_reservation_ids = await self._process_reservations_sequential(reservations, stats) pms_reservation_ids = await self._process_reservations_sequential(reservations, stats)
_LOGGER.debug( _LOGGER.debug(
"Phase 2: Found %d successfully created conversions out of %d total reservations", "Phase 3: Found %d successfully created conversions out of %d total reservations",
len(pms_reservation_ids), len(pms_reservation_ids),
len(reservations), len(reservations),
) )
# Phase 2: Match all conversions using database data only # Phase 3: Match all conversions using database data only
# Only match conversions that actually exist in the database # Only match conversions that actually exist in the database
# No XML parsing, no re-hashing - complete separation of concerns # No XML parsing, no re-hashing - complete separation of concerns
# This also enables matching historical data that wasn't just created # This also enables matching historical data that wasn't just created
if pms_reservation_ids: if pms_reservation_ids:
_LOGGER.debug("Phase 3: Matching conversions to reservations/customers")
if self.supports_concurrent: if self.supports_concurrent:
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats) await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
else: else:
@@ -319,13 +418,24 @@ class ConversionService:
""" """
semaphore = asyncio.Semaphore(1) # Process one at a time semaphore = asyncio.Semaphore(1) # Process one at a time
results = [] results = []
async with asyncio.TaskGroup() as tg: tasks = []
tasks = []
for reservation in reservations: try:
task = tg.create_task( async with asyncio.TaskGroup() as tg:
self._process_reservation_safe(reservation, semaphore, stats) for reservation in reservations:
) task = tg.create_task(
tasks.append(task) self._process_reservation_safe(reservation, semaphore, stats)
)
tasks.append(task)
except ExceptionGroup as eg:
# Fail fast: cancel all remaining tasks and re-raise the first exception
for task in tasks:
if not task.done():
task.cancel()
# Re-raise the first exception from the group
if eg.exceptions:
raise eg.exceptions[0] from eg
raise
# Collect results from all tasks # Collect results from all tasks
for task in tasks: for task in tasks:
@@ -355,13 +465,24 @@ class ConversionService:
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS) semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
results = [] results = []
async with asyncio.TaskGroup() as tg: tasks = []
tasks = []
for reservation in reservations: try:
task = tg.create_task( async with asyncio.TaskGroup() as tg:
self._process_reservation_safe(reservation, semaphore, stats) for reservation in reservations:
) task = tg.create_task(
tasks.append(task) self._process_reservation_safe(reservation, semaphore, stats)
)
tasks.append(task)
except ExceptionGroup as eg:
# Fail fast: cancel all remaining tasks and re-raise the first exception
for task in tasks:
if not task.done():
task.cancel()
# Re-raise the first exception from the group
if eg.exceptions:
raise eg.exceptions[0] from eg
raise
# Collect results from all tasks # Collect results from all tasks
for task in tasks: for task in tasks:
@@ -548,6 +669,8 @@ class ConversionService:
) )
return stats return stats
# ConversionGuests have already been bulk-upserted in Phase 1,
# so we can safely create/update conversions now
# Check if conversion already exists (upsert logic) # Check if conversion already exists (upsert logic)
existing_result = await session.execute( existing_result = await session.execute(
select(Conversion).where( select(Conversion).where(
@@ -609,21 +732,6 @@ class ConversionService:
# Flush to ensure conversion has an ID before creating room reservations # Flush to ensure conversion has an ID before creating room reservations
await session.flush() await session.flush()
# Create or update ConversionGuest and link it to the conversion
# The conversion is linked to ConversionGuest via composite FK (hotel_id, guest_id)
# So we just need to ensure ConversionGuest exists - the FK is already set via hotel_id + guest_id
conversion_guest = await self._get_or_create_conversion_guest(
hotel_id=hotel_id,
guest_id=guest_id,
guest_first_name=guest_first_name,
guest_last_name=guest_last_name,
guest_email=guest_email,
guest_country_code=guest_country_code,
guest_birth_date=guest_birth_date,
session=session,
)
# guest_id is already set on conversion, so the composite FK relationship is established
# Batch-load existing room reservations to avoid N+1 queries # Batch-load existing room reservations to avoid N+1 queries
room_numbers = [ room_numbers = [
rm.get("roomNumber") for rm in room_reservations.findall("roomReservation") rm.get("roomNumber") for rm in room_reservations.findall("roomReservation")
@@ -724,15 +832,6 @@ class ConversionService:
# Check if room reservation already exists using batch-loaded data # Check if room reservation already exists using batch-loaded data
existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id) existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id)
if total_revenue > 0 and (
guest_first_name is None
and guest_last_name is None
and guest_email is None
):
_LOGGER.info(
"Guest info missing but total revenue > 0 for PMS ID %s",
pms_reservation_id,
)
if existing_room_reservation: if existing_room_reservation:
# Update existing room reservation with all fields # Update existing room reservation with all fields