Files
alpinebits_python/src/alpine_bits_python/conversion_service.py
Jonas Linter 3819b2bc95 Cleanup
2025-11-19 19:35:36 +01:00

1733 lines
69 KiB
Python

"""Service for handling conversion data from hotel PMS XML files."""
import asyncio
import hashlib
import xml.etree.ElementTree as ET
from datetime import UTC, datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import insert, or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from .db import (
Conversion,
ConversionGuest,
ConversionRoom,
Customer,
HashedCustomer,
Reservation,
SessionMaker,
)
from .logging_config import get_logger
# Module-level logger shared by every ConversionService instance in this module.
_LOGGER = get_logger(__name__)
# Limit concurrent reservation processing to avoid overwhelming the database.
# Used as the semaphore size for concurrent mode; each in-flight reservation
# opens its own session while holding the semaphore, so this also caps the
# number of sessions opened at the same time.
MAX_CONCURRENT_RESERVATIONS = 10
class ConversionService:
"""Service for processing and storing conversion/daily sales data.
Supports two modes of operation:
1. Sequential mode: Single AsyncSession passed in, uses sequential processing
2. Concurrent mode: SessionMaker passed in, creates independent sessions per task
"""
def __init__(self, session: AsyncSession | SessionMaker | None = None):
    """Set up the service for sequential or concurrent processing.

    Args:
        session: One of
            - ``AsyncSession``: a single shared session; reservations are
              processed one after another on it.
            - ``SessionMaker``: a session factory; concurrent tasks each
              open their own session from it.
            - ``None``: no session attached (not recommended, but allowed
              for subclassing).

    Raises:
        TypeError: If ``session`` is of any other type.
    """
    self.session = None
    self.session_maker = None
    self.supports_concurrent = False

    # Per-run lookup table: hotel_code -> [(reservation, hashed_customer), ...].
    # Filled once by _load_reservation_cache so matching against a large XML
    # file does not query the database per reservation; matching relies on
    # the hashed customer data only.
    self._reservation_cache: dict[
        str | None, list[tuple[Reservation, HashedCustomer | None]]
    ] = {}
    self._cache_initialized = False

    if session is None:
        return

    if isinstance(session, SessionMaker):
        self.session_maker = session
        self.supports_concurrent = True
        _LOGGER.info(
            "ConversionService initialized in concurrent mode with SessionMaker"
        )
        return

    if isinstance(session, AsyncSession):
        self.session = session
        _LOGGER.info(
            "ConversionService initialized in sequential mode with single session"
        )
        return

    raise TypeError(
        f"session must be AsyncSession or SessionMaker, got {type(session)}"
    )
async def _extract_unique_guests_from_xml(
self, reservations: list
) -> dict[tuple[str, str | None], dict]:
"""Extract and deduplicate all guest data from XML reservations.
Phase 0: Single pass through XML to collect all unique guests.
Uses (hotel_id, guest_id) as the key for deduplication.
Args:
reservations: List of XML reservation elements
Returns:
Dictionary mapping (hotel_id, guest_id) to guest data dict
"""
guest_data_by_key = {}
now = datetime.now(UTC)
for reservation_elem in reservations:
hotel_id = reservation_elem.get("hotelID")
guest_elem = reservation_elem.find("guest")
if guest_elem is None:
_LOGGER.debug("No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)", reservation_elem.get("id"))
continue
guest_id = guest_elem.get("id")
guest_first_name = guest_elem.get("firstName")
guest_last_name = guest_elem.get("lastName")
guest_email = guest_elem.get("email")
guest_country_code = guest_elem.get("countryCode")
guest_birth_date_str = guest_elem.get("dateOfBirth")
guest_birth_date = None
if guest_birth_date_str:
try:
guest_birth_date = datetime.strptime(guest_birth_date_str, "%Y-%m-%d").date()
except ValueError:
_LOGGER.warning("Invalid birth date format: %s", guest_birth_date_str)
key = (hotel_id, guest_id)
# Store guest data by key (will keep the last occurrence from XML)
guest_data_by_key[key] = {
"hotel_id": hotel_id,
"guest_id": guest_id,
"guest_first_name": guest_first_name,
"guest_last_name": guest_last_name,
"guest_email": guest_email,
"guest_country_code": guest_country_code,
"guest_birth_date": guest_birth_date,
"now": now,
}
return guest_data_by_key
async def _bulk_upsert_guests(
    self, session: AsyncSession, guest_data_by_key: dict[tuple[str, str | None], dict]
) -> None:
    """Insert or refresh every extracted guest in ConversionGuest (Phase 1).

    Guests are written in chunks of 1000 rows, one
    ``INSERT ... ON CONFLICT (hotel_id, guest_id) DO UPDATE`` per chunk.
    For an existing guest the plain fields, hashed fields and ``last_seen``
    are overwritten, while ``first_seen`` keeps its stored value. Nothing is
    committed here; the caller owns the transaction.

    Args:
        session: AsyncSession the statements are executed on.
        guest_data_by_key: Mapping (hotel_id, guest_id) -> guest data, as
            produced by ``_extract_unique_guests_from_xml``.
    """
    if not guest_data_by_key:
        return

    normalize_and_hash = ConversionGuest._normalize_and_hash
    # Columns refreshed from the incoming row on conflict; first_seen is not
    # listed, so an existing row keeps it.
    refreshed_columns = (
        "guest_first_name",
        "guest_last_name",
        "guest_email",
        "guest_country_code",
        "guest_birth_date",
        "hashed_first_name",
        "hashed_last_name",
        "hashed_email",
        "hashed_country_code",
        "hashed_birth_date",
        "last_seen",
    )

    chunk_size = 1000
    guests = list(guest_data_by_key.values())
    guest_count = len(guests)

    for offset in range(0, guest_count, chunk_size):
        chunk = guests[offset : offset + chunk_size]

        rows = []
        for guest in chunk:
            birth_date = guest["guest_birth_date"]
            seen_at = guest["now"]
            rows.append(
                {
                    "hotel_id": guest["hotel_id"],
                    "guest_id": guest["guest_id"],
                    "guest_first_name": guest["guest_first_name"],
                    "guest_last_name": guest["guest_last_name"],
                    "guest_email": guest["guest_email"],
                    "guest_country_code": guest["guest_country_code"],
                    "guest_birth_date": birth_date,
                    "hashed_first_name": normalize_and_hash(guest["guest_first_name"]),
                    "hashed_last_name": normalize_and_hash(guest["guest_last_name"]),
                    "hashed_email": normalize_and_hash(guest["guest_email"]),
                    "hashed_country_code": normalize_and_hash(guest["guest_country_code"]),
                    "hashed_birth_date": normalize_and_hash(
                        birth_date.isoformat() if birth_date else None
                    ),
                    "first_seen": seen_at,
                    "last_seen": seen_at,
                }
            )

        insert_stmt = pg_insert(ConversionGuest).values(rows)
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["hotel_id", "guest_id"],
            set_={
                column: getattr(insert_stmt.excluded, column)
                for column in refreshed_columns
            },
        )
        await session.execute(upsert_stmt)

        _LOGGER.info(
            "Phase 1: Upserted batch %d-%d of %d guests",
            offset + 1,
            offset + len(chunk),
            guest_count,
        )
