"""Service for handling conversion data from hotel PMS XML files."""

import asyncio
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import UTC, date, datetime
from decimal import Decimal, InvalidOperation
from typing import Any

from sqlalchemy import or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from .db import (
    Conversion,
    ConversionGuest,
    ConversionRoom,
    Customer,
    HashedCustomer,
    Reservation,
    SessionMaker,
)
from .logging_config import get_logger
from .schemas import ConversionData, ConversionGuestData

_LOGGER = get_logger(__name__)

# Limit concurrent reservation processing to avoid overwhelming the database
MAX_CONCURRENT_RESERVATIONS = 10


@dataclass(slots=True)
class ParsedRoomReservation:
    """Typed representation of a single ``<roomReservation>`` entry."""

    pms_hotel_reservation_id: str
    room_number: str | None
    arrival_date: date | None
    departure_date: date | None
    room_status: str | None
    room_type: str | None
    num_adults: int | None
    rate_plan_code: str | None
    connected_room_type: str | None
    daily_sales: list[dict[str, str]] = field(default_factory=list)
    total_revenue: Decimal | None = None
    daily_sales_count: int = 0


@dataclass(slots=True)
class ParsedReservationData:
    """Typed representation of reservation metadata and rooms."""

    hotel_id: str | None
    pms_reservation_id: int
    guest_id: int | None
    reservation_number: str | None
    reservation_date: date | None
    creation_time: datetime | None
    reservation_type: str | None
    booking_channel: str | None
    advertising_medium: str | None
    advertising_partner: str | None
    advertising_campagne: str | None
    room_reservations: list[ParsedRoomReservation] = field(default_factory=list)


class ConversionService:
    """Service for processing and storing conversion/daily sales data.

    Supports two modes of operation:
    1. Sequential mode: Single AsyncSession passed in, uses sequential processing
    2. Concurrent mode: SessionMaker passed in, creates independent sessions per task
    """

    def __init__(
        self,
        session: AsyncSession | SessionMaker | None = None,
        hotel_id: str | None = None,
    ):
        """Initialize the ConversionService.

        Args:
            session: Can be either:
                - AsyncSession: Single session for sequential processing
                - SessionMaker: Factory for creating sessions in concurrent mode
                - None: Not recommended, but allowed for subclassing
            hotel_id: Hotel ID for this conversion service context (from authenticated user)

        Raises:
            TypeError: If ``session`` is neither None, an AsyncSession nor a SessionMaker.
        """
        self.session = None
        self.session_maker = None
        self.supports_concurrent = False
        self.hotel_id = hotel_id

        # Cache for reservation and customer data within a single XML processing run
        # Maps hotel_code -> list of (reservation, hashed_customer) tuples
        # This significantly speeds up matching when processing large XML files
        # Uses hashed data for matching to preserve privacy
        self._reservation_cache: dict[
            str | None, list[tuple[Reservation, HashedCustomer | None]]
        ] = {}
        self._cache_initialized = False

        if isinstance(session, SessionMaker):
            self.session_maker = session
            self.supports_concurrent = True
            _LOGGER.debug(
                "ConversionService initialized in concurrent mode with SessionMaker"
            )
        elif isinstance(session, AsyncSession):
            self.session = session
            self.supports_concurrent = False
            _LOGGER.debug(
                "ConversionService initialized in sequential mode with single session"
            )
        elif session is not None:
            raise TypeError(
                f"session must be AsyncSession or SessionMaker, got {type(session)}"
            )

    @staticmethod
    def _parse_required_int(value: str | None, field_name: str) -> int:
        """Parse an integer attribute that must be present.

        Raises:
            ValueError: If ``value`` is missing/empty or not an integer.
        """
        if value in (None, ""):
            raise ValueError(f"{field_name} is required")
        try:
            return int(value)
        except (TypeError, ValueError) as exc:
            raise ValueError(
                f"{field_name} must be an integer (value={value})"
            ) from exc

    @staticmethod
    def _parse_optional_int(value: str | None, field_name: str) -> int | None:
        """Parse an optional integer attribute, logging on failure."""
        if value in (None, ""):
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            _LOGGER.warning("Invalid %s value: %s", field_name, value)
            return None

    @staticmethod
    def _parse_date(value: str | None, field_name: str) -> date | None:
        """Parse a YYYY-MM-DD formatted date string (None on empty/invalid input)."""
        if not value:
            return None
        try:
            return datetime.strptime(value, "%Y-%m-%d").date()
        except ValueError:
            _LOGGER.warning("Invalid %s format: %s", field_name, value)
            return None

    @staticmethod
    def _parse_datetime(value: str | None, field_name: str) -> datetime | None:
        """Parse an ISO timestamp string, accepting a trailing ``Z`` for UTC."""
        if not value:
            return None
        try:
            normalized = value.replace("Z", "+00:00")
            return datetime.fromisoformat(normalized)
        except ValueError:
            _LOGGER.warning("Invalid %s format: %s", field_name, value)
            return None

    def _parse_daily_sales(
        self, daily_sales_elem: ET.Element | None
    ) -> tuple[list[dict[str, str]], Decimal | None, int]:
        """Extract the list of sale dictionaries and aggregate revenue information.

        Args:
            daily_sales_elem: The ``dailySales`` element, or None if absent.

        Returns:
            ``(sales, total_revenue, sale_count)``:
            ``sales`` holds the non-empty attribute dicts of the ``dailySale``
            children. ``total_revenue`` is the sum of every parseable, finite
            ``revenueTotal`` (None when that sum is not positive).
            ``sale_count`` counts every ``dailySale`` child.
        """
        if daily_sales_elem is None:
            return [], None, 0

        daily_sales_list: list[dict[str, str]] = []
        total_revenue = Decimal(0)
        sale_count = 0

        for daily_sale in daily_sales_elem.findall("dailySale"):
            sale_count += 1
            sale_data: dict[str, str] = {}

            sale_date_str = daily_sale.get("date")
            if sale_date_str:
                sale_data["date"] = sale_date_str

            revenue_total_str = daily_sale.get("revenueTotal")
            if revenue_total_str:
                sale_data["revenueTotal"] = revenue_total_str
                try:
                    revenue: Decimal | None = Decimal(revenue_total_str)
                except (InvalidOperation, ValueError, TypeError):
                    # Decimal() reports malformed strings with InvalidOperation
                    # (an ArithmeticError), not ValueError.
                    revenue = None
                if revenue is not None and revenue.is_finite():
                    total_revenue += revenue
                else:
                    # Skip malformed and non-finite values (NaN/Infinity): NaN
                    # would poison the sum and make the `> 0` check below raise.
                    _LOGGER.warning("Invalid revenueTotal value: %s", revenue_total_str)

            # Copy the remaining optional revenue buckets if present
            for field_name in (
                "revenueLogis",
                "revenueBoard",
                "revenueFB",
                "revenueSpa",
                "revenueOther",
            ):
                value = daily_sale.get(field_name)
                if value:
                    sale_data[field_name] = value

            if sale_data:
                daily_sales_list.append(sale_data)

        total_revenue_value = total_revenue if total_revenue > 0 else None
        return daily_sales_list, total_revenue_value, sale_count
