From 42b353d24152ce2560c512a9019b17d9c4e60232 Mon Sep 17 00:00:00 2001
From: Jonas Linter <{email_address}>
Date: Wed, 19 Nov 2025 17:27:47 +0100
Subject: [PATCH] Finally it works

---
 src/alpine_bits_python/conversion_service.py | 311 ++++++++++++-------
 1 file changed, 205 insertions(+), 106 deletions(-)

diff --git a/src/alpine_bits_python/conversion_service.py b/src/alpine_bits_python/conversion_service.py
index be4c4f8..f01fc29 100644
--- a/src/alpine_bits_python/conversion_service.py
+++ b/src/alpine_bits_python/conversion_service.py
@@ -7,7 +7,8 @@ from datetime import UTC, datetime
 from decimal import Decimal
 from typing import Any
 
-from sqlalchemy import or_, select
+from sqlalchemy import insert, or_, select
+from sqlalchemy.dialects.postgresql import insert as pg_insert
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.orm import selectinload
 
@@ -76,76 +77,138 @@ class ConversionService:
             f"session must be AsyncSession or SessionMaker, got {type(session)}"
         )
 
-    async def _get_or_create_conversion_guest(
-        self,
-        hotel_id: str,
-        guest_id: str | None,
-        guest_first_name: str | None,
-        guest_last_name: str | None,
-        guest_email: str | None,
-        guest_country_code: str | None,
-        guest_birth_date,
-        session: AsyncSession,
-    ) -> ConversionGuest | None:
-        """Get or create a ConversionGuest record for the given guest data.
+    async def _extract_unique_guests_from_xml(
+        self, reservations: list
+    ) -> dict[tuple[str, str | None], dict]:
+        """Extract and deduplicate all guest data from XML reservations.
 
-        Uses (hotel_id, guest_id) as the natural key to identify a guest.
-        If a guest with this key exists, updates it with new data.
-        If not, creates a new guest record.
+        Phase 0: Single pass through XML to collect all unique guests.
+        Uses (hotel_id, guest_id) as the key for deduplication.
 
-        NOTE: There is no database-level unique constraint on (hotel_id, guest_id),
-        so multiple ConversionGuest records can exist with the same key. This method
-        uses first() instead of scalar_one_or_none() to handle this gracefully and
-        update the most recently created record when duplicates exist.
+        Args:
+            reservations: List of XML reservation elements
 
-        Returns the ConversionGuest record, or None if no guest data provided.
+        Returns:
+            Dictionary mapping (hotel_id, guest_id) to guest data dict
         """
-        # Don't create a ConversionGuest if we have no guest information
-        if not any(
-            [guest_first_name, guest_last_name, guest_email, guest_country_code, guest_birth_date]
-        ):
-            return None
-
+        guest_data_by_key = {}
         now = datetime.now(UTC)
 
-        # Try to find existing guest by (hotel_id, guest_id)
-        if guest_id:
-            result = await session.execute(
-                select(ConversionGuest)
-                .where(
-                    (ConversionGuest.hotel_id == hotel_id)
-                    & (ConversionGuest.guest_id == guest_id)
-                )
-                .order_by(ConversionGuest.last_seen.desc())  # Get most recently updated
+        for reservation_elem in reservations:
+            hotel_id = reservation_elem.get("hotelID")
+            guest_elem = reservation_elem.find("guest")
+
+            if guest_elem is None:
+                _LOGGER.debug("No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)", reservation_elem.get("id"))
+                continue
+
+            guest_id = guest_elem.get("id")
+            guest_first_name = guest_elem.get("firstName")
+            guest_last_name = guest_elem.get("lastName")
+            guest_email = guest_elem.get("email")
+            guest_country_code = guest_elem.get("countryCode")
+            guest_birth_date_str = guest_elem.get("dateOfBirth")
+
+            guest_birth_date = None
+            if guest_birth_date_str:
+                try:
+                    guest_birth_date = datetime.strptime(guest_birth_date_str, "%Y-%m-%d").date()
+                except ValueError:
+                    _LOGGER.warning("Invalid birth date format: %s", guest_birth_date_str)
+
+            key = (hotel_id, guest_id)
+
+            # Store guest data by key (will keep the last occurrence from XML)
+            guest_data_by_key[key] = {
+                "hotel_id": hotel_id,
+                "guest_id": guest_id,
+                "guest_first_name": guest_first_name,
+                "guest_last_name": guest_last_name,
+                "guest_email": guest_email,
+                "guest_country_code": guest_country_code,
+                "guest_birth_date": guest_birth_date,
+                "now": now,
+            }
+
+        return guest_data_by_key
+
+    async def _bulk_upsert_guests(
+        self, session: AsyncSession, guest_data_by_key: dict[tuple[str, str | None], dict]
+    ) -> None:
+        """Bulk upsert all unique guests to database using PostgreSQL ON CONFLICT.
+
+        Phase 1: Atomic upsert of all guests extracted in Phase 0.
+        Uses ON CONFLICT DO UPDATE to handle duplicates safely without race conditions.
+        Processes in batches to avoid overwhelming the database with large XML files.
+
+        Args:
+            session: AsyncSession to use
+            guest_data_by_key: Dictionary mapping (hotel_id, guest_id) to guest data
+        """
+        if not guest_data_by_key:
+            return
+
+        # Process in batches to avoid overwhelming the database
+        batch_size = 1000
+        items = list(guest_data_by_key.items())
+
+        for batch_start in range(0, len(items), batch_size):
+            batch_end = min(batch_start + batch_size, len(items))
+            batch_items = items[batch_start:batch_end]
+
+            # Prepare list of values for this batch
+            values_list = []
+            for (hotel_id, guest_id), guest_data in batch_items:
+                now = guest_data["now"]
+                values_list.append({
+                    "hotel_id": guest_data["hotel_id"],
+                    "guest_id": guest_data["guest_id"],
+                    "guest_first_name": guest_data["guest_first_name"],
+                    "guest_last_name": guest_data["guest_last_name"],
+                    "guest_email": guest_data["guest_email"],
+                    "guest_country_code": guest_data["guest_country_code"],
+                    "guest_birth_date": guest_data["guest_birth_date"],
+                    "hashed_first_name": ConversionGuest._normalize_and_hash(guest_data["guest_first_name"]),
+                    "hashed_last_name": ConversionGuest._normalize_and_hash(guest_data["guest_last_name"]),
+                    "hashed_email": ConversionGuest._normalize_and_hash(guest_data["guest_email"]),
+                    "hashed_country_code": ConversionGuest._normalize_and_hash(guest_data["guest_country_code"]),
+                    "hashed_birth_date": ConversionGuest._normalize_and_hash(
+                        guest_data["guest_birth_date"].isoformat() if guest_data["guest_birth_date"] else None
+                    ),
+                    "is_regular": False,
+                    "first_seen": now,
+                    "last_seen": now,
+                })
+
+            # Use PostgreSQL ON CONFLICT DO UPDATE for atomic upsert
+            stmt = pg_insert(ConversionGuest).values(values_list)
+            stmt = stmt.on_conflict_do_update(
+                index_elements=["hotel_id", "guest_id"],
+                set_={
+                    "guest_first_name": stmt.excluded.guest_first_name,
+                    "guest_last_name": stmt.excluded.guest_last_name,
+                    "guest_email": stmt.excluded.guest_email,
+                    "guest_country_code": stmt.excluded.guest_country_code,
+                    "guest_birth_date": stmt.excluded.guest_birth_date,
+                    "hashed_first_name": stmt.excluded.hashed_first_name,
+                    "hashed_last_name": stmt.excluded.hashed_last_name,
+                    "hashed_email": stmt.excluded.hashed_email,
+                    "hashed_country_code": stmt.excluded.hashed_country_code,
+                    "hashed_birth_date": stmt.excluded.hashed_birth_date,
+                    "last_seen": stmt.excluded.last_seen,
+                },
             )
-            existing_guest = result.scalars().first()
-            if existing_guest:
-                # Update with new data
-                existing_guest.update_from_conversion_data(
-                    guest_first_name,
-                    guest_last_name,
-                    guest_email,
-                    guest_country_code,
-                    guest_birth_date,
-                    now,
-                )
-                return existing_guest
+            await session.execute(stmt)
 
-        # Create new ConversionGuest
-        new_guest = ConversionGuest.create_from_conversion_data(
-            hotel_id=hotel_id,
-            guest_id=guest_id,
-            guest_first_name=guest_first_name,
-            guest_last_name=guest_last_name,
-            guest_email=guest_email,
-            guest_country_code=guest_country_code,
-            guest_birth_date=guest_birth_date,
-            now=now,
-        )
-        session.add(new_guest)
-        await session.flush()  # Ensure the guest has an ID
-        return new_guest
+            _LOGGER.info(
+                "Phase 1: Upserted batch %d-%d of %d guests",
+                batch_start + 1,
+                batch_end,
+                len(items),
+            )
 
     async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]:
         """Parse conversion XML and save daily sales data to database.
@@ -203,33 +266,69 @@ class ConversionService:
                 if self.session_maker:
                     await session.close()
 
-        # Process active reservations in two phases:
-        # Phase 1: Create/update all conversion records
-        # Phase 2: Match them to existing reservations/customers
+        # Process active reservations in four phases:
+        # Phase 0: Extract and deduplicate all guest data from XML
+        # Phase 1: Bulk upsert all unique guests to DB (no race conditions)
+        # Phase 2: Create/update all conversion records (ConversionGuests already exist)
+        # Phase 3: Match them to existing reservations/customers
         reservations = root.findall("reservation")
         stats["total_reservations"] = len(reservations)
 
         if not reservations:
             return stats
 
-        # Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point)
+        _LOGGER.info("Processing %d reservations in xml", len(reservations))
+
+        # Phase 0: Extract and deduplicate all guest data from XML
+        _LOGGER.debug("Phase 0: Extracting and deduplicating guest data from XML")
+        guest_data_by_key = await self._extract_unique_guests_from_xml(reservations)
+        _LOGGER.info(
+            "Phase 0: Extracted %d unique guests from %d reservations",
+            len(guest_data_by_key),
+            len(reservations),
+        )
+
+        # Phase 1: Bulk upsert all unique guests to database
+        if guest_data_by_key:
+            _LOGGER.debug("Phase 1: Bulk upserting %d unique guests to database", len(guest_data_by_key))
+            if self.session_maker:
+                session = await self.session_maker.create_session()
+            else:
+                session = self.session
+
+            try:
+                await self._bulk_upsert_guests(session, guest_data_by_key)
+                await session.commit()
+                _LOGGER.info("Phase 1: Successfully upserted %d guests", len(guest_data_by_key))
+            except Exception as e:
+                await session.rollback()
+                _LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e)
+                stats["errors"] += len(guest_data_by_key)
+                return stats
+            finally:
+                if self.session_maker:
+                    await session.close()
+
+        # Phase 2: Create/update all conversions (no matching, ConversionGuests already exist)
         # Returns list of successfully created pms_reservation_ids
+        _LOGGER.debug("Phase 2: Creating/updating conversions")
         if self.supports_concurrent:
             pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats)
         else:
             pms_reservation_ids = await self._process_reservations_sequential(reservations, stats)
 
         _LOGGER.debug(
-            "Phase 2: Found %d successfully created conversions out of %d total reservations",
+            "Phase 3: Found %d successfully created conversions out of %d total reservations",
             len(pms_reservation_ids),
             len(reservations),
         )
 
-        # Phase 2: Match all conversions using database data only
+        # Phase 3: Match all conversions using database data only
         # Only match conversions that actually exist in the database
         # No XML parsing, no re-hashing - complete separation of concerns
        # This also enables matching historical data that wasn't just created
         if pms_reservation_ids:
+            _LOGGER.debug("Phase 3: Matching conversions to reservations/customers")
             if self.supports_concurrent:
                 await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
             else:
@@ -319,13 +418,24 @@ class ConversionService:
         """
         semaphore = asyncio.Semaphore(1)  # Process one at a time
         results = []
-        async with asyncio.TaskGroup() as tg:
-            tasks = []
-            for reservation in reservations:
-                task = tg.create_task(
-                    self._process_reservation_safe(reservation, semaphore, stats)
-                )
-                tasks.append(task)
+        tasks = []
+
+        try:
+            async with asyncio.TaskGroup() as tg:
+                for reservation in reservations:
+                    task = tg.create_task(
+                        self._process_reservation_safe(reservation, semaphore, stats)
+                    )
+                    tasks.append(task)
+        except ExceptionGroup as eg:
+            # Fail fast: cancel all remaining tasks and re-raise the first exception
+            for task in tasks:
+                if not task.done():
+                    task.cancel()
+            # Re-raise the first exception from the group
+            if eg.exceptions:
+                raise eg.exceptions[0] from eg
+            raise
 
         # Collect results from all tasks
         for task in tasks:
@@ -355,13 +465,24 @@ class ConversionService:
         semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
         results = []
 
-        async with asyncio.TaskGroup() as tg:
-            tasks = []
-            for reservation in reservations:
-                task = tg.create_task(
-                    self._process_reservation_safe(reservation, semaphore, stats)
-                )
-                tasks.append(task)
+        tasks = []
+
+        try:
+            async with asyncio.TaskGroup() as tg:
+                for reservation in reservations:
+                    task = tg.create_task(
+                        self._process_reservation_safe(reservation, semaphore, stats)
+                    )
+                    tasks.append(task)
+        except ExceptionGroup as eg:
+            # Fail fast: cancel all remaining tasks and re-raise the first exception
+            for task in tasks:
+                if not task.done():
+                    task.cancel()
+            # Re-raise the first exception from the group
+            if eg.exceptions:
+                raise eg.exceptions[0] from eg
+            raise
 
         # Collect results from all tasks
         for task in tasks:
@@ -548,6 +669,8 @@ class ConversionService:
             )
             return stats
 
+        # ConversionGuests have already been bulk-upserted in Phase 1,
+        # so we can safely create/update conversions now
         # Check if conversion already exists (upsert logic)
         existing_result = await session.execute(
             select(Conversion).where(
@@ -609,21 +732,6 @@ class ConversionService:
         # Flush to ensure conversion has an ID before creating room reservations
         await session.flush()
 
-        # Create or update ConversionGuest and link it to the conversion
-        # The conversion is linked to ConversionGuest via composite FK (hotel_id, guest_id)
-        # So we just need to ensure ConversionGuest exists - the FK is already set via hotel_id + guest_id
-        conversion_guest = await self._get_or_create_conversion_guest(
-            hotel_id=hotel_id,
-            guest_id=guest_id,
-            guest_first_name=guest_first_name,
-            guest_last_name=guest_last_name,
-            guest_email=guest_email,
-            guest_country_code=guest_country_code,
-            guest_birth_date=guest_birth_date,
-            session=session,
-        )
-        # guest_id is already set on conversion, so the composite FK relationship is established
-
         # Batch-load existing room reservations to avoid N+1 queries
         room_numbers = [
             rm.get("roomNumber") for rm in room_reservations.findall("roomReservation")
@@ -724,15 +832,6 @@ class ConversionService:
             # Check if room reservation already exists using batch-loaded data
             existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id)
 
-            if total_revenue > 0 and (
-                guest_first_name is None
-                and guest_last_name is None
-                and guest_email is None
-            ):
-                _LOGGER.info(
-                    "Guest info missing but total revenue > 0 for PMS ID %s",
-                    pms_reservation_id,
-                )
             if existing_room_reservation:
                 # Update existing room reservation with all fields
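
The core of Phase 1 in this patch is SQLAlchemy's PostgreSQL-only INSERT ... ON CONFLICT DO UPDATE. The standalone sketch below (not part of the diff) shows the same pattern against a throwaway table; the table name, columns, and connection URL are illustrative assumptions, not the project's real ConversionGuest model. Note that on_conflict_do_update requires the conflict target to be backed by a unique constraint or index, so the real model must enforce uniqueness on (hotel_id, guest_id) for the upsert to work.

    import asyncio
    from datetime import UTC, datetime

    from sqlalchemy import Column, DateTime, MetaData, String, Table
    from sqlalchemy.dialects.postgresql import insert as pg_insert
    from sqlalchemy.ext.asyncio import create_async_engine

    metadata = MetaData()

    # Minimal stand-in for the ConversionGuest table: the composite primary key
    # doubles as the unique constraint that ON CONFLICT targets.
    guests = Table(
        "guests",
        metadata,
        Column("hotel_id", String, primary_key=True),
        Column("guest_id", String, primary_key=True),
        Column("email", String),
        Column("last_seen", DateTime(timezone=True)),
    )

    async def upsert_guests(engine, rows: list[dict]) -> None:
        """Insert rows; on (hotel_id, guest_id) conflict, update the mutable columns."""
        stmt = pg_insert(guests).values(rows)
        stmt = stmt.on_conflict_do_update(
            index_elements=["hotel_id", "guest_id"],
            set_={
                # "excluded" refers to the incoming row that hit the conflict,
                # i.e. the values supplied by this statement.
                "email": stmt.excluded.email,
                "last_seen": stmt.excluded.last_seen,
            },
        )
        async with engine.begin() as conn:
            await conn.execute(stmt)

    async def main() -> None:
        engine = create_async_engine("postgresql+asyncpg://localhost/demo")  # placeholder URL
        now = datetime.now(UTC)
        await upsert_guests(
            engine,
            [
                {"hotel_id": "H1", "guest_id": "G1", "email": "a@example.com", "last_seen": now},
                {"hotel_id": "H1", "guest_id": "G2", "email": "b@example.com", "last_seen": now},
            ],
        )

    if __name__ == "__main__":
        asyncio.run(main())

Passing a list of dicts to .values() produces a single multi-row INSERT, which is why the patch batches the rows (1000 at a time) instead of issuing one statement per guest.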
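The sequential and concurrent processors now wrap asyncio.TaskGroup in a try/except for ExceptionGroup and re-raise the first exception. The sketch below (not part of the diff, Python 3.11+) isolates that fail-fast pattern with placeholder names (process_one, process_all); TaskGroup already cancels sibling tasks when one fails, so the extra cancel loop in the patch is defensive rather than required.

    import asyncio

    async def process_one(item: int, semaphore: asyncio.Semaphore) -> int:
        # Placeholder worker: the semaphore caps how many run at once.
        async with semaphore:
            if item < 0:
                raise ValueError(f"bad item {item}")
            await asyncio.sleep(0.01)
            return item * 2

    async def process_all(items: list[int]) -> list[int]:
        semaphore = asyncio.Semaphore(2)
        tasks: list[asyncio.Task[int]] = []
        try:
            async with asyncio.TaskGroup() as tg:
                for item in items:
                    tasks.append(tg.create_task(process_one(item, semaphore)))
        except ExceptionGroup as eg:
            # TaskGroup wraps failures in an ExceptionGroup and has already
            # cancelled the remaining tasks; surface the first failure.
            raise eg.exceptions[0] from eg
        return [t.result() for t in tasks]

    if __name__ == "__main__":
        print(asyncio.run(process_all([1, 2, 4])))  # [2, 4, 8]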