async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]:
    """Parse conversion XML and save daily sales data to database.

    Work is done in this order:

    * ``Deletedreservation`` elements: the matching conversions are deleted,
      one commit per element (failures are counted in ``errors``).
    * Phase 0: unique guests are extracted from the ``reservation`` elements.
    * Phase 1: those guests are bulk-upserted; if this fails, the stats
      collected so far are returned and no conversions are processed.
    * Phase 2: conversion and room records are created/updated.
    * Phase 3: the conversions stored in Phase 2 are matched to
      reservations/customers using database data only.

    Args:
        xml_content: XML string containing reservation and daily sales data

    Returns:
        Dictionary with processing statistics

    Raises:
        ValueError: If ``xml_content`` is not well-formed XML.
    """
    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError as e:
        _LOGGER.error("Failed to parse conversion XML: %s", e)
        raise ValueError(f"Invalid XML content: {e}") from e

    # Initialize cache for this XML processing run
    await self._load_reservation_cache()

    stats = {
        "total_reservations": 0,
        "deleted_reservations": 0,
        "total_daily_sales": 0,
        "matched_to_reservation": 0,
        "matched_to_customer": 0,
        "matched_to_hashed_customer": 0,
        "unmatched": 0,
        "errors": 0,
    }

    # Process deleted reservations. A session is only opened when there is
    # something to delete, and a SessionMaker-created session is closed in a
    # finally block (like Phase 1 below) so it cannot leak if a rollback
    # fails or the coroutine is cancelled.
    deleted_reservations = root.findall("Deletedreservation")
    if deleted_reservations:
        if self.session_maker:
            session = await self.session_maker.create_session()
        else:
            session = self.session
        try:
            for deleted_res in deleted_reservations:
                stats["deleted_reservations"] += 1
                pms_reservation_id = deleted_res.get("ID")
                try:
                    await self._handle_deleted_reservation(pms_reservation_id, session)
                    await session.commit()
                except Exception as e:
                    await session.rollback()
                    _LOGGER.exception(
                        "Error deleting reservation %s: %s",
                        pms_reservation_id,
                        e,
                    )
                    stats["errors"] += 1
        finally:
            if self.session_maker:
                await session.close()

    reservations = root.findall("reservation")
    stats["total_reservations"] = len(reservations)
    if not reservations:
        return stats

    _LOGGER.info("Processing %d reservations in xml", len(reservations))

    # Phase 0: Extract and deduplicate all guest data from XML
    _LOGGER.debug("Phase 0: Extracting and deduplicating guest data from XML")
    guest_data_by_key = await self._extract_unique_guests_from_xml(reservations)
    _LOGGER.info(
        "Phase 0: Extracted %d unique guests from %d reservations",
        len(guest_data_by_key),
        len(reservations),
    )

    # Phase 1: Bulk upsert all unique guests to database, so Phase 2 can
    # reference them without insert races between concurrent tasks.
    if guest_data_by_key:
        _LOGGER.debug("Phase 1: Bulk upserting %d unique guests to database", len(guest_data_by_key))
        if self.session_maker:
            session = await self.session_maker.create_session()
        else:
            session = self.session
        try:
            await self._bulk_upsert_guests(session, guest_data_by_key)
            await session.commit()
            _LOGGER.info("Phase 1: Successfully upserted %d guests", len(guest_data_by_key))
        except Exception as e:
            await session.rollback()
            _LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e)
            stats["errors"] += len(guest_data_by_key)
            return stats
        finally:
            if self.session_maker:
                await session.close()

    # Phase 2: Create/update all conversions (no matching).
    # Returns the pms_reservation_ids that were stored successfully.
    _LOGGER.debug("Phase 2: Creating/updating conversions")
    if self.supports_concurrent:
        pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats)
    else:
        pms_reservation_ids = await self._process_reservations_sequential(reservations, stats)

    _LOGGER.debug(
        "Phase 3: Found %d successfully created conversions out of %d total reservations",
        len(pms_reservation_ids),
        len(reservations),
    )

    # Phase 3: Match the stored conversions using database data only
    # (no XML parsing or re-hashing here).
    if pms_reservation_ids:
        _LOGGER.debug("Phase 3: Matching conversions to reservations/customers")
        if self.supports_concurrent:
            await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
        else:
            await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)

    return stats
async def _load_reservation_cache(self) -> None:
    """Load all reservations and their hashed customers into the in-memory cache.

    Runs once per service instance: if the cache is already marked as
    initialized, nothing is reloaded. Reservations are grouped by
    ``hotel_code`` into ``self._reservation_cache`` as
    ``(reservation, hashed_customer)`` tuples. The hashed customer comes
    from the reservation's direct ``hashed_customer`` link, falling back to
    ``customer.hashed_version``.

    If loading fails, the cache is emptied and still marked initialized.
    Matching then falls back to direct database queries instead of working
    against a partially filled cache. If no session is available, a warning
    is logged and the cache stays uninitialized.
    """
    if self._cache_initialized:
        _LOGGER.debug("Reservation cache already initialized, skipping reload")
        return

    # Get a session for loading the cache; only sessions created here are closed here.
    if self.session_maker:
        session = await self.session_maker.create_session()
        close_session = True
    else:
        session = self.session
        close_session = False

    if not session:
        _LOGGER.warning("No session available for cache loading")
        return

    try:
        # Load all reservations with both hashed-customer paths eagerly, in one query.
        query = select(Reservation).options(
            selectinload(Reservation.customer).selectinload(Customer.hashed_version),
            selectinload(Reservation.hashed_customer),
        )
        result = await session.execute(query)
        reservations = result.scalars().all()
        _LOGGER.info("Loaded %d reservations into cache", len(reservations))

        for reservation in reservations:
            # Prefer the direct relationship, fall back to the customer relationship.
            hashed_customer = None
            if reservation.hashed_customer:
                hashed_customer = reservation.hashed_customer
            elif reservation.customer and reservation.customer.hashed_version:
                hashed_customer = reservation.customer.hashed_version

            self._reservation_cache.setdefault(reservation.hotel_code, []).append(
                (reservation, hashed_customer)
            )

        self._cache_initialized = True
        _LOGGER.info(
            "Reservation cache initialized with %d hotel codes",
            len(self._reservation_cache),
        )
    except Exception as e:
        # Discard anything added before the failure. _match_by_guest_details
        # uses the cache whenever it is non-empty, so a partial cache would
        # silently miss matches.
        self._reservation_cache.clear()
        _LOGGER.exception("Failed to load reservation cache: %s", e)
        # Cache remains empty, fall back to direct queries
        self._cache_initialized = True
    finally:
        if close_session:
            await session.close()
async def _process_reservations_sequential(
    self, reservations: list, stats: dict[str, int]
) -> list[str]:
    """Process reservations one at a time (original behavior).

    Args:
        reservations: XML ``reservation`` elements to process.
        stats: Shared statistics dict, updated in place.

    Returns:
        List of pms_reservation_ids that were successfully created/updated
    """
    return await self._run_reservation_tasks(reservations, stats, max_concurrency=1)