def _parse_room_reservation( self, room_elem: ET.Element, pms_reservation_id: int, room_index: int ) -> ParsedRoomReservation: """Convert a element into ParsedRoomReservation.""" arrival_date = self._parse_date(room_elem.get("arrival"), "arrival date") departure_date = self._parse_date(room_elem.get("departure"), "departure date") num_adults = self._parse_optional_int(room_elem.get("adults"), "adults") room_number = room_elem.get("roomNumber") if room_number is None: _LOGGER.debug( "Room reservation %s #%d has no roomNumber", pms_reservation_id, room_index, ) daily_sales, total_revenue, sale_count = self._parse_daily_sales( room_elem.find("dailySales") ) return ParsedRoomReservation( pms_hotel_reservation_id=f"{pms_reservation_id}_{room_number}", room_number=room_number, arrival_date=arrival_date, departure_date=departure_date, room_status=room_elem.get("status"), room_type=room_elem.get("roomType"), num_adults=num_adults, rate_plan_code=room_elem.get("ratePlanCode"), connected_room_type=room_elem.get("connectedRoomType"), daily_sales=daily_sales, total_revenue=total_revenue, daily_sales_count=sale_count, ) def _parse_reservation_element( self, reservation_elem: ET.Element ) -> ParsedReservationData | None: """Convert a element into a structured representation.""" try: pms_reservation_id = self._parse_required_int( reservation_elem.get("id"), "reservation id" ) except ValueError as exc: _LOGGER.error( "Invalid reservation metadata in reservation element: %s", exc ) return None room_reservations_elem = reservation_elem.find("roomReservations") if room_reservations_elem is None: _LOGGER.debug( "No roomReservations found for reservation %s", pms_reservation_id ) return None room_reservations = [ self._parse_room_reservation(room_elem, pms_reservation_id, idx) for idx, room_elem in enumerate( room_reservations_elem.findall("roomReservation") ) ] if not room_reservations: _LOGGER.debug( "Reservation %s has no roomReservation entries", pms_reservation_id ) return None 
guest_elem = reservation_elem.find("guest") guest_id = None if guest_elem is not None: guest_id = self._parse_optional_int(guest_elem.get("id"), "guest id") return ParsedReservationData( hotel_id=reservation_elem.get("hotelID"), pms_reservation_id=pms_reservation_id, guest_id=guest_id, reservation_number=reservation_elem.get("number"), reservation_date=self._parse_date( reservation_elem.get("date"), "reservation date" ), creation_time=self._parse_datetime( reservation_elem.get("creationTime"), "creation time" ), reservation_type=reservation_elem.get("type"), booking_channel=reservation_elem.get("bookingChannel"), advertising_medium=reservation_elem.get("advertisingMedium"), advertising_partner=reservation_elem.get("advertisingPartner"), advertising_campagne=reservation_elem.get("advertisingCampagne"), room_reservations=room_reservations, ) async def _extract_unique_guests_from_xml( self, reservations: list ) -> dict[tuple[str, int], ConversionGuestData]: """Extract and deduplicate all guest data from XML reservations. Phase 0: Single pass through XML to collect all unique guests. Uses (hotel_id, guest_id) as the key for deduplication. Validates each guest using Pydantic before storing. 
Args: reservations: List of XML reservation elements Returns: Dictionary mapping (hotel_id, guest_id) to validated ConversionGuestData """ guest_data_by_key = {} now = datetime.now(UTC) for reservation_elem in reservations: hotel_id = reservation_elem.get("hotelID") guest_elem = reservation_elem.find("guest") if guest_elem is None: _LOGGER.debug( "No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)", reservation_elem.get("id"), ) continue guest_id = guest_elem.get("id") guest_first_name = guest_elem.get("firstName") guest_last_name = guest_elem.get("lastName") guest_email = guest_elem.get("email") guest_country_code = guest_elem.get("countryCode") guest_birth_date_str = guest_elem.get("dateOfBirth") guest_birth_date = None if guest_birth_date_str: try: guest_birth_date = datetime.strptime( guest_birth_date_str, "%Y-%m-%d" ).date() except ValueError: _LOGGER.warning( "Invalid birth date format: %s", guest_birth_date_str ) # Validate guest data with Pydantic during extraction try: validated_guest = ConversionGuestData( hotel_id=hotel_id, guest_id=guest_id, # Will be validated and converted to int guest_first_name=guest_first_name, guest_last_name=guest_last_name, guest_email=guest_email, guest_country_code=guest_country_code, guest_birth_date=guest_birth_date, first_seen=now, last_seen=now, ) # Use validated guest_id (now an int) for the key key = (hotel_id, validated_guest.guest_id) # Store validated guest data (will keep the last occurrence from XML) guest_data_by_key[key] = validated_guest except ValueError: _LOGGER.exception( "Failed to validate guest data for reservation %s", reservation_elem.get("id"), ) continue return guest_data_by_key async def _bulk_upsert_guests( self, session: AsyncSession, guest_data_by_key: dict[tuple[str, int], ConversionGuestData], ) -> None: """Bulk upsert all unique guests to database using PostgreSQL ON CONFLICT. Phase 1: Atomic upsert of all guests extracted in Phase 0. 
Uses ON CONFLICT DO UPDATE to handle duplicates safely without race conditions. Processes in batches to avoid overwhelming the database with large XML files. Args: session: AsyncSession to use guest_data_by_key: Dictionary mapping (hotel_id, guest_id) to validated ConversionGuestData """ if not guest_data_by_key: return # Process in batches to avoid overwhelming the database batch_size = 1000 items = list(guest_data_by_key.items()) for batch_start in range(0, len(items), batch_size): batch_end = min(batch_start + batch_size, len(items)) batch_items = items[batch_start:batch_end] # Prepare list of values for this batch (already validated) values_list = [] for (hotel_id, guest_id), validated_guest in batch_items: # Convert validated Pydantic model to dict for insertion # (all validations and hash calculations are already done) values_list.append(validated_guest.model_dump()) # Use PostgreSQL ON CONFLICT DO UPDATE for atomic upsert stmt = pg_insert(ConversionGuest).values(values_list) stmt = stmt.on_conflict_do_update( index_elements=["hotel_id", "guest_id"], set_={ "guest_first_name": stmt.excluded.guest_first_name, "guest_last_name": stmt.excluded.guest_last_name, "guest_email": stmt.excluded.guest_email, "guest_country_code": stmt.excluded.guest_country_code, "guest_birth_date": stmt.excluded.guest_birth_date, "hashed_first_name": stmt.excluded.hashed_first_name, "hashed_last_name": stmt.excluded.hashed_last_name, "hashed_email": stmt.excluded.hashed_email, "hashed_country_code": stmt.excluded.hashed_country_code, "hashed_birth_date": stmt.excluded.hashed_birth_date, "last_seen": stmt.excluded.last_seen, }, ) await session.execute(stmt) _LOGGER.debug( "Phase 1: Upserted batch %d-%d of %d guests", batch_start + 1, batch_end, len(items), ) async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]: """Parse conversion XML and save daily sales data to database. 
Args: xml_content: XML string containing reservation and daily sales data Returns: Dictionary with processing statistics """ try: root = ET.fromstring(xml_content) except ET.ParseError as e: _LOGGER.error("Failed to parse conversion XML: %s", e) raise ValueError(f"Invalid XML content: {e}") from e # Initialize cache for this XML processing run await self._load_reservation_cache() stats = { "total_reservations": 0, "deleted_reservations": 0, "total_daily_sales": 0, "matched_to_reservation": 0, "matched_to_customer": 0, "matched_to_hashed_customer": 0, "unmatched": 0, "errors": 0, } # Get a session for deleted reservations processing if self.session_maker: session = await self.session_maker.create_session() else: session = self.session # Process deleted reservations for deleted_res in root.findall("Deletedreservation"): stats["deleted_reservations"] += 1 pms_reservation_id_str = deleted_res.get("ID") try: pms_reservation_id = ( int(pms_reservation_id_str) if pms_reservation_id_str else None ) if pms_reservation_id is None: _LOGGER.warning( "Deleted reservation missing ID attribute, skipping" ) continue await self._handle_deleted_reservation(pms_reservation_id, session) await session.commit() except Exception as e: await session.rollback() _LOGGER.exception( "Error deleting reservation %s: %s", pms_reservation_id, e, ) stats["errors"] += 1 # Close session if created by SessionMaker if self.session_maker: await session.close() # Process active reservations in four phases: # Phase 0: Extract and deduplicate all guest data from XML # Phase 1: Bulk upsert all unique guests to DB (no race conditions) # Phase 2: Create/update all conversion records (ConversionGuests already exist) # Phase 3: Match them to existing reservations/customers reservations = root.findall("reservation") stats["total_reservations"] = len(reservations) if not reservations: return stats _LOGGER.info("Processing %d reservations in xml", len(reservations)) # Phase 0: Extract and deduplicate all guest 
data from XML _LOGGER.debug("Phase 0: Extracting and deduplicating guest data from XML") guest_data_by_key = await self._extract_unique_guests_from_xml(reservations) _LOGGER.info( "Phase 0: Extracted %d unique guests from %d reservations", len(guest_data_by_key), len(reservations), ) # Phase 1: Bulk upsert all unique guests to database if guest_data_by_key: _LOGGER.debug( "Phase 1: Bulk upserting %d unique guests to database", len(guest_data_by_key), ) if self.session_maker: session = await self.session_maker.create_session() else: session = self.session try: await self._bulk_upsert_guests(session, guest_data_by_key) await session.commit() _LOGGER.info( "Phase 1: Successfully upserted %d guests", len(guest_data_by_key) ) except Exception as e: await session.rollback() _LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e) stats["errors"] += len(guest_data_by_key) return stats finally: if self.session_maker: await session.close() # Phase 2: Create/update all conversions (no matching, ConversionGuests already exist) # Returns list of successfully created pms_reservation_ids _LOGGER.debug("Phase 2: Creating/updating conversions") if self.supports_concurrent: pms_reservation_ids = await self._process_reservations_concurrent( reservations, stats ) else: pms_reservation_ids = await self._process_reservations_sequential( reservations, stats ) _LOGGER.debug( "Phase 3: Found %d successfully created conversions out of %d total reservations", len(pms_reservation_ids), len(reservations), ) # Phase 3: Match all conversions using database data only # Only match conversions that actually exist in the database # No XML parsing, no re-hashing - complete separation of concerns # This also enables matching historical data that wasn't just created if pms_reservation_ids: _LOGGER.debug("Phase 3: Matching conversions to reservations/customers") if self.supports_concurrent: await self._match_conversions_from_db_concurrent( pms_reservation_ids, stats ) else: await 
self._match_conversions_from_db_sequential( pms_reservation_ids, stats ) return stats async def _load_reservation_cache(self) -> None: """Load all reservations and hashed customers into cache for fast matching. This method is called once at the start of processing a large XML file. It loads all reservations with their associated hashed customers into an in-memory cache organized by hotel_code. This avoids repeated database queries during matching operations and uses hashed data for privacy-preserving matching. The cache structure: - Key: hotel_code (str or None) - Value: List of (reservation, hashed_customer) tuples This is especially beneficial for large XML files with many reservations where matching criteria is the same across multiple reservations. """ if self._cache_initialized: _LOGGER.debug("Reservation cache already initialized, skipping reload") return # Get a session for loading the cache if self.session_maker: session = await self.session_maker.create_session() close_session = True else: session = self.session close_session = False if not session: _LOGGER.warning("No session available for cache loading") return try: # Load all reservations with their hashed customers in one query from sqlalchemy.orm import selectinload query = select(Reservation).options( selectinload(Reservation.customer).selectinload( Customer.hashed_version ), selectinload(Reservation.hashed_customer), ) result = await session.execute(query) reservations = result.scalars().all() _LOGGER.info("Loaded %d reservations into cache", len(reservations)) # Organize by hotel_code for efficient lookup for reservation in reservations: hotel_code = reservation.hotel_code if hotel_code not in self._reservation_cache: self._reservation_cache[hotel_code] = [] # Cache the hashed customer - prefer direct relationship, fall back to customer relationship hashed_customer = None if reservation.hashed_customer: hashed_customer = reservation.hashed_customer elif reservation.customer and 
reservation.customer.hashed_version: hashed_customer = reservation.customer.hashed_version self._reservation_cache[hotel_code].append( (reservation, hashed_customer) ) self._cache_initialized = True _LOGGER.info( "Reservation cache initialized with %d hotel codes", len(self._reservation_cache), ) except Exception as e: _LOGGER.error("Failed to load reservation cache: %s", e) # Cache remains empty, fall back to direct queries self._cache_initialized = True finally: # Close session if we created it if close_session: await session.close() async def _process_reservations_sequential( self, reservations: list, stats: dict[str, int] ) -> list[str]: """Process reservations one at a time (original behavior). Returns: List of pms_reservation_ids that were successfully created/updated """ semaphore = asyncio.Semaphore(1) # Process one at a time results = [] tasks = [] try: async with asyncio.TaskGroup() as tg: for reservation in reservations: task = tg.create_task( self._process_reservation_safe(reservation, semaphore, stats) ) tasks.append(task) except ExceptionGroup as eg: # Fail fast: cancel all remaining tasks and re-raise the first exception for task in tasks: if not task.done(): task.cancel() # Re-raise the first exception from the group if eg.exceptions: raise eg.exceptions[0] from eg raise # Collect results from all tasks for task in tasks: pms_id = task.result() if pms_id is not None: results.append(pms_id) return results async def _process_reservations_concurrent( self, reservations: list, stats: dict[str, int] ) -> list[str]: """Process reservations concurrently with semaphore limiting. Each concurrent task gets its own independent database session from the SessionMaker. Returns: List of pms_reservation_ids that were successfully created/updated """ if not self.session_maker: _LOGGER.error( "Concurrent processing requested but SessionMaker not available. " "Falling back to sequential processing." 
) return await self._process_reservations_sequential(reservations, stats) semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS) results = [] tasks = [] try: async with asyncio.TaskGroup() as tg: for reservation in reservations: task = tg.create_task( self._process_reservation_safe(reservation, semaphore, stats) ) tasks.append(task) except ExceptionGroup as eg: # Fail fast: cancel all remaining tasks and re-raise the first exception for task in tasks: if not task.done(): task.cancel() # Re-raise the first exception from the group if eg.exceptions: raise eg.exceptions[0] from eg raise # Collect results from all tasks for task in tasks: pms_id = task.result() if pms_id is not None: results.append(pms_id) return results async def _process_reservation_safe( self, reservation_elem: Any, semaphore: asyncio.Semaphore, stats: dict[str, int], ) -> str | None: """Safely process a single reservation with semaphore and transaction management. Phase 1: Creation/update only (no matching). In concurrent mode, creates its own session from SessionMaker. In sequential mode, uses the shared session. 
Args: reservation_elem: XML element for the reservation semaphore: Semaphore to limit concurrent operations stats: Shared stats dictionary (thread-safe due to GIL) Returns: pms_reservation_id if successfully created/updated, None if error occurred """ pms_reservation_id = int(reservation_elem.get("id")) async with semaphore: # In concurrent mode, create a new session for this task if self.session_maker: session = await self.session_maker.create_session() else: session = self.session try: # Phase 1: Create/update conversion record (no matching) reservation_stats = await self._create_or_update_conversion( reservation_elem, session ) stats["total_daily_sales"] += reservation_stats["daily_sales_count"] # Commit this task's transaction await session.commit() _LOGGER.debug( "Successfully created/updated conversion for reservation %s", pms_reservation_id, ) return pms_reservation_id except Exception as e: # Rollback this task's transaction await session.rollback() _LOGGER.exception( "Error processing reservation %s: %s", pms_reservation_id, e, ) stats["errors"] += 1 return None finally: # Close the session if it was created by SessionMaker if self.session_maker: await session.close() async def _handle_deleted_reservation( self, pms_reservation_id: int, session: AsyncSession ): """Handle deleted reservation by marking conversions as deleted or removing them. 
Args: pms_reservation_id: PMS reservation ID to delete session: AsyncSession to use for the operation """ if not self.hotel_id: _LOGGER.error( "Cannot delete reservation: hotel_id not set in ConversionService" ) return _LOGGER.info( "Processing deleted reservation: Hotel %s, PMS ID %s", self.hotel_id, pms_reservation_id, ) # Delete conversion records for this hotel + pms_reservation_id result = await session.execute( select(Conversion).where( Conversion.hotel_id == self.hotel_id, Conversion.pms_reservation_id == pms_reservation_id, ) ) conversions = result.scalars().all() for conversion in conversions: await session.delete(conversion) if conversions: _LOGGER.info( "Deleted %d conversion records for hotel %s, PMS reservation %s", len(conversions), self.hotel_id, pms_reservation_id, ) async def _create_or_update_conversion( self, reservation_elem: ET.Element, session: AsyncSession | None = None ) -> dict[str, int]: """Create or update a conversion record from XML (Phase 1: no matching). Args: reservation_elem: XML element to process session: AsyncSession to use. If None, uses self.session. In concurrent mode, each task passes its own session. Returns statistics about daily sales processed. 
""" if session is None: session = self.session stats = { "daily_sales_count": 0, } parsed_reservation = self._parse_reservation_element(reservation_elem) if not parsed_reservation: return stats hotel_id = parsed_reservation.hotel_id pms_reservation_id = parsed_reservation.pms_reservation_id # ConversionGuests have already been bulk-upserted in Phase 1, # so we can safely create/update conversions now # Check if conversion already exists (upsert logic) existing_result = await session.execute( select(Conversion).where( Conversion.hotel_id == hotel_id, Conversion.pms_reservation_id == pms_reservation_id, ) ) existing_conversion = existing_result.scalar_one_or_none() if existing_conversion: # Update existing conversion - only update reservation metadata and advertising data # Guest info is stored in ConversionGuest table, not here # Don't clear reservation/customer links (matching logic will update if needed) existing_conversion.reservation_number = ( parsed_reservation.reservation_number ) existing_conversion.reservation_date = parsed_reservation.reservation_date existing_conversion.creation_time = parsed_reservation.creation_time existing_conversion.reservation_type = parsed_reservation.reservation_type existing_conversion.booking_channel = parsed_reservation.booking_channel existing_conversion.advertising_medium = ( parsed_reservation.advertising_medium ) existing_conversion.advertising_partner = ( parsed_reservation.advertising_partner ) existing_conversion.advertising_campagne = ( parsed_reservation.advertising_campagne ) existing_conversion.updated_at = datetime.now() conversion = existing_conversion _LOGGER.debug( "Updated conversion %s (pms_id=%s)", conversion.id, pms_reservation_id, ) else: # Create new conversion entry (without matching - will be done later) # Note: Guest information (first_name, last_name, email, etc) is stored in ConversionGuest table conversion_data = ConversionData( # Links to existing entities (nullable, will be filled in after matching) 
# Reservation metadata hotel_id=hotel_id, guest_id=parsed_reservation.guest_id, # Links to ConversionGuest pms_reservation_id=pms_reservation_id, reservation_number=parsed_reservation.reservation_number, reservation_date=parsed_reservation.reservation_date, creation_time=parsed_reservation.creation_time, reservation_type=parsed_reservation.reservation_type, booking_channel=parsed_reservation.booking_channel, # Advertising data advertising_medium=parsed_reservation.advertising_medium, advertising_partner=parsed_reservation.advertising_partner, advertising_campagne=parsed_reservation.advertising_campagne, # Metadata ) conversion = Conversion(**conversion_data.model_dump()) session.add(conversion) _LOGGER.debug( "Created conversion (pms_id=%s)", pms_reservation_id, ) # Flush to ensure conversion has an ID before creating room reservations await session.flush() # Fetch ALL existing rooms for this conversion (not just the ones in current XML) existing_rooms_result = await session.execute( select(ConversionRoom).where(ConversionRoom.conversion_id == conversion.id) ) existing_rooms = { room.pms_hotel_reservation_id: room for room in existing_rooms_result.scalars().all() } # Track which room IDs are present in the current XML current_pms_hotel_reservation_ids = set() # Process room reservations for room_reservation in parsed_reservation.room_reservations: current_pms_hotel_reservation_ids.add( room_reservation.pms_hotel_reservation_id ) stats["daily_sales_count"] += room_reservation.daily_sales_count # Check if room reservation already exists using batch-loaded data existing_room_reservation = existing_rooms.get( room_reservation.pms_hotel_reservation_id ) if existing_room_reservation: # Update existing room reservation with all fields existing_room_reservation.arrival_date = room_reservation.arrival_date existing_room_reservation.departure_date = ( room_reservation.departure_date ) existing_room_reservation.room_status = room_reservation.room_status 
existing_room_reservation.room_type = room_reservation.room_type existing_room_reservation.num_adults = room_reservation.num_adults existing_room_reservation.rate_plan_code = ( room_reservation.rate_plan_code ) existing_room_reservation.connected_room_type = ( room_reservation.connected_room_type ) existing_room_reservation.daily_sales = ( room_reservation.daily_sales if room_reservation.daily_sales else None ) existing_room_reservation.total_revenue = room_reservation.total_revenue existing_room_reservation.updated_at = datetime.now() _LOGGER.debug( "Updated room reservation %s (pms_id=%s, room=%s)", existing_room_reservation.id, pms_reservation_id, room_reservation.room_number, ) else: # Create new room reservation room_reservation_record = ConversionRoom( conversion_id=conversion.id, pms_hotel_reservation_id=room_reservation.pms_hotel_reservation_id, arrival_date=room_reservation.arrival_date, departure_date=room_reservation.departure_date, room_status=room_reservation.room_status, room_type=room_reservation.room_type, room_number=room_reservation.room_number, num_adults=room_reservation.num_adults, rate_plan_code=room_reservation.rate_plan_code, connected_room_type=room_reservation.connected_room_type, daily_sales=( room_reservation.daily_sales if room_reservation.daily_sales else None ), total_revenue=room_reservation.total_revenue, created_at=datetime.now(), updated_at=datetime.now(), ) session.add(room_reservation_record) _LOGGER.debug( "Created room reservation (pms_id=%s, room=%s, adults=%s)", pms_reservation_id, room_reservation.room_number, room_reservation.num_adults, ) # Delete room entries that are no longer present in the current XML # This handles cases where a reservation is updated and room numbers change rooms_to_delete = [ room for pms_id, room in existing_rooms.items() if pms_id not in current_pms_hotel_reservation_ids ] if rooms_to_delete: for room in rooms_to_delete: await session.delete(room) _LOGGER.debug( "Deleted room reservation %s 
(pms_id=%s, room=%s) - no longer in current XML", room.id, room.pms_hotel_reservation_id, room.room_number, ) return stats async def _match_by_advertising( self, advertising_campagne: str, hotel_id: str | None, guest_first_name: str | None, guest_last_name: str | None, guest_email: str | None, advertising_partner: str | None, session: AsyncSession | None = None, ) -> Reservation | None: """Match reservation by advertising tracking data (fbclid/gclid/md5_unique_id). Args: advertising_campagne: Tracking ID from PMS (could be truncated click_id or md5_unique_id) hotel_id: Hotel ID for filtering guest_first_name: Guest first name for disambiguation guest_last_name: Guest last name for disambiguation guest_email: Guest email for disambiguation advertising_partner: Partner info (matches utm_medium) session: AsyncSession to use. If None, uses self.session. Returns: Matched Reservation or None """ if session is None: session = self.session # Find reservations where: # - fbclid/gclid starts with the advertising_campagne value, OR # - md5_unique_id matches exactly (for direct ID matching) query = select(Reservation).where( or_( Reservation.fbclid.like(f"{advertising_campagne}%"), Reservation.gclid.like(f"{advertising_campagne}%"), Reservation.md5_unique_id == advertising_campagne, ) ) # Eagerly load the customer relationship query = query.options(selectinload(Reservation.customer)) # Add hotel filter if available if hotel_id: query = query.where(Reservation.hotel_code == hotel_id) # Execute query db_result = await session.execute(query) reservations = db_result.scalars().all() if not reservations: return None # If single match, return it if len(reservations) == 1: return reservations[0] # If multiple matches, try to narrow down using guest details _LOGGER.debug( "Multiple reservations match advertisingCampagne %s (hotel=%s): found %d matches. 
" "Attempting to narrow down using guest details.", advertising_campagne, hotel_id, len(reservations), ) matched_reservation = self._filter_reservations_by_guest_details( reservations, guest_first_name, guest_last_name, guest_email, advertising_partner, ) if matched_reservation is None: # If we still can't narrow it down, use the first match and log warning _LOGGER.warning( "Could not narrow down multiple reservations for advertisingCampagne %s " "(hotel=%s, guest=%s %s, email=%s). Using first match.", advertising_campagne, hotel_id, guest_first_name, guest_last_name, guest_email, ) matched_reservation = reservations[0] return matched_reservation async def _match_by_guest_details_hashed( self, guest_first_name: str | None, guest_last_name: str | None, guest_email: str | None, session: AsyncSession | None = None, ) -> HashedCustomer | None: """Match guest by name and email directly to HashedCustomer (no Reservation needed). This method bypasses the Reservation table entirely and matches directly against hashed customer data. Used for guest-detail matching where we don't need to link to a specific reservation. Args: guest_first_name: Guest first name (pre-hashed) guest_last_name: Guest last name (pre-hashed) guest_email: Guest email (pre-hashed) session: AsyncSession to use. If None, uses self.session. 
Returns: Matched HashedCustomer or None """ if session is None: session = self.session # Query all hashed customers that match the guest details query = select(HashedCustomer).options(selectinload(HashedCustomer.customer)) # Build filter conditions conditions = [] if guest_email: conditions.append(HashedCustomer.hashed_email == guest_email) if guest_first_name and guest_last_name: conditions.append( (HashedCustomer.hashed_given_name == guest_first_name) & (HashedCustomer.hashed_surname == guest_last_name) ) if not conditions: return None # Combine conditions with OR (match if email matches OR name matches) query = query.where(or_(*conditions)) db_result = await session.execute(query) matches = db_result.scalars().all() if not matches: return None # If single match, return it if len(matches) == 1: return matches[0] # If multiple matches, prefer email match over name match for match in matches: if guest_email and match.hashed_email == guest_email: _LOGGER.debug( "Multiple hashed customer matches, preferring email match" ) return match # Otherwise return first match _LOGGER.warning( "Multiple hashed customer matches found for guest details, using first match" ) return matches[0] def _filter_reservations_by_guest_details( self, reservations: list[Reservation], guest_first_name: str | None, guest_last_name: str | None, guest_email: str | None, advertising_partner: str | None, ) -> Reservation | None: """Filter reservations using guest details to find a single match. First tries to match by guest name and email. If that doesn't yield a single match, tries matching by advertising_partner against utm_medium. 
Args: reservations: List of candidate reservations guest_first_name: Guest first name guest_last_name: Guest last name guest_email: Guest email advertising_partner: Partner info (e.g., "Facebook_Mobile_Feed") Returns: Single best-match Reservation, or None if no good match found """ candidates = reservations # Try to narrow down by guest name and email if guest_first_name or guest_last_name or guest_email: # First try exact match on all available fields for reservation in candidates: customer = reservation.customer if customer: name_match = True email_match = True if guest_first_name: name_match = name_match and ( customer.given_name and customer.given_name.lower() == guest_first_name.lower() ) if guest_last_name: name_match = name_match and ( customer.surname and customer.surname.lower() == guest_last_name.lower() ) if guest_email: email_match = ( customer.email_address and customer.email_address.lower() == guest_email.lower() ) if name_match and email_match: _LOGGER.debug( "Found exact match on guest name/email for %s %s", guest_first_name, guest_last_name, ) return reservation # Try to narrow down by advertising_partner matching utm_medium if advertising_partner: for reservation in candidates: if ( reservation.utm_medium and reservation.utm_medium.lower() == advertising_partner.lower() ): _LOGGER.debug( "Found match on advertising_partner=%s matching utm_medium", advertising_partner, ) return reservation # No single clear match found return None async def _extract_unmatched_guests( self, session: AsyncSession ) -> dict[str, HashedCustomer]: """Phase 3b: Extract unique guests from unmatched conversions and match them to customers. Returns a mapping of guest_id -> HashedCustomer for all unique guests found in unmatched conversions. Only processes each guest once. This includes: 1. Conversions with no customer match at all (reservation_id IS NULL AND customer_id IS NULL) 2. 
Conversions matched to a customer but not a reservation (reservation_id IS NULL AND customer_id IS NOT NULL) - These may have been matched in a previous run and need re-evaluation for reservation linking Args: session: AsyncSession for database queries Returns: Dictionary mapping guest_id to matched HashedCustomer (or None if no match) """ # Find all conversions that either: # - Have no match at all (reservation_id IS NULL AND customer_id IS NULL), OR # - Have a customer but no reservation (for re-linking in case new reservations were added) result = await session.execute( select(Conversion) .where( (Conversion.reservation_id.is_(None)) & (Conversion.guest_id.isnot(None)) ) .options(selectinload(Conversion.guest)) ) unmatched_conversions = result.scalars().all() if not unmatched_conversions: _LOGGER.debug("Phase 3b: No unmatched conversions with guests") return {} # Extract unique guests (by guest_id) unique_guests: dict[str, Conversion] = {} for conversion in unmatched_conversions: if conversion.guest_id not in unique_guests: unique_guests[conversion.guest_id] = conversion _LOGGER.info( "Phase 3b: Found %d unique guests from %d unmatched conversions", len(unique_guests), len(unmatched_conversions), ) # Match each unique guest to a hashed customer guest_to_hashed_customer: dict[str, HashedCustomer] = {} for guest_id, conversion in unique_guests.items(): conversion_guest = conversion.guest if not conversion_guest: continue # Try to match by guest details matched_hashed_customer = await self._match_by_guest_details_hashed( conversion_guest.hashed_first_name, conversion_guest.hashed_last_name, conversion_guest.hashed_email, session, ) if matched_hashed_customer: guest_to_hashed_customer[guest_id] = matched_hashed_customer _LOGGER.debug( "Phase 3b: Matched guest %s to hashed_customer %d", guest_id, matched_hashed_customer.id, ) else: guest_to_hashed_customer[guest_id] = None _LOGGER.debug("Phase 3b: No match found for guest %s", guest_id) return 
guest_to_hashed_customer async def _link_matched_guests_to_reservations( self, guest_to_hashed_customer: dict[str, HashedCustomer], session: AsyncSession, stats: dict[str, int], ) -> None: """Phase 3c: Link conversions from matched guests to reservations based on dates. For each guest matched to a hashed_customer, find all conversions from that guest that don't have a reservation yet, and try to link them to reservations based on date matching. This includes: 1. Conversions with no customer match (will link customer first) 2. Conversions already linked to a customer from a previous run (will try to link to reservation) After all conversions for a guest are processed, check if the guest is a regular by looking at whether they have paying conversions that predate any reservations. Args: guest_to_hashed_customer: Mapping from guest_id to matched HashedCustomer session: AsyncSession for database queries stats: Shared stats dictionary to update """ for guest_id, matched_hashed_customer in guest_to_hashed_customer.items(): if not matched_hashed_customer or not matched_hashed_customer.customer_id: continue # Find all conversions from this guest that don't have a reservation # (whether or not they have a customer match - we might be re-running after new reservations added) result = await session.execute( select(Conversion) .where( (Conversion.guest_id == guest_id) & (Conversion.reservation_id.is_(None)) ) .options( selectinload(Conversion.conversion_rooms), selectinload(Conversion.guest), ) ) conversions = result.scalars().all() if not conversions: continue _LOGGER.debug( "Phase 3c: Processing %d conversions for guest %s (customer_id=%d)", len(conversions), guest_id, matched_hashed_customer.customer_id, ) # Try to link each conversion to a reservation for this customer for conversion in conversions: ( matched_reservation, is_attributable, ) = await self._check_if_attributable( matched_hashed_customer.customer_id, conversion, session ) if matched_reservation and 
is_attributable: # Only update stats if this is a NEW match (wasn't matched before) was_previously_matched = conversion.customer_id is not None conversion.reservation_id = matched_reservation.id conversion.customer_id = matched_hashed_customer.customer_id conversion.hashed_customer_id = matched_hashed_customer.id conversion.directly_attributable = True conversion.guest_matched = True conversion.updated_at = datetime.now() if not was_previously_matched: stats["matched_to_reservation"] += 1 _LOGGER.info( "Phase 3c: Linked conversion (pms_id=%s) to reservation %d via guest matching", conversion.pms_reservation_id, matched_reservation.id, ) elif matched_hashed_customer and conversion.customer_id is None: # Only count new customer matches (conversions that didn't have a customer before) conversion.customer_id = matched_hashed_customer.customer_id conversion.hashed_customer_id = matched_hashed_customer.id conversion.directly_attributable = False conversion.guest_matched = True conversion.updated_at = datetime.now() stats["matched_to_customer"] += 1 # After all conversions for this guest are processed, check if guest is regular # Look at ALL conversions from this guest to see if there are pre-dated payments if conversions and conversions[0].guest: await self._check_if_guest_is_regular( guest_id, matched_hashed_customer.customer_id, session ) async def _check_regularity_for_all_matched_guests( self, session: AsyncSession ) -> None: """Phase 3d: Check regularity for ALL matched guests (both ID-matched and guest-detail-matched). This is called after all matching is complete to evaluate every guest that has been matched to a customer, regardless of match type. This ensures consistent regularity evaluation across all matched conversions. This is run on ALL matched guests, not just newly matched ones, to ensure that if the regularity logic changes, it gets re-applied to all guests on the next run. This maintains idempotency of the matching process. 
Args: session: AsyncSession for database queries """ # Get all ConversionGuests that have ANY customer link # This includes: # 1. Guests matched via guest-details (hashed_customer_id is not null) # 2. Guests matched via ID-based matching (customer_id is not null via conversion) result = await session.execute( select(ConversionGuest).where( ConversionGuest.hashed_customer_id.isnot(None) ) ) matched_guests = result.scalars().all() if not matched_guests: _LOGGER.debug("Phase 3d: No matched guests to check for regularity") return _LOGGER.debug( "Phase 3d: Checking regularity for %d matched guests", len(matched_guests) ) for conversion_guest in matched_guests: if not conversion_guest.hashed_customer_id: continue # Get the customer ID from the hashed_customer hashed_customer_result = await session.execute( select(HashedCustomer).where( HashedCustomer.id == conversion_guest.hashed_customer_id ) ) hashed_customer = hashed_customer_result.scalar_one_or_none() if hashed_customer and hashed_customer.customer_id: await self._check_if_guest_is_regular( conversion_guest.guest_id, hashed_customer.customer_id, session ) async def _match_conversions_from_db_sequential( self, pms_reservation_ids: list[str], stats: dict[str, int] ) -> None: """Phase 3a: Match conversions sequentially using database data only. Performs ID-based matching for all conversions. Then extracts unique guests and performs guest detail matching in phases 3b and 3c. 
""" semaphore = asyncio.Semaphore(1) # Process one at a time async with asyncio.TaskGroup() as tg: for pms_id in pms_reservation_ids: tg.create_task( self._match_conversion_from_db_safe(pms_id, semaphore, stats) ) # After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching) if self.session_maker: session = await self.session_maker.create_session() else: session = self.session try: # Phase 3b: Extract and cache unique guests from unmatched conversions guest_to_hashed_customer = await self._extract_unmatched_guests(session) # Phase 3c: Link matched guests to reservations if guest_to_hashed_customer: await self._link_matched_guests_to_reservations( guest_to_hashed_customer, session, stats ) # Phase 3d: Check regularity for all matched guests (both ID and guest-detail matched) await self._check_regularity_for_all_matched_guests(session) await session.commit() except Exception as e: await session.rollback() _LOGGER.exception("Error in Phase 3b/3c/3d guest matching: %s", e) finally: if self.session_maker: await session.close() async def _match_conversions_from_db_concurrent( self, pms_reservation_ids: list[str], stats: dict[str, int] ) -> None: """Phase 3a: Match conversions concurrently using database data only. Performs ID-based matching for all conversions concurrently. Then extracts unique guests and performs guest detail matching sequentially in phases 3b and 3c. Each concurrent task gets its own independent database session from the SessionMaker. """ if not self.session_maker: _LOGGER.error( "Concurrent matching requested but SessionMaker not available. " "Falling back to sequential matching." 
) await self._match_conversions_from_db_sequential(pms_reservation_ids, stats) return semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS) async with asyncio.TaskGroup() as tg: for pms_id in pms_reservation_ids: tg.create_task( self._match_conversion_from_db_safe(pms_id, semaphore, stats) ) # After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching) # Use sequential processing for guest matching to avoid duplicating work session = await self.session_maker.create_session() try: # Phase 3b: Extract and cache unique guests from unmatched conversions guest_to_hashed_customer = await self._extract_unmatched_guests(session) # Phase 3c: Link matched guests to reservations if guest_to_hashed_customer: await self._link_matched_guests_to_reservations( guest_to_hashed_customer, session, stats ) # Phase 3d: Check regularity for all matched guests (both ID and guest-detail matched) await self._check_regularity_for_all_matched_guests(session) await session.commit() except Exception as e: await session.rollback() _LOGGER.exception("Error in Phase 3b/3c/3d guest matching: %s", e) finally: await session.close() async def _match_conversion_from_db_safe( self, pms_reservation_id: int, semaphore: asyncio.Semaphore, stats: dict[str, int], ) -> None: """Phase 2: Safely match a conversion using only database data with transaction management. In concurrent mode, creates its own session from SessionMaker. In sequential mode, uses the shared session. 
Args: pms_reservation_id: PMS reservation ID to match semaphore: Semaphore to limit concurrent operations stats: Shared stats dictionary (thread-safe due to GIL) """ async with semaphore: # In concurrent mode, create a new session for this task if self.session_maker: session = await self.session_maker.create_session() else: session = self.session try: # Phase 2: Match conversion using only database data await self._match_conversion_using_db_data( pms_reservation_id, session, stats ) # Commit this task's transaction await session.commit() _LOGGER.debug( "Successfully matched conversion for reservation %s", pms_reservation_id, ) except Exception as e: # Rollback this task's transaction await session.rollback() _LOGGER.exception( "Error matching conversion for reservation %s: %s", pms_reservation_id, e, ) stats["errors"] += 1 finally: # Close the session if it was created by SessionMaker if self.session_maker: await session.close() async def _match_conversion_using_db_data( self, pms_reservation_id: int, session: AsyncSession | None = None, stats: dict[str, int] | None = None, ) -> None: """Phase 3a: Match a conversion using ID-based matching only. This method reads both the conversion and conversion_guest from the database and uses their stored hashed data to match to existing reservations via advertising data (fbclid/gclid/md5_unique_id). Guest detail matching is deferred to Phase 3b/3c to avoid reprocessing the same guest multiple times. Updates stats dictionary in-place if provided. 
Args: pms_reservation_id: PMS reservation ID to match session: AsyncSession to use stats: Shared stats dictionary to update (optional) """ if session is None: session = self.session if not self.hotel_id: _LOGGER.error( "Cannot match conversion: hotel_id not set in ConversionService" ) return # Get the conversion from the database with related data result = await session.execute( select(Conversion) .where( Conversion.hotel_id == self.hotel_id, Conversion.pms_reservation_id == pms_reservation_id, ) .options( selectinload(Conversion.guest), selectinload(Conversion.conversion_rooms), ) ) conversion = result.scalar_one_or_none() if not conversion: # This should be extremely rare since we filtered in process_conversion_xml _LOGGER.debug( "Conversion not found for pms_reservation_id=%s (may have been deleted)", pms_reservation_id, ) return # Get conversion_guest if it exists (has the hashed data) conversion_guest = conversion.guest # Extract hashed data from conversion_guest (already hashed) hashed_first_name = None hashed_last_name = None hashed_email = None if conversion_guest: hashed_first_name = conversion_guest.hashed_first_name hashed_last_name = conversion_guest.hashed_last_name hashed_email = conversion_guest.hashed_email # Phase 3a: Only try ID-based matching (fbclid/gclid/md5_unique_id) # Guest detail matching is deferred to Phase 3b/3c matched_reservation = None matched_customer = None matched_hashed_customer = None if conversion.advertising_campagne: matched_reservation = await self._match_by_advertising( conversion.advertising_campagne, conversion.hotel_id, hashed_first_name, hashed_last_name, hashed_email, conversion.advertising_partner, session, ) if matched_reservation: matched_customer = matched_reservation.customer if matched_customer and matched_customer.hashed_version: matched_hashed_customer = matched_customer.hashed_version _LOGGER.info( "Phase 3a: Matched conversion by advertising ID (pms_id=%s, reservation_id=%d)", pms_reservation_id, 
matched_reservation.id, ) # Update the conversion with matched entities if found if matched_reservation or matched_customer or matched_hashed_customer: conversion.reservation_id = ( matched_reservation.id if matched_reservation else None ) conversion.customer_id = matched_customer.id if matched_customer else None conversion.hashed_customer_id = ( matched_hashed_customer.id if matched_hashed_customer else None ) # ID-based matches are always directly attributable conversion.directly_attributable = True conversion.guest_matched = False # Update conversion_guest with hashed_customer reference if matched if conversion_guest and matched_hashed_customer: conversion_guest.hashed_customer_id = matched_hashed_customer.id conversion.updated_at = datetime.now() # Update stats if provided if stats is not None: if matched_reservation: stats["matched_to_reservation"] += 1 elif matched_customer: stats["matched_to_customer"] += 1 elif matched_hashed_customer: stats["matched_to_hashed_customer"] += 1 else: stats["unmatched"] += 1 async def _check_if_guest_is_regular( self, guest_id: str, customer_id: int, session: AsyncSession, ) -> None: """Check if a guest is a regular customer based on conversion and reservation history. A guest is regular if they have conversions with paying bookings that predate their first reservation sent by us. This indicates they were already a customer before we started tracking. This check is done AFTER all conversions for a guest have been processed and matched, so it can evaluate the complete picture of their payment history vs our reservation history. 
Args: guest_id: The guest ID to evaluate customer_id: The matched customer ID session: AsyncSession for database queries """ # Get the ConversionGuest record guest_result = await session.execute( select(ConversionGuest).where(ConversionGuest.guest_id == guest_id) ) conversion_guest = guest_result.scalar_one_or_none() if not conversion_guest: return # Find the earliest paying conversion for this guest (across all hotels) # Look for conversions with actual revenue earliest_paying_conversion_result = await session.execute( select(Conversion) .join(ConversionRoom, Conversion.id == ConversionRoom.conversion_id) .where( Conversion.guest_id == guest_id, ConversionRoom.total_revenue.isnot(None), ConversionRoom.total_revenue > Decimal(0), ) .order_by(Conversion.reservation_date.asc()) .limit(1) ) earliest_paying_conversion = ( earliest_paying_conversion_result.scalar_one_or_none() ) if not earliest_paying_conversion: # No paying conversions found for this guest conversion_guest.is_regular = False return # Find the earliest reservation sent to this customer earliest_reservation_result = await session.execute( select(Reservation) .where(Reservation.customer_id == customer_id) .order_by(Reservation.start_date.asc()) .limit(1) ) earliest_reservation = earliest_reservation_result.scalar_one_or_none() if not earliest_reservation: # No reservations for this customer yet, can't determine regularity conversion_guest.is_regular = False return # Guest is regular if their earliest paying conversion predates our first reservation # (meaning they were already a customer before we sent them a reservation) # Compare against the reservation's creation date (when WE created/sent it), not check-in date # Convert created_at to date for comparison with reservation_date (both are dates) is_regular = ( earliest_paying_conversion.reservation_date < earliest_reservation.created_at.date() ) conversion_guest.is_regular = is_regular if is_regular: _LOGGER.debug( "Marking guest %s as regular: earliest 
paying conversion %s predates first reservation created at %s", guest_id, earliest_paying_conversion.reservation_date, earliest_reservation.created_at, ) else: _LOGGER.debug( "Guest %s is not regular: first paying conversion %s is after/equal to first reservation created at %s", guest_id, earliest_paying_conversion.reservation_date, earliest_reservation.created_at, ) async def _check_if_attributable( self, matched_customer_id: int, conversion: Conversion, session: AsyncSession, exclude_reservation_id: int | None = None, ) -> tuple[Reservation | None, bool]: """Search for and check if a customer has an attributable reservation for this conversion. Finds reservations linked to the customer and checks if any have dates matching this conversion. A conversion is attributable ONLY if the conversion_room dates match a reservation's dates closely. Args: matched_customer_id: The customer ID to search reservations for conversion: The Conversion record being evaluated session: AsyncSession for database queries exclude_reservation_id: Reservation ID to exclude from search (if already matched) Returns: Tuple of (matched_reservation, is_attributable) where: - matched_reservation: The Reservation that matches (if any) - is_attributable: True if the reservation's dates match this conversion """ # Check if conversion_room dates exist (criterion for attributability) if not conversion.conversion_rooms: return None, False # Find reservations for this customer query = select(Reservation).where( Reservation.customer_id == matched_customer_id ) if exclude_reservation_id: query = query.where(Reservation.id != exclude_reservation_id) db_result = await session.execute(query) reservations = db_result.scalars().all() if not reservations: return None, False # Check each reservation for date match for reservation in reservations: for room in conversion.conversion_rooms: if ( room.arrival_date and room.departure_date and reservation.start_date and reservation.end_date ): # Check if dates match 
or mostly match (within 7 day tolerance) arrival_match = ( abs((room.arrival_date - reservation.start_date).days) <= 7 ) departure_match = ( abs((room.departure_date - reservation.end_date).days) <= 7 ) if arrival_match and departure_match: _LOGGER.info( "Found attributable reservation for customer %d: " "room dates %s-%s match reservation dates %s-%s", matched_customer_id, room.arrival_date, room.departure_date, reservation.start_date, reservation.end_date, ) return reservation, True return None, False