async def _process_reservations_concurrent(
    self, reservations: list, stats: dict[str, int]
) -> list[str]:
    """Process reservations concurrently with semaphore limiting.

    Each concurrent task gets its own independent database session from the
    SessionMaker. At most ``MAX_CONCURRENT_RESERVATIONS`` reservations are
    processed at once. Without a SessionMaker this falls back to
    sequential processing.

    Args:
        reservations: XML ``reservation`` elements to process.
        stats: Shared statistics dict, updated in place.

    Returns:
        List of pms_reservation_ids that were successfully created/updated
    """
    if not self.session_maker:
        _LOGGER.error(
            "Concurrent processing requested but SessionMaker not available. "
            "Falling back to sequential processing."
        )
        return await self._process_reservations_sequential(reservations, stats)

    return await self._run_reservation_tasks(
        reservations, stats, max_concurrency=MAX_CONCURRENT_RESERVATIONS
    )

async def _run_reservation_tasks(
    self, reservations: list, stats: dict[str, int], max_concurrency: int
) -> list[str]:
    """Run ``_process_reservation_safe`` for every reservation inside one TaskGroup.

    A semaphore of size ``max_concurrency`` is shared by all tasks to bound
    how many reservations are processed at once. ``_process_reservation_safe``
    already turns ordinary per-reservation failures into ``None`` results.
    Any exception that still escapes a task aborts the whole run (fail fast).

    Args:
        reservations: XML ``reservation`` elements to process.
        stats: Shared statistics dict, updated in place by the tasks.
        max_concurrency: Semaphore size.

    Returns:
        pms_reservation_ids of the successful tasks, in input order.

    Raises:
        Exception: The first exception that escaped a task, with the
            original ``ExceptionGroup`` attached as ``__cause__``.
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(
                    self._process_reservation_safe(reservation, semaphore, stats)
                )
                for reservation in reservations
            ]
    except ExceptionGroup as eg:
        # By the time control gets here, TaskGroup has already cancelled and
        # awaited the remaining tasks, so re-raising is all that is left to do.
        raise eg.exceptions[0] from eg

    results = (task.result() for task in tasks)
    return [pms_id for pms_id in results if pms_id is not None]
async def _process_reservation_safe(
    self,
    reservation_elem: Any,
    semaphore: asyncio.Semaphore,
    stats: dict[str, int],
) -> str | None:
    """Create/update one conversion under the semaphore, in its own transaction.

    This is the per-reservation unit of Phase 2 (creation/update only, no
    matching). With a SessionMaker, a fresh session is opened for this call
    and closed afterwards; otherwise the shared ``self.session`` is used.
    Exceptions raised while creating/committing the conversion are logged,
    rolled back and counted in ``stats["errors"]`` instead of propagating.

    Args:
        reservation_elem: XML element for the reservation
        semaphore: Semaphore to limit concurrent operations
        stats: Shared stats dictionary. Updating it needs no lock because
            asyncio tasks share a single thread and each ``+=`` below has no
            ``await`` between its read and its write.

    Returns:
        pms_reservation_id if successfully created/updated, None if error occurred
    """
    pms_reservation_id = reservation_elem.get("id")

    async with semaphore:
        # In concurrent mode, create a new session for this task
        if self.session_maker:
            session = await self.session_maker.create_session()
        else:
            session = self.session

        try:
            reservation_stats = await self._create_or_update_conversion(
                reservation_elem, session
            )
            await session.commit()
        except Exception as e:
            await session.rollback()
            _LOGGER.exception(
                "Error processing reservation %s: %s",
                pms_reservation_id,
                e,
            )
            stats["errors"] += 1
            return None
        else:
            # Count daily sales only once they are persisted, so a failed
            # commit does not inflate the total.
            stats["total_daily_sales"] += reservation_stats["daily_sales_count"]
            _LOGGER.debug(
                "Successfully created/updated conversion for reservation %s",
                pms_reservation_id,
            )
            return pms_reservation_id
        finally:
            # Close the session if it was created by SessionMaker
            if self.session_maker:
                await session.close()
async def _handle_deleted_reservation(
    self, pms_reservation_id: str | None, session: AsyncSession
):
    """Delete all Conversion records belonging to a deleted PMS reservation.

    Records are removed with ``session.delete``; committing is left to the
    caller. A missing or empty ID is ignored. Comparing the column with
    ``None`` would make SQLAlchemy emit ``IS NULL`` and delete every
    conversion that has no PMS reservation ID.

    Args:
        pms_reservation_id: PMS reservation ID to delete
        session: AsyncSession to use for the operation
    """
    _LOGGER.info("Processing deleted reservation: PMS ID %s", pms_reservation_id)

    if not pms_reservation_id:
        _LOGGER.warning(
            "Deleted reservation element has no PMS ID, nothing to delete"
        )
        return

    result = await session.execute(
        select(Conversion).where(
            Conversion.pms_reservation_id == pms_reservation_id
        )
    )
    conversions = result.scalars().all()

    for conversion in conversions:
        await session.delete(conversion)

    if conversions:
        _LOGGER.info(
            "Deleted %d conversion records for PMS reservation %s",
            len(conversions),
            pms_reservation_id,
        )
async def _create_or_update_conversion(
    self, reservation_elem: ET.Element, session: AsyncSession | None = None
) -> dict[str, int]:
    """Create or update a conversion record from XML (Phase 2: no matching).

    Upserts the ``Conversion`` row identified by the reservation's ``id``
    attribute (``pms_reservation_id``). It also upserts one
    ``ConversionRoom`` row per ``roomReservation``, identified by
    ``"<pms_reservation_id>_<roomNumber>"``. Guest details are not written
    here; a new conversion only stores the ``guest_id`` link to
    ``ConversionGuest``. The conversion is flushed (so it has an id) but
    nothing is committed; the caller owns the transaction.

    Args:
        reservation_elem: XML ``reservation`` element to process.
        session: AsyncSession to use. If None, uses self.session.
            In concurrent mode, each task passes its own session.

    Returns:
        ``{"daily_sales_count": n}``, where ``n`` is the number of
        ``dailySale`` elements seen. A reservation without a
        ``roomReservations`` element is skipped entirely (nothing written,
        count 0).
    """
    if session is None:
        session = self.session

    stats = {"daily_sales_count": 0}

    # Reservation metadata and advertising/tracking data
    hotel_id = reservation_elem.get("hotelID")
    pms_reservation_id = reservation_elem.get("id")
    reservation_number = reservation_elem.get("number")
    reservation_type = reservation_elem.get("type")
    booking_channel = reservation_elem.get("bookingChannel")
    advertising_medium = reservation_elem.get("advertisingMedium")
    advertising_partner = reservation_elem.get("advertisingPartner")
    advertising_campagne = reservation_elem.get("advertisingCampagne")

    # Only the guest id is needed here. Names, email, country and dateOfBirth
    # are handled by the ConversionGuest upsert and are not used by this
    # method, so they are not parsed. A malformed dateOfBirth must not
    # reject the whole reservation.
    guest_elem = reservation_elem.find("guest")
    guest_id = guest_elem.get("id") if guest_elem is not None else None

    reservation_date = self._parse_xml_date(
        reservation_elem.get("date"), "Invalid reservation date format: %s"
    )

    creation_time_str = reservation_elem.get("creationTime")
    creation_time = None
    if creation_time_str:
        try:
            creation_time = datetime.fromisoformat(
                creation_time_str.replace("Z", "+00:00")
            )
        except ValueError:
            _LOGGER.warning("Invalid creation time format: %s", creation_time_str)

    room_reservations = reservation_elem.find("roomReservations")
    if room_reservations is None:
        _LOGGER.debug(
            "No roomReservations found for reservation %s", pms_reservation_id
        )
        return stats

    # One timestamp for every row touched in this call.
    now = datetime.now()

    # ConversionGuests were already upserted in Phase 1, so the conversion
    # can be created/updated directly. Check for an existing row (upsert).
    existing_result = await session.execute(
        select(Conversion).where(
            Conversion.pms_reservation_id == pms_reservation_id
        )
    )
    conversion = existing_result.scalar_one_or_none()

    if conversion is not None:
        # Refresh reservation metadata and advertising data only; existing
        # reservation/customer links are left for the matching phase.
        conversion.reservation_number = reservation_number
        conversion.reservation_date = reservation_date
        conversion.creation_time = creation_time
        conversion.reservation_type = reservation_type
        conversion.booking_channel = booking_channel
        conversion.advertising_medium = advertising_medium
        conversion.advertising_partner = advertising_partner
        conversion.advertising_campagne = advertising_campagne
        conversion.updated_at = now
        _LOGGER.info(
            "Updated conversion %s (pms_id=%s)",
            conversion.id,
            pms_reservation_id,
        )
    else:
        conversion = Conversion(
            # Links to existing entities (nullable, filled in after matching)
            reservation_id=None,
            customer_id=None,
            hashed_customer_id=None,
            # Reservation metadata
            hotel_id=hotel_id,
            guest_id=guest_id,  # Links to ConversionGuest
            pms_reservation_id=pms_reservation_id,
            reservation_number=reservation_number,
            reservation_date=reservation_date,
            creation_time=creation_time,
            reservation_type=reservation_type,
            booking_channel=booking_channel,
            # Advertising data
            advertising_medium=advertising_medium,
            advertising_partner=advertising_partner,
            advertising_campagne=advertising_campagne,
            # Metadata
            created_at=now,
            updated_at=now,
        )
        session.add(conversion)
        _LOGGER.debug(
            "Created conversion (pms_id=%s)",
            pms_reservation_id,
        )

    # Flush so a new conversion has an id before rooms reference it.
    await session.flush()

    room_elems = room_reservations.findall("roomReservation")

    # Batch-load existing room reservations in one query to avoid N+1 lookups.
    pms_hotel_reservation_ids = [
        f"{pms_reservation_id}_{room.get('roomNumber')}" for room in room_elems
    ]
    existing_rooms = {}
    if pms_hotel_reservation_ids:
        existing_rooms_result = await session.execute(
            select(ConversionRoom).where(
                ConversionRoom.pms_hotel_reservation_id.in_(pms_hotel_reservation_ids)
            )
        )
        existing_rooms = {
            room.pms_hotel_reservation_id: room
            for room in existing_rooms_result.scalars().all()
        }

    for room_reservation in room_elems:
        room_status = room_reservation.get("status")
        room_type = room_reservation.get("roomType")
        room_number = room_reservation.get("roomNumber")
        rate_plan_code = room_reservation.get("ratePlanCode")
        connected_room_type = room_reservation.get("connectedRoomType")

        arrival_date = self._parse_xml_date(
            room_reservation.get("arrival"), "Invalid arrival date format: %s"
        )
        departure_date = self._parse_xml_date(
            room_reservation.get("departure"), "Invalid departure date format: %s"
        )

        adults_str = room_reservation.get("adults")
        num_adults = None
        if adults_str:
            try:
                num_adults = int(adults_str)
            except ValueError:
                _LOGGER.warning("Invalid adults value: %s", adults_str)

        daily_sales_list, total_revenue, sale_count = self._parse_daily_sales(
            room_reservation.find("dailySales")
        )
        stats["daily_sales_count"] += sale_count
        daily_sales = daily_sales_list if daily_sales_list else None
        room_revenue = total_revenue if total_revenue > 0 else None

        # Composite key: the same room reservation is updated if it reappears.
        pms_hotel_reservation_id = f"{pms_reservation_id}_{room_number}"
        room_record = existing_rooms.get(pms_hotel_reservation_id)

        if room_record is not None:
            room_record.arrival_date = arrival_date
            room_record.departure_date = departure_date
            room_record.room_status = room_status
            room_record.room_type = room_type
            room_record.num_adults = num_adults
            room_record.rate_plan_code = rate_plan_code
            room_record.connected_room_type = connected_room_type
            room_record.daily_sales = daily_sales
            room_record.total_revenue = room_revenue
            room_record.updated_at = now
            _LOGGER.debug(
                "Updated room reservation %s (pms_id=%s, room=%s)",
                room_record.id,
                pms_reservation_id,
                room_number,
            )
        else:
            session.add(
                ConversionRoom(
                    conversion_id=conversion.id,
                    pms_hotel_reservation_id=pms_hotel_reservation_id,
                    arrival_date=arrival_date,
                    departure_date=departure_date,
                    room_status=room_status,
                    room_type=room_type,
                    room_number=room_number,
                    num_adults=num_adults,
                    rate_plan_code=rate_plan_code,
                    connected_room_type=connected_room_type,
                    daily_sales=daily_sales,
                    total_revenue=room_revenue,
                    created_at=now,
                    updated_at=now,
                )
            )
            _LOGGER.debug(
                "Created room reservation (pms_id=%s, room=%s, adults=%s)",
                pms_reservation_id,
                room_number,
                num_adults,
            )

    return stats

@staticmethod
def _parse_xml_date(value: str | None, warning: str):
    """Parse a ``YYYY-MM-DD`` attribute value into a ``date``.

    Returns None for a missing/empty value. A malformed value is logged with
    ``warning`` (a %-format string receiving the raw value) and also yields
    None.
    """
    if not value:
        return None
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except ValueError:
        _LOGGER.warning(warning, value)
        return None

@staticmethod
def _parse_daily_sales(
    daily_sales_elem: ET.Element | None,
) -> tuple[list[dict[str, str]], Decimal, int]:
    """Collect the ``dailySale`` children of a ``dailySales`` element.

    Returns:
        ``(sales, total_revenue, count)``:

        - ``sales``: one dict per dailySale that carries at least one of the
          known attributes, with values kept as the raw strings.
        - ``total_revenue``: the sum of every parseable, finite
          ``revenueTotal``.
        - ``count``: the number of dailySale elements seen, including ones
          without data.
    """
    from decimal import InvalidOperation

    sales: list[dict[str, str]] = []
    total_revenue = Decimal(0)
    count = 0
    if daily_sales_elem is None:
        return sales, total_revenue, count

    for daily_sale in daily_sales_elem.findall("dailySale"):
        count += 1
        entry: dict[str, str] = {}

        sale_date = daily_sale.get("date")
        if sale_date:
            entry["date"] = sale_date

        revenue_total = daily_sale.get("revenueTotal")
        if revenue_total:
            entry["revenueTotal"] = revenue_total
            # Decimal() signals InvalidOperation (not ValueError) on malformed
            # input. NaN/Infinity parse fine but would poison the total
            # (ordering comparisons on NaN raise), so both are skipped.
            try:
                amount = Decimal(revenue_total)
            except (InvalidOperation, ValueError, TypeError):
                amount = None
            if amount is not None and amount.is_finite():
                total_revenue += amount
            else:
                _LOGGER.warning("Invalid revenueTotal value: %s", revenue_total)

        # Add the other revenue fields if present
        for field in (
            "revenueLogis",
            "revenueBoard",
            "revenueFB",
            "revenueSpa",
            "revenueOther",
        ):
            value = daily_sale.get(field)
            if value:
                entry[field] = value

        if entry:  # Only keep sales that carry data
            sales.append(entry)

    return sales, total_revenue, count
async def _match_by_advertising(
    self,
    advertising_campagne: str,
    hotel_id: str | None,
    guest_first_name: str | None,
    guest_last_name: str | None,
    guest_email: str | None,
    advertising_partner: str | None,
    session: AsyncSession | None = None,
) -> Reservation | None:
    """Match reservation by advertising tracking data (fbclid/gclid/md5_unique_id).

    A reservation matches when its fbclid or gclid starts with
    ``advertising_campagne`` (the PMS may truncate click IDs), or when its
    md5_unique_id equals it exactly. The match is restricted to
    ``hotel_id`` when given. With several matches,
    ``_filter_reservations_by_guest_details`` is used to narrow them down.
    If that does not help, the first match is returned and a warning is
    logged.

    Args:
        advertising_campagne: Tracking ID from PMS (could be truncated click_id or md5_unique_id)
        hotel_id: Hotel ID for filtering
        guest_first_name: Guest first name for disambiguation
        guest_last_name: Guest last name for disambiguation
        guest_email: Guest email for disambiguation
        advertising_partner: Partner info (matches utm_medium)
        session: AsyncSession to use. If None, uses self.session.

    Returns:
        Matched Reservation, or None if nothing matches or
        ``advertising_campagne`` is empty.
    """
    if session is None:
        session = self.session

    # An empty value would turn the prefix filters into "match everything".
    if not advertising_campagne:
        return None

    # startswith(..., autoescape=True) escapes "%" and "_" in the value, so
    # tracking IDs containing "_" are matched literally instead of being
    # treated as LIKE wildcards.
    query = (
        select(Reservation)
        .where(
            or_(
                Reservation.fbclid.startswith(advertising_campagne, autoescape=True),
                Reservation.gclid.startswith(advertising_campagne, autoescape=True),
                Reservation.md5_unique_id == advertising_campagne,
            )
        )
        # Eagerly load the customer relationship
        .options(selectinload(Reservation.customer))
    )
    if hotel_id:
        query = query.where(Reservation.hotel_code == hotel_id)

    db_result = await session.execute(query)
    reservations = db_result.scalars().all()

    if not reservations:
        return None
    if len(reservations) == 1:
        return reservations[0]

    _LOGGER.debug(
        "Multiple reservations match advertisingCampagne %s (hotel=%s): found %d matches. "
        "Attempting to narrow down using guest details.",
        advertising_campagne,
        hotel_id,
        len(reservations),
    )

    matched_reservation = self._filter_reservations_by_guest_details(
        reservations,
        guest_first_name,
        guest_last_name,
        guest_email,
        advertising_partner,
    )

    if matched_reservation is None:
        _LOGGER.warning(
            "Could not narrow down multiple reservations for advertisingCampagne %s "
            "(hotel=%s, guest=%s %s, email=%s). Using first match.",
            advertising_campagne,
            hotel_id,
            guest_first_name,
            guest_last_name,
            guest_email,
        )
        matched_reservation = reservations[0]

    return matched_reservation
async def _match_by_guest_details_hashed(
    self,
    guest_first_name: str | None,
    guest_last_name: str | None,
    guest_email: str | None,
    session: AsyncSession | None = None,
) -> HashedCustomer | None:
    """Find a HashedCustomer by hashed email or hashed full name.

    Matching is done against HashedCustomer directly, without going through
    Reservation. A candidate qualifies if its hashed email equals
    ``guest_email``, or if both hashed given name and surname equal the
    supplied names. Among several candidates, the first one matching by
    email wins; otherwise the first candidate is returned with a warning.

    Args:
        guest_first_name: Guest first name (pre-hashed)
        guest_last_name: Guest last name (pre-hashed)
        guest_email: Guest email (pre-hashed)
        session: AsyncSession to use. If None, uses self.session.

    Returns:
        Matched HashedCustomer, or None when nothing matches or neither an
        email nor a complete name was supplied.
    """
    if session is None:
        session = self.session

    filters = []
    if guest_email:
        filters.append(HashedCustomer.hashed_email == guest_email)
    if guest_first_name and guest_last_name:
        filters.append(
            (HashedCustomer.hashed_given_name == guest_first_name)
            & (HashedCustomer.hashed_surname == guest_last_name)
        )
    if not filters:
        return None

    # Email OR full-name match.
    stmt = (
        select(HashedCustomer)
        .options(selectinload(HashedCustomer.customer))
        .where(or_(*filters))
    )
    candidates = (await session.execute(stmt)).scalars().all()

    if len(candidates) <= 1:
        return candidates[0] if candidates else None

    if guest_email:
        email_hit = next(
            (c for c in candidates if c.hashed_email == guest_email), None
        )
        if email_hit is not None:
            _LOGGER.debug(
                "Multiple hashed customer matches, preferring email match"
            )
            return email_hit

    _LOGGER.warning(
        "Multiple hashed customer matches found for guest details, using first match"
    )
    return candidates[0]
async def _match_by_guest_details(
    self,
    hotel_id: str | None,
    guest_first_name: str | None,
    guest_last_name: str | None,
    guest_email: str | None,
    session: AsyncSession | None = None,
) -> Reservation | None:
    """Match a reservation by guest name and email, preferring cached data.

    Candidate reservations come from ``self._reservation_cache`` when it has
    been initialized and is non-empty. Otherwise, or when the cache yields no
    reservations for ``hotel_id``, they are loaded from the database. The
    actual comparison is delegated to ``_match_reservations_by_guest_details``.

    Args:
        hotel_id: Hotel code to restrict candidates to. When falsy, all hotels
            are considered.
        guest_first_name: Guest first name
        guest_last_name: Guest last name
        guest_email: Guest email
        session: AsyncSession to use. If None, uses self.session.

    Returns:
        Matched Reservation or None

    """
    if session is None:
        session = self.session

    if self._cache_initialized and self._reservation_cache:
        if hotel_id:
            # A hotel missing from the cache yields no entries, which leads to
            # the database fallback below.
            cached_entries = self._reservation_cache.get(hotel_id, [])
        else:
            cached_entries = [
                entry
                for entries in self._reservation_cache.values()
                for entry in entries
            ]

        all_reservations = []
        for reservation, hashed_customer in cached_entries:
            if reservation.customer:
                # Re-attach the cached hashed customer so it is available to
                # the matcher.
                reservation.customer.hashed_version = hashed_customer
            all_reservations.append(reservation)

        if all_reservations:
            _LOGGER.debug(
                "Using cached reservations for matching (hotel=%s, count=%d)",
                hotel_id,
                len(all_reservations),
            )
            return self._match_reservations_by_guest_details(
                all_reservations,
                guest_first_name,
                guest_last_name,
                guest_email,
            )

    # Fallback: query the database if the cache is not available or empty.
    _LOGGER.debug(
        "Cache unavailable or empty, falling back to database query (hotel=%s)",
        hotel_id,
    )
    query = select(Reservation).options(
        selectinload(Reservation.customer).selectinload(Customer.hashed_version)
    )
    if hotel_id:
        query = query.where(Reservation.hotel_code == hotel_id)

    db_result = await session.execute(query)
    all_reservations = db_result.scalars().all()
    return self._match_reservations_by_guest_details(
        all_reservations, guest_first_name, guest_last_name, guest_email
    )
def _filter_reservations_by_guest_details(
    self,
    reservations: list[Reservation],
    guest_first_name: str | None,
    guest_last_name: str | None,
    guest_email: str | None,
    advertising_partner: str | None,
) -> Reservation | None:
    """Pick one reservation from ``reservations`` using guest details.

    Two passes are made. Each returns the FIRST reservation that qualifies:
    list order decides ties, and uniqueness is not checked.

    1. If any of first name, last name or email is given, a reservation
       qualifies when it has a customer and every supplied field equals the
       customer's corresponding field, compared case-insensitively.
    2. If pass 1 is skipped or finds nothing, a reservation qualifies when its
       ``utm_medium`` equals ``advertising_partner``, compared
       case-insensitively.

    Args:
        reservations: Candidate reservations.
        guest_first_name: Guest first name, compared to ``customer.given_name``.
        guest_last_name: Guest last name, compared to ``customer.surname``.
        guest_email: Guest email, compared to ``customer.email_address``.
        advertising_partner: Partner info (e.g. "Facebook_Mobile_Feed"),
            compared to ``reservation.utm_medium``.

    Returns:
        The first qualifying Reservation, or None.

    """

    def _same_ignoring_case(stored, wanted) -> bool:
        # A missing or empty stored value never matches.
        return bool(stored) and stored.lower() == wanted.lower()

    if guest_first_name or guest_last_name or guest_email:
        for reservation in reservations:
            customer = reservation.customer
            if not customer:
                continue
            if guest_first_name and not _same_ignoring_case(
                customer.given_name, guest_first_name
            ):
                continue
            if guest_last_name and not _same_ignoring_case(
                customer.surname, guest_last_name
            ):
                continue
            if guest_email and not _same_ignoring_case(
                customer.email_address, guest_email
            ):
                continue
            _LOGGER.debug(
                "Found exact match on guest name/email for %s %s",
                guest_first_name,
                guest_last_name,
            )
            return reservation

    if not advertising_partner:
        return None

    for reservation in reservations:
        medium = reservation.utm_medium
        if medium and medium.lower() == advertising_partner.lower():
            _LOGGER.debug(
                "Found match on advertising_partner=%s matching utm_medium",
                advertising_partner,
            )
            return reservation

    return None
async def _extract_unmatched_guests(
    self, session: AsyncSession
) -> dict[str, HashedCustomer | None]:
    """Phase 3b: Match the distinct guests of unmatched conversions to customers.

    Loads every conversion that has a ``guest_id`` but neither a reservation
    nor a customer yet, and keeps one conversion per ``guest_id``. For each
    such guest it looks up a HashedCustomer from the guest's hashed
    name/email via ``_match_by_guest_details_hashed``. Guests with identical
    hashed details share a single lookup, because this phase performs no
    writes.

    NOTE(review): guests are keyed by ``guest_id`` alone, whereas
    ``_check_if_regular`` scopes guests by ``(hotel_id, guest_id)``. If PMS
    guest ids are only unique per hotel, guests from different hotels are
    conflated here. TODO confirm.

    Args:
        session: AsyncSession for database queries

    Returns:
        Dictionary mapping guest_id to the matched HashedCustomer, or to None
        when no match was found. Guests whose conversion has no ``guest``
        row are omitted.

    """
    # Find all conversions that have no reservation/customer match yet.
    result = await session.execute(
        select(Conversion)
        .where(
            (Conversion.reservation_id.is_(None))
            & (Conversion.customer_id.is_(None))
            & (Conversion.guest_id.isnot(None))
        )
        .options(selectinload(Conversion.guest))
    )
    unmatched_conversions = result.scalars().all()

    if not unmatched_conversions:
        _LOGGER.debug("Phase 3b: No unmatched conversions with guests")
        return {}

    # Keep the first conversion seen for each guest_id.
    unique_guests: dict[str, Conversion] = {}
    for conversion in unmatched_conversions:
        unique_guests.setdefault(conversion.guest_id, conversion)

    _LOGGER.info(
        "Phase 3b: Found %d unique guests from %d unmatched conversions",
        len(unique_guests),
        len(unmatched_conversions),
    )

    # (hashed_first_name, hashed_last_name, hashed_email) -> lookup result.
    lookup_cache: dict[
        tuple[str | None, str | None, str | None], HashedCustomer | None
    ] = {}
    guest_to_hashed_customer: dict[str, HashedCustomer | None] = {}
    for guest_id, conversion in unique_guests.items():
        conversion_guest = conversion.guest
        if not conversion_guest:
            continue

        details = (
            conversion_guest.hashed_first_name,
            conversion_guest.hashed_last_name,
            conversion_guest.hashed_email,
        )
        if details not in lookup_cache:
            lookup_cache[details] = await self._match_by_guest_details_hashed(
                *details, session
            )
        matched_hashed_customer = lookup_cache[details]
        guest_to_hashed_customer[guest_id] = matched_hashed_customer

        if matched_hashed_customer:
            _LOGGER.debug(
                "Phase 3b: Matched guest %s to hashed_customer %d",
                guest_id,
                matched_hashed_customer.id,
            )
        else:
            _LOGGER.debug("Phase 3b: No match found for guest %s", guest_id)

    return guest_to_hashed_customer
async def _link_matched_guests_to_reservations(
    self,
    guest_to_hashed_customer: dict[str, HashedCustomer | None],
    session: AsyncSession,
    stats: dict[str, int],
) -> None:
    """Phase 3c: Attach conversions of matched guests to reservations/customers.

    This runs for every guest that Phase 3b matched to a HashedCustomer with a
    ``customer_id``. It loads that guest's conversions that still have
    neither a reservation nor a customer. Each conversion is linked to a
    reservation of the customer when ``_check_if_attributable`` finds one
    with matching dates; otherwise it is linked to the customer and hashed
    customer only. Either way the link is recorded as a guest match
    (``guest_matched=True``, ``directly_attributable=False``).

    Args:
        guest_to_hashed_customer: Mapping from guest_id to matched
            HashedCustomer, or None for guests without a match (skipped).
        session: AsyncSession for database queries
        stats: Shared stats dictionary. ``matched_to_reservation`` or
            ``matched_to_customer`` is incremented once per linked conversion.

    """
    for guest_id, matched_hashed_customer in guest_to_hashed_customer.items():
        if not matched_hashed_customer or not matched_hashed_customer.customer_id:
            continue
        customer_id = matched_hashed_customer.customer_id

        # Find all still-unmatched conversions from this guest.
        result = await session.execute(
            select(Conversion)
            .where(
                (Conversion.guest_id == guest_id)
                & (Conversion.reservation_id.is_(None))
                & (Conversion.customer_id.is_(None))
            )
            .options(selectinload(Conversion.conversion_rooms))
        )
        conversions = result.scalars().all()
        if not conversions:
            continue

        _LOGGER.debug(
            "Phase 3c: Processing %d conversions for guest %s (customer_id=%d)",
            len(conversions),
            guest_id,
            customer_id,
        )

        for conversion in conversions:
            matched_reservation, is_attributable = await self._check_if_attributable(
                customer_id, conversion, session
            )

            # Common to both outcomes: this is a guest-detail match, not a
            # direct (ID-based) attribution.
            conversion.customer_id = customer_id
            conversion.hashed_customer_id = matched_hashed_customer.id
            conversion.directly_attributable = False
            conversion.guest_matched = True
            conversion.updated_at = datetime.now()

            if matched_reservation and is_attributable:
                conversion.reservation_id = matched_reservation.id
                stats["matched_to_reservation"] += 1
                _LOGGER.info(
                    "Phase 3c: Linked conversion (pms_id=%s) to reservation %d via guest matching",
                    conversion.pms_reservation_id,
                    matched_reservation.id,
                )
            else:
                # No date-matching reservation: link to the customer only.
                stats["matched_to_customer"] += 1
async def _match_conversions_from_db_sequential(
    self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None:
    """Run Phase 3a one conversion at a time, then Phases 3b/3c.

    Phase 3a (ID-based matching) is scheduled through a TaskGroup but gated by
    a one-slot semaphore, so conversions are matched one after the other.
    Afterwards the guest-detail phases run once in a single transaction:
    3b collects and matches the unique guests of still-unmatched conversions,
    and 3c links their conversions. A failure in 3b/3c is logged and rolled
    back rather than raised.

    When ``self.session_maker`` is set, a fresh session is created for 3b/3c
    and closed afterwards; otherwise ``self.session`` is reused.
    """
    one_at_a_time = asyncio.Semaphore(1)
    async with asyncio.TaskGroup() as task_group:
        for reservation_id in pms_reservation_ids:
            task_group.create_task(
                self._match_conversion_from_db_safe(
                    reservation_id, one_at_a_time, stats
                )
            )

    owns_session = bool(self.session_maker)
    session = (
        await self.session_maker.create_session() if owns_session else self.session
    )

    try:
        guest_matches = await self._extract_unmatched_guests(session)
        if guest_matches:
            await self._link_matched_guests_to_reservations(
                guest_matches, session, stats
            )
        await session.commit()
    except Exception as e:
        await session.rollback()
        _LOGGER.exception("Error in Phase 3b/3c guest matching: %s", e)
    finally:
        if owns_session:
            await session.close()
async def _match_conversions_from_db_concurrent(
    self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None:
    """Run Phase 3a concurrently, then Phases 3b/3c in one dedicated session.

    Up to ``MAX_CONCURRENT_RESERVATIONS`` ID-based matches run at once. Each
    task obtains its own session via ``_match_conversion_from_db_safe``. The
    guest-detail phases (3b: match unique guests, 3c: link their conversions)
    then run once, sequentially, in a new session that is always closed.
    Errors in 3b/3c are logged and rolled back, not raised.

    If no SessionMaker is configured, an error is logged and the work is
    delegated to ``_match_conversions_from_db_sequential``.
    """
    if not self.session_maker:
        _LOGGER.error(
            "Concurrent matching requested but SessionMaker not available. "
            "Falling back to sequential matching."
        )
        await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
        return

    limiter = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
    async with asyncio.TaskGroup() as task_group:
        for reservation_id in pms_reservation_ids:
            task_group.create_task(
                self._match_conversion_from_db_safe(reservation_id, limiter, stats)
            )

    # Guest matching runs once, sequentially, so the same guest is not
    # processed by several tasks.
    guest_session = await self.session_maker.create_session()
    try:
        guest_matches = await self._extract_unmatched_guests(guest_session)
        if guest_matches:
            await self._link_matched_guests_to_reservations(
                guest_matches, guest_session, stats
            )
        await guest_session.commit()
    except Exception as e:
        await guest_session.rollback()
        _LOGGER.exception("Error in Phase 3b/3c guest matching: %s", e)
    finally:
        await guest_session.close()
async def _match_conversion_from_db_safe(
    self,
    pms_reservation_id: str,
    semaphore: asyncio.Semaphore,
    stats: dict[str, int],
) -> None:
    """Match one conversion (Phase 3a) inside its own transaction.

    Holds ``semaphore`` for the whole unit of work. With a SessionMaker, a
    dedicated session is created and closed here; otherwise the shared
    ``self.session`` is used. On success the transaction is committed. Any
    Exception is logged, rolled back and counted in ``stats["errors"]``
    instead of being raised.

    Args:
        pms_reservation_id: PMS reservation ID to match
        semaphore: Limits how many of these units run at the same time
        stats: Shared stats dictionary. It is mutated without a lock; no
            ``await`` separates the read and write of ``stats[...] += 1``,
            so tasks on one event loop cannot interleave it.
    """
    async with semaphore:
        owns_session = bool(self.session_maker)
        session = (
            await self.session_maker.create_session()
            if owns_session
            else self.session
        )
        try:
            await self._match_conversion_using_db_data(
                pms_reservation_id, session, stats
            )
            await session.commit()
            _LOGGER.debug(
                "Successfully matched conversion for reservation %s",
                pms_reservation_id,
            )
        except Exception as e:
            await session.rollback()
            _LOGGER.exception(
                "Error matching conversion for reservation %s: %s",
                pms_reservation_id,
                e,
            )
            stats["errors"] += 1
        finally:
            # Only sessions created here are closed; the shared one is not.
            if owns_session:
                await session.close()
async def _match_conversion_using_db_data(
    self,
    pms_reservation_id: str,
    session: AsyncSession | None = None,
    stats: dict[str, int] | None = None,
) -> None:
    """Phase 3a: Link one stored conversion to a reservation via advertising data.

    Loads the conversion identified by ``pms_reservation_id`` together with
    its ``guest`` and ``conversion_rooms``. When it carries an
    ``advertising_campagne``, ``_match_by_advertising`` is asked for a
    reservation. The guest's stored hashed name/email and the conversion's
    ``advertising_partner`` are passed along as extra hints.

    On a hit:
    - the conversion is linked to the reservation, its customer and (if
      present) the customer's hashed version;
    - it is flagged ``directly_attributable=True`` / ``guest_matched=False``;
    - ``_check_if_regular`` is run;
    - the guest row is pointed at the hashed customer.

    Guest-detail matching is intentionally left to Phases 3b/3c so that each
    guest is processed only once.

    Args:
        pms_reservation_id: PMS reservation ID to match
        session: AsyncSession to use. If None, uses self.session.
        stats: Optional shared stats dictionary. Exactly one of
            ``matched_to_reservation``, ``matched_to_customer``,
            ``matched_to_hashed_customer`` or ``unmatched`` is incremented.
    """
    session = self.session if session is None else session

    lookup = (
        select(Conversion)
        .where(Conversion.pms_reservation_id == pms_reservation_id)
        .options(
            selectinload(Conversion.guest),
            selectinload(Conversion.conversion_rooms),
        )
    )
    conversion = (await session.execute(lookup)).scalar_one_or_none()
    if not conversion:
        # Callers pass IDs of stored conversions; a missing row is only logged.
        _LOGGER.debug(
            "Conversion not found for pms_reservation_id=%s (may have been deleted)",
            pms_reservation_id,
        )
        return

    # The guest row (if any) holds the already-hashed personal data.
    guest = conversion.guest
    if guest:
        first_name_hash = guest.hashed_first_name
        last_name_hash = guest.hashed_last_name
        email_hash = guest.hashed_email
    else:
        first_name_hash = last_name_hash = email_hash = None

    reservation = None
    customer = None
    hashed_customer = None

    if conversion.advertising_campagne:
        reservation = await self._match_by_advertising(
            conversion.advertising_campagne,
            conversion.hotel_id,
            first_name_hash,
            last_name_hash,
            email_hash,
            conversion.advertising_partner,
            session,
        )
        if reservation:
            customer = reservation.customer
            if customer and customer.hashed_version:
                hashed_customer = customer.hashed_version
            _LOGGER.info(
                "Phase 3a: Matched conversion by advertising ID (pms_id=%s, reservation_id=%d)",
                pms_reservation_id,
                reservation.id,
            )

    if reservation or customer or hashed_customer:
        conversion.reservation_id = reservation.id if reservation else None
        conversion.customer_id = customer.id if customer else None
        conversion.hashed_customer_id = (
            hashed_customer.id if hashed_customer else None
        )
        # ID-based matches are always directly attributable.
        conversion.directly_attributable = True
        conversion.guest_matched = False

        if reservation:
            await self._check_if_regular(conversion, reservation, session)

        if guest and hashed_customer:
            guest.hashed_customer_id = hashed_customer.id

        conversion.updated_at = datetime.now()

    if stats is None:
        return

    if reservation:
        bucket = "matched_to_reservation"
    elif customer:
        bucket = "matched_to_customer"
    elif hashed_customer:
        bucket = "matched_to_hashed_customer"
    else:
        bucket = "unmatched"
    stats[bucket] += 1
async def _check_if_regular(
    self,
    conversion: Conversion,
    matched_reservation: Reservation,
    session: AsyncSession,
) -> None:
    """Set ``conversion.guest.is_regular`` for the matched customer.

    The guest counts as regular when their earliest *paying* conversion at
    this hotel has a ``reservation_date`` before the ``start_date`` of the
    customer's earliest Reservation. A paying conversion is one with at least
    one ConversionRoom whose ``total_revenue`` is > 0. If either side cannot
    be found, ``is_regular`` is set to False.

    Rows with a NULL date are excluded from both lookups. Otherwise a NULL
    could be selected as "earliest" and the date comparison would raise
    TypeError. NULLs sort first in ascending order on SQLite, and Postgres
    returns them when no non-NULL date exists.

    Does nothing when the conversion has no guest or the reservation has no
    ``customer_id``.

    Args:
        conversion: The Conversion record being evaluated
        matched_reservation: The matched Reservation record
        session: AsyncSession for database queries

    """
    if not conversion.guest or not matched_reservation.customer_id:
        return

    # Earliest paying conversion for this guest at this hotel
    # (booked reservations from the hotel with actual revenue).
    earliest_paying_conversion_result = await session.execute(
        select(Conversion)
        .join(ConversionRoom, Conversion.id == ConversionRoom.conversion_id)
        .where(
            Conversion.hotel_id == conversion.hotel_id,
            Conversion.guest_id == conversion.guest_id,
            Conversion.reservation_date.isnot(None),
            ConversionRoom.total_revenue.isnot(None),
            ConversionRoom.total_revenue > Decimal(0),
        )
        .order_by(Conversion.reservation_date.asc())
        .limit(1)
    )
    earliest_paying_conversion = earliest_paying_conversion_result.scalar_one_or_none()
    if not earliest_paying_conversion:
        conversion.guest.is_regular = False
        return

    # Earliest reservation (booking request we sent) for this customer.
    earliest_reservation_result = await session.execute(
        select(Reservation)
        .where(
            Reservation.customer_id == matched_reservation.customer_id,
            Reservation.start_date.isnot(None),
        )
        .order_by(Reservation.start_date.asc())
        .limit(1)
    )
    earliest_reservation = earliest_reservation_result.scalar_one_or_none()
    if not earliest_reservation:
        conversion.guest.is_regular = False
        return

    # Regular if the guest already paid before any reservation we tracked.
    is_regular = (
        earliest_paying_conversion.reservation_date < earliest_reservation.start_date
    )
    conversion.guest.is_regular = is_regular
    if is_regular:
        _LOGGER.info(
            "Marking guest as regular: earliest paying conversion date %s is before first reservation %s",
            earliest_paying_conversion.reservation_date,
            earliest_reservation.start_date,
        )
async def _check_if_attributable(
    self,
    matched_customer_id: int,
    conversion: Conversion,
    session: AsyncSession,
    exclude_reservation_id: int | None = None,
    *,
    max_date_offset_days: int = 7,
) -> tuple[Reservation | None, bool]:
    """Find a reservation of the customer whose dates match this conversion.

    A reservation is attributable when at least one of the conversion's rooms
    satisfies both conditions:
    - its arrival date is within ``max_date_offset_days`` of the
      reservation's ``start_date``;
    - its departure date is within the same tolerance of the reservation's
      ``end_date``.

    Rooms or reservations with a missing date are skipped. Candidates are
    checked in ascending ``Reservation.id`` order, so the result is
    deterministic when several reservations qualify.

    Args:
        matched_customer_id: The customer ID to search reservations for
        conversion: The Conversion record being evaluated. Its
            ``conversion_rooms`` are read; the visible caller eager-loads them.
        session: AsyncSession for database queries
        exclude_reservation_id: Reservation ID to exclude from the search
            (if already matched)
        max_date_offset_days: Maximum allowed difference, in days, for both
            arrival vs. start and departure vs. end (default 7).

    Returns:
        ``(reservation, True)`` for the first attributable reservation, or
        ``(None, False)`` if there is none.

    """
    # Without room dates there is nothing to compare against.
    if not conversion.conversion_rooms:
        return None, False

    query = select(Reservation).where(Reservation.customer_id == matched_customer_id)
    if exclude_reservation_id is not None:
        query = query.where(Reservation.id != exclude_reservation_id)
    # Unordered results would make the chosen reservation arbitrary.
    query = query.order_by(Reservation.id)

    db_result = await session.execute(query)
    reservations = db_result.scalars().all()
    if not reservations:
        return None, False

    for reservation in reservations:
        if not (reservation.start_date and reservation.end_date):
            continue
        for room in conversion.conversion_rooms:
            if not (room.arrival_date and room.departure_date):
                continue
            arrival_offset = abs((room.arrival_date - reservation.start_date).days)
            departure_offset = abs((room.departure_date - reservation.end_date).days)
            if (
                arrival_offset <= max_date_offset_days
                and departure_offset <= max_date_offset_days
            ):
                _LOGGER.info(
                    "Found attributable reservation for customer %d: "
                    "room dates %s-%s match reservation dates %s-%s",
                    matched_customer_id,
                    room.arrival_date,
                    room.departure_date,
                    reservation.start_date,
                    reservation.end_date,
                )
                return reservation, True

    return None, False