Files
alpinebits_python/src/alpine_bits_python/conversion_service.py

1619 lines
61 KiB
Python

"""Service for handling conversion data from hotel PMS XML files."""
import asyncio
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from datetime import UTC, date, datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from .db import (
Conversion,
ConversionGuest,
ConversionRoom,
Customer,
Reservation,
SessionMaker,
)
from .logging_config import get_logger
from .schemas import ConversionData, ConversionGuestData
# Module-wide logger used by every class and helper in this module.
_LOGGER = get_logger(__name__)

# Caps how many reservations (and therefore per-task database sessions in
# concurrent mode) are processed at the same time.
MAX_CONCURRENT_RESERVATIONS: int = 10
@dataclass(slots=True)
class ParsedRoomReservation:
    """Typed representation of a single <roomReservation> entry.

    Built by ``ConversionService._parse_room_reservation``. Optional fields are
    None when the corresponding XML attribute is absent (or, for the parsed
    date/int fields, when it cannot be parsed). Field order defines the
    positional order of the generated ``__init__``.
    """

    # Room-level identifier combining the PMS reservation id and room number.
    pms_hotel_reservation_id: str
    # Raw ``roomNumber`` attribute.
    room_number: str | None
    # ``arrival`` attribute parsed as YYYY-MM-DD.
    arrival_date: date | None
    # ``departure`` attribute parsed as YYYY-MM-DD.
    departure_date: date | None
    # Raw ``status`` attribute.
    room_status: str | None
    # Raw ``roomType`` attribute.
    room_type: str | None
    # ``adults`` attribute parsed as int.
    num_adults: int | None
    # Raw ``ratePlanCode`` attribute.
    rate_plan_code: str | None
    # Raw ``connectedRoomType`` attribute.
    connected_room_type: str | None
    # One dict of raw attribute strings per <dailySale> that had any
    # recognised attribute.
    daily_sales: list[dict[str, str]] = field(default_factory=list)
    # Sum of the ``revenueTotal`` values; None when that sum is not positive.
    total_revenue: Decimal | None = None
    # Number of <dailySale> elements, including ones without attributes.
    daily_sales_count: int = 0
@dataclass(slots=True)
class ParsedReservationData:
    """Typed representation of reservation metadata and rooms.

    Built by ``ConversionService._parse_reservation_element`` from one
    <reservation> element. String fields hold raw attribute values (None when
    absent). Field order defines the positional order of the generated
    ``__init__``.
    """

    # Raw ``hotelID`` attribute.
    hotel_id: str | None
    # Required ``id`` attribute, parsed as int.
    pms_reservation_id: int
    # ``id`` of the nested <guest> element, parsed as int; None when missing
    # or invalid.
    guest_id: int | None
    # Raw ``number`` attribute.
    reservation_number: str | None
    # ``date`` attribute parsed as YYYY-MM-DD.
    reservation_date: date | None
    # ``creationTime`` attribute parsed as an ISO timestamp ("Z" accepted).
    creation_time: datetime | None
    # Raw ``type`` attribute.
    reservation_type: str | None
    # Raw ``bookingChannel`` attribute.
    booking_channel: str | None
    # Raw ``advertisingMedium`` attribute.
    advertising_medium: str | None
    # Raw ``advertisingPartner`` attribute.
    advertising_partner: str | None
    # Raw ``advertisingCampagne`` attribute (tracking id used for matching).
    advertising_campagne: str | None
    # Parsed <roomReservation> children; the parser never returns an instance
    # with this list empty.
    room_reservations: list[ParsedRoomReservation] = field(default_factory=list)
class ConversionService:
"""Service for processing and storing conversion/daily sales data.
Supports two modes of operation:
1. Sequential mode: Single AsyncSession passed in, uses sequential processing
2. Concurrent mode: SessionMaker passed in, creates independent sessions per task
"""
def __init__(
self,
session: AsyncSession | SessionMaker | None = None,
hotel_id: str | None = None,
):
"""Initialize the ConversionService.
Args:
session: Can be either:
- AsyncSession: Single session for sequential processing
- SessionMaker: Factory for creating sessions in concurrent mode
- None: Not recommended, but allowed for subclassing
hotel_id: Hotel ID for this conversion service context (from authenticated user)
"""
self.session = None
self.session_maker = None
self.supports_concurrent = False
self.hotel_id = hotel_id
# Cache for reservation and customer data within a single XML processing run
# Maps hotel_code -> list of (reservation, customer) tuples
# This significantly speeds up matching when processing large XML files
# Uses hashed data for matching to preserve privacy
self._reservation_cache: dict[
str | None, list[tuple[Reservation, Customer | None]]
] = {}
self._cache_initialized = False
if isinstance(session, SessionMaker):
self.session_maker = session
self.supports_concurrent = True
_LOGGER.debug(
"ConversionService initialized in concurrent mode with SessionMaker"
)
elif isinstance(session, AsyncSession):
self.session = session
self.supports_concurrent = False
_LOGGER.debug(
"ConversionService initialized in sequential mode with single session"
)
elif session is not None:
raise TypeError(
f"session must be AsyncSession or SessionMaker, got {type(session)}"
)
@staticmethod
def _parse_required_int(value: str | None, field_name: str) -> int:
"""Parse an integer attribute that must be present."""
if value in (None, ""):
raise ValueError(f"{field_name} is required")
try:
return int(value)
except (TypeError, ValueError) as exc:
raise ValueError(
f"{field_name} must be an integer (value={value})"
) from exc
@staticmethod
def _parse_optional_int(value: str | None, field_name: str) -> int | None:
"""Parse an optional integer attribute, logging on failure."""
if value in (None, ""):
return None
try:
return int(value)
except (TypeError, ValueError):
_LOGGER.warning("Invalid %s value: %s", field_name, value)
return None
@staticmethod
def _parse_date(value: str | None, field_name: str) -> date | None:
"""Parse a YYYY-MM-DD formatted date string."""
if not value:
return None
try:
return datetime.strptime(value, "%Y-%m-%d").date()
except ValueError:
_LOGGER.warning("Invalid %s format: %s", field_name, value)
return None
@staticmethod
def _parse_datetime(value: str | None, field_name: str) -> datetime | None:
"""Parse an ISO timestamp string."""
if not value:
return None
try:
normalized = value.replace("Z", "+00:00")
return datetime.fromisoformat(normalized)
except ValueError:
_LOGGER.warning("Invalid %s format: %s", field_name, value)
return None
def _parse_daily_sales(
self, daily_sales_elem: ET.Element | None
) -> tuple[list[dict[str, str]], Decimal | None, int]:
"""Extract the list of sale dictionaries and aggregate revenue information."""
if daily_sales_elem is None:
return [], None, 0
daily_sales_list: list[dict[str, str]] = []
total_revenue = Decimal(0)
sale_count = 0
for daily_sale in daily_sales_elem.findall("dailySale"):
sale_count += 1
sale_data: dict[str, str] = {}
sale_date_str = daily_sale.get("date")
if sale_date_str:
sale_data["date"] = sale_date_str
revenue_total_str = daily_sale.get("revenueTotal")
if revenue_total_str:
sale_data["revenueTotal"] = revenue_total_str
try:
total_revenue += Decimal(revenue_total_str)
except (ValueError, TypeError):
_LOGGER.warning("Invalid revenueTotal value: %s", revenue_total_str)
# Copy the remaining optional revenue buckets if present
for field_name in (
"revenueLogis",
"revenueBoard",
"revenueFB",
"revenueSpa",
"revenueOther",
):
value = daily_sale.get(field_name)
if value:
sale_data[field_name] = value
if sale_data:
daily_sales_list.append(sale_data)
total_revenue_value = total_revenue if total_revenue > 0 else None
return daily_sales_list, total_revenue_value, sale_count
def _parse_room_reservation(
    self, room_elem: ET.Element, pms_reservation_id: int, room_index: int
) -> ParsedRoomReservation:
    """Convert a <roomReservation> element into a ParsedRoomReservation.

    Args:
        room_elem: The ``<roomReservation>`` element.
        pms_reservation_id: Numeric id of the enclosing ``<reservation>``.
        room_index: Zero-based position of this room within
            ``<roomReservations>``.

    Returns:
        The parsed room. Its ``pms_hotel_reservation_id`` is
        ``"<pms_reservation_id>_<roomNumber>"``. When ``roomNumber`` is
        absent, the room at index 0 keeps the historical
        ``"<pms_reservation_id>_None"`` key. Every later unnumbered room gets
        ``"<pms_reservation_id>_None_<room_index>"``, so several unnumbered
        rooms of one reservation no longer collapse onto the same key and
        overwrite or duplicate each other's stored rows.
    """
    arrival_date = self._parse_date(room_elem.get("arrival"), "arrival date")
    departure_date = self._parse_date(room_elem.get("departure"), "departure date")
    num_adults = self._parse_optional_int(room_elem.get("adults"), "adults")

    room_number = room_elem.get("roomNumber")
    if room_number is None:
        _LOGGER.debug(
            "Room reservation %s #%d has no roomNumber",
            pms_reservation_id,
            room_index,
        )
        # Index 0 keeps the legacy "<id>_None" key so already-stored
        # single-room records still match. Later rooms are disambiguated
        # by their position.
        room_key = "None" if room_index == 0 else f"None_{room_index}"
    else:
        room_key = room_number

    daily_sales, total_revenue, sale_count = self._parse_daily_sales(
        room_elem.find("dailySales")
    )

    return ParsedRoomReservation(
        pms_hotel_reservation_id=f"{pms_reservation_id}_{room_key}",
        room_number=room_number,
        arrival_date=arrival_date,
        departure_date=departure_date,
        room_status=room_elem.get("status"),
        room_type=room_elem.get("roomType"),
        num_adults=num_adults,
        rate_plan_code=room_elem.get("ratePlanCode"),
        connected_room_type=room_elem.get("connectedRoomType"),
        daily_sales=daily_sales,
        total_revenue=total_revenue,
        daily_sales_count=sale_count,
    )
def _parse_reservation_element(
    self, reservation_elem: ET.Element
) -> ParsedReservationData | None:
    """Turn a <reservation> element into a ParsedReservationData.

    Returns None (after logging) in three cases:
    * the element has no usable integer ``id``;
    * it has no <roomReservations> container;
    * that container holds no <roomReservation> children.
    """
    raw_id = reservation_elem.get("id")
    try:
        pms_reservation_id = self._parse_required_int(raw_id, "reservation id")
    except ValueError as exc:
        _LOGGER.error(
            "Invalid reservation metadata in reservation element: %s", exc
        )
        return None

    rooms_container = reservation_elem.find("roomReservations")
    if rooms_container is None:
        _LOGGER.debug(
            "No roomReservations found for reservation %s", pms_reservation_id
        )
        return None

    rooms: list[ParsedRoomReservation] = []
    for position, room_elem in enumerate(
        rooms_container.findall("roomReservation")
    ):
        rooms.append(
            self._parse_room_reservation(room_elem, pms_reservation_id, position)
        )
    if not rooms:
        _LOGGER.debug(
            "Reservation %s has no roomReservation entries", pms_reservation_id
        )
        return None

    guest_elem = reservation_elem.find("guest")
    guest_id = (
        None
        if guest_elem is None
        else self._parse_optional_int(guest_elem.get("id"), "guest id")
    )

    attr = reservation_elem.get
    reservation_date = self._parse_date(attr("date"), "reservation date")
    creation_time = self._parse_datetime(attr("creationTime"), "creation time")

    return ParsedReservationData(
        hotel_id=attr("hotelID"),
        pms_reservation_id=pms_reservation_id,
        guest_id=guest_id,
        reservation_number=attr("number"),
        reservation_date=reservation_date,
        creation_time=creation_time,
        reservation_type=attr("type"),
        booking_channel=attr("bookingChannel"),
        advertising_medium=attr("advertisingMedium"),
        advertising_partner=attr("advertisingPartner"),
        advertising_campagne=attr("advertisingCampagne"),
        room_reservations=rooms,
    )
async def _extract_unique_guests_from_xml(
    self, reservations: list
) -> dict[tuple[str, int], ConversionGuestData]:
    """Phase 0: collect one validated guest record per (hotel_id, guest_id).

    Reservations without a <guest> element are skipped. Each guest is
    validated through ``ConversionGuestData``. Guests that fail validation
    are logged and skipped. When the same key appears more than once, the
    last occurrence in the XML wins. All records share one ``first_seen`` /
    ``last_seen`` timestamp taken at the start of the call.

    Args:
        reservations: <reservation> XML elements.

    Returns:
        Mapping of ``(hotel_id, validated guest_id)`` to ConversionGuestData.
    """
    unique_guests: dict[tuple[str, int], ConversionGuestData] = {}
    timestamp = datetime.now(UTC)

    for res_elem in reservations:
        guest = res_elem.find("guest")
        if guest is None:
            _LOGGER.debug(
                "No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)",
                res_elem.get("id"),
            )
            continue

        hotel_id = res_elem.get("hotelID")

        birth_raw = guest.get("dateOfBirth")
        birth_date = None
        if birth_raw:
            try:
                birth_date = datetime.strptime(birth_raw, "%Y-%m-%d").date()
            except ValueError:
                _LOGGER.warning("Invalid birth date format: %s", birth_raw)

        try:
            # Pydantic validation also coerces guest_id to int.
            guest_record = ConversionGuestData(
                hotel_id=hotel_id,
                guest_id=guest.get("id"),
                guest_first_name=guest.get("firstName"),
                guest_last_name=guest.get("lastName"),
                guest_email=guest.get("email"),
                guest_country_code=guest.get("countryCode"),
                guest_birth_date=birth_date,
                first_seen=timestamp,
                last_seen=timestamp,
            )
        except ValueError:
            _LOGGER.exception(
                "Failed to validate guest data for reservation %s",
                res_elem.get("id"),
            )
            continue

        unique_guests[(hotel_id, guest_record.guest_id)] = guest_record

    return unique_guests
async def _bulk_upsert_guests(
    self,
    session: AsyncSession,
    guest_data_by_key: dict[tuple[str, int], ConversionGuestData],
) -> None:
    """Phase 1: insert-or-update every extracted guest via ON CONFLICT.

    Rows are written in chunks of 1000 with PostgreSQL's
    ``INSERT ... ON CONFLICT (hotel_id, guest_id) DO UPDATE``. On conflict,
    the plain and hashed name/email/country/birth-date columns and
    ``last_seen`` are overwritten from the incoming row; ``first_seen`` is
    left untouched. Statements run on ``session`` but are not committed here.

    Args:
        session: AsyncSession to execute the statements on.
        guest_data_by_key: Validated guests keyed by ``(hotel_id, guest_id)``.
            Only the values are written.
    """
    if not guest_data_by_key:
        return

    chunk_size = 1000
    # Columns refreshed from the incoming row when (hotel_id, guest_id) exists.
    refreshed_columns = (
        "guest_first_name",
        "guest_last_name",
        "guest_email",
        "guest_country_code",
        "guest_birth_date",
        "hashed_first_name",
        "hashed_last_name",
        "hashed_email",
        "hashed_country_code",
        "hashed_birth_date",
        "last_seen",
    )
    guests = list(guest_data_by_key.values())
    guest_total = len(guests)

    for offset in range(0, guest_total, chunk_size):
        chunk = guests[offset : offset + chunk_size]
        insert_stmt = pg_insert(ConversionGuest).values(
            [guest.model_dump() for guest in chunk]
        )
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["hotel_id", "guest_id"],
            set_={
                column: getattr(insert_stmt.excluded, column)
                for column in refreshed_columns
            },
        )
        await session.execute(upsert_stmt)
        _LOGGER.debug(
            "Phase 1: Upserted batch %d-%d of %d guests",
            offset + 1,
            offset + len(chunk),
            guest_total,
        )
async def process_conversion_xml(
    self, xml_content: str, *, run_full_guest_matching: bool = False
) -> dict[str, Any]:
    """Parse conversion XML and save daily sales data to the database.

    Work happens in this order:

    * ``<Deletedreservation>`` entries: matching conversions are deleted,
      with one commit per entry. Failures are logged and counted in
      ``stats["errors"]``.
    * Phase 0: extract and deduplicate guest data from ``<reservation>``s.
    * Phase 1: bulk upsert those guests. On failure the run stops and the
      statistics gathered so far are returned.
    * Phase 2: create/update conversion and room records.
    * Phase 3: match the successfully written conversions to
      reservations/customers, then run guest matching. Both steps are
      skipped when Phase 2 produced no conversions.

    Args:
        xml_content: XML string containing reservation and daily sales data.
        run_full_guest_matching: If True, run guest-based matching for all
            unmatched conversions of this hotel after processing the XML. When
            False (default), guest matching only runs for the conversions
            touched in this XML.

    Returns:
        Dictionary with processing statistics.

    Raises:
        ValueError: If ``xml_content`` is not well-formed XML.
    """
    # NOTE(review): xml_content arrives from an external PMS upload. The stdlib
    # ElementTree parser is not hardened against malicious XML (e.g. entity
    # expansion). Consider defusedxml if the sender is not fully trusted.
    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError as e:
        _LOGGER.error("Failed to parse conversion XML: %s", e)
        raise ValueError(f"Invalid XML content: {e}") from e

    # Initialize cache for this XML processing run
    await self._load_reservation_cache()

    stats = {
        "total_reservations": 0,
        "deleted_reservations": 0,
        "total_daily_sales": 0,
        "matched_to_reservation": 0,
        "matched_to_customer": 0,
        "unmatched": 0,
        "errors": 0,
    }

    # Process deleted reservations. A session is only opened when there is
    # work. A SessionMaker-created session is always closed, even on failure.
    deleted_reservations = root.findall("Deletedreservation")
    if deleted_reservations:
        if self.session_maker:
            session = await self.session_maker.create_session()
        else:
            session = self.session
        try:
            for deleted_res in deleted_reservations:
                stats["deleted_reservations"] += 1
                pms_reservation_id_str = deleted_res.get("ID")
                if not pms_reservation_id_str:
                    _LOGGER.warning(
                        "Deleted reservation missing ID attribute, skipping"
                    )
                    continue
                try:
                    # int() may raise. The handler logs the raw string, so it
                    # never refers to an unbound or stale parsed ID.
                    pms_reservation_id = int(pms_reservation_id_str)
                    await self._handle_deleted_reservation(
                        pms_reservation_id, session
                    )
                    await session.commit()
                except Exception as e:
                    await session.rollback()
                    _LOGGER.exception(
                        "Error deleting reservation %s: %s",
                        pms_reservation_id_str,
                        e,
                    )
                    stats["errors"] += 1
        finally:
            if self.session_maker:
                await session.close()

    # Process active reservations in four phases:
    # Phase 0: Extract and deduplicate all guest data from XML
    # Phase 1: Bulk upsert all unique guests to DB (no race conditions)
    # Phase 2: Create/update all conversion records (ConversionGuests already exist)
    # Phase 3: Match them to existing reservations/customers
    reservations = root.findall("reservation")
    stats["total_reservations"] = len(reservations)
    if not reservations:
        return stats

    _LOGGER.info("Processing %d reservations in xml", len(reservations))

    # Phase 0: Extract and deduplicate all guest data from XML
    _LOGGER.debug("Phase 0: Extracting and deduplicating guest data from XML")
    guest_data_by_key = await self._extract_unique_guests_from_xml(reservations)
    _LOGGER.info(
        "Phase 0: Extracted %d unique guests from %d reservations",
        len(guest_data_by_key),
        len(reservations),
    )

    # Phase 1: Bulk upsert all unique guests to database
    if guest_data_by_key:
        _LOGGER.debug(
            "Phase 1: Bulk upserting %d unique guests to database",
            len(guest_data_by_key),
        )
        if self.session_maker:
            session = await self.session_maker.create_session()
        else:
            session = self.session
        try:
            await self._bulk_upsert_guests(session, guest_data_by_key)
            await session.commit()
            _LOGGER.info(
                "Phase 1: Successfully upserted %d guests", len(guest_data_by_key)
            )
        except Exception as e:
            await session.rollback()
            _LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e)
            stats["errors"] += len(guest_data_by_key)
            return stats
        finally:
            if self.session_maker:
                await session.close()

    # Phase 2: Create/update all conversions (no matching, ConversionGuests already exist)
    # Returns list of successfully created pms_reservation_ids
    _LOGGER.debug("Phase 2: Creating/updating conversions")
    if self.supports_concurrent:
        pms_reservation_ids = await self._process_reservations_concurrent(
            reservations, stats
        )
    else:
        pms_reservation_ids = await self._process_reservations_sequential(
            reservations, stats
        )

    _LOGGER.debug(
        "Phase 3: Found %d successfully created conversions out of %d total reservations",
        len(pms_reservation_ids),
        len(reservations),
    )

    # Phase 3: Match all conversions using database data only via advertising IDs
    if pms_reservation_ids:
        _LOGGER.debug("Phase 3: Matching conversions to reservations/customers")
        if self.supports_concurrent:
            await self._match_conversions_from_db_concurrent(
                pms_reservation_ids, stats
            )
        else:
            await self._match_conversions_from_db_sequential(
                pms_reservation_ids, stats
            )

        # Guest matching (optional full scan or limited to current XML)
        await self._run_guest_matching(
            pms_reservation_ids, stats, run_full_guest_matching
        )

    return stats
async def _load_reservation_cache(self) -> None:
    """Load reservations and their customers into an in-memory cache.

    Runs at most once per service instance. After the first attempt,
    successful or not, ``_cache_initialized`` is True and later calls return
    immediately. When ``self.hotel_id`` is set, only that hotel's
    reservations are loaded.

    The cache maps ``Reservation.hotel_id`` to a list of
    ``(reservation, reservation.customer)`` tuples. ``Reservation.customer``
    is eager-loaded with ``selectinload``, so the cached customer is available
    without lazy loading later.

    If loading fails, the error is logged with its traceback and the cache is
    left empty. In sequential mode (shared session), the session is also
    rolled back so it remains usable for the rest of the import.
    """
    if self._cache_initialized:
        _LOGGER.debug("Reservation cache already initialized, skipping reload")
        return

    # Concurrent mode: a private session that must be closed here.
    # Sequential mode: the shared session, which must stay open.
    if self.session_maker:
        session = await self.session_maker.create_session()
        close_session = True
    else:
        session = self.session
        close_session = False

    if session is None:
        _LOGGER.warning("No session available for cache loading")
        return

    try:
        query = select(Reservation).options(selectinload(Reservation.customer))
        if self.hotel_id:
            query = query.filter(Reservation.hotel_id == self.hotel_id)
        result = await session.execute(query)
        reservations = result.scalars().all()
        _LOGGER.info("Loaded %d reservations into cache", len(reservations))

        # Group by hotel code for efficient lookup.
        for reservation in reservations:
            self._reservation_cache.setdefault(reservation.hotel_id, []).append(
                (reservation, reservation.customer)
            )

        self._cache_initialized = True
        _LOGGER.info(
            "Reservation cache initialized with %d hotel codes",
            len(self._reservation_cache),
        )
    except Exception as e:
        _LOGGER.exception("Failed to load reservation cache: %s", e)
        # Never keep a half-built cache. Matching is expected to fall back to
        # direct queries when the cache is empty.
        self._reservation_cache.clear()
        if not close_session:
            # A failed statement leaves the shared transaction aborted.
            # PostgreSQL rejects every further statement until a rollback.
            try:
                await session.rollback()
            except Exception:
                _LOGGER.exception("Rollback after cache load failure failed")
        self._cache_initialized = True
    finally:
        if close_session:
            await session.close()
async def _process_reservations_sequential(
    self, reservations: list, stats: dict[str, int]
) -> list[int]:
    """Run Phase 2 for every reservation, one at a time.

    Each reservation becomes a task in an ``asyncio.TaskGroup``, but all
    tasks share a single-slot semaphore, so their guarded work never
    overlaps.

    Returns:
        The non-None results (pms_reservation_ids) of the tasks, in input
        order.

    Raises:
        Exception: The first exception of the task group, if any task raised.
    """
    one_at_a_time = asyncio.Semaphore(1)
    launched: list[asyncio.Task] = []
    try:
        async with asyncio.TaskGroup() as group:
            for elem in reservations:
                job = self._process_reservation_safe(elem, one_at_a_time, stats)
                launched.append(group.create_task(job))
    except ExceptionGroup as failure:
        # Fail fast: stop anything still running, then surface the first
        # underlying error instead of the group wrapper.
        for leftover in [t for t in launched if not t.done()]:
            leftover.cancel()
        if failure.exceptions:
            raise failure.exceptions[0] from failure
        raise

    return [pms_id for t in launched if (pms_id := t.result()) is not None]
async def _process_reservations_concurrent(
    self, reservations: list, stats: dict[str, int]
) -> list[int]:
    """Run Phase 2 for all reservations in parallel, bounded by a semaphore.

    At most ``MAX_CONCURRENT_RESERVATIONS`` reservations are in their guarded
    section at once. Each task obtains its own database session from the
    SessionMaker. Without a SessionMaker, this logs an error and delegates to
    ``_process_reservations_sequential``.

    Returns:
        The non-None results (pms_reservation_ids) of the tasks, in input
        order.

    Raises:
        Exception: The first exception of the task group, if any task raised.
    """
    if not self.session_maker:
        _LOGGER.error(
            "Concurrent processing requested but SessionMaker not available. "
            "Falling back to sequential processing."
        )
        return await self._process_reservations_sequential(reservations, stats)

    limiter = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
    launched: list[asyncio.Task] = []
    try:
        async with asyncio.TaskGroup() as group:
            for elem in reservations:
                job = self._process_reservation_safe(elem, limiter, stats)
                launched.append(group.create_task(job))
    except ExceptionGroup as failure:
        # Fail fast: stop anything still running, then surface the first
        # underlying error instead of the group wrapper.
        still_running = [t for t in launched if not t.done()]
        for leftover in still_running:
            leftover.cancel()
        if failure.exceptions:
            raise failure.exceptions[0] from failure
        raise

    collected: list[int] = []
    for finished in launched:
        outcome = finished.result()
        if outcome is not None:
            collected.append(outcome)
    return collected
async def _process_reservation_safe(
    self,
    reservation_elem: Any,
    semaphore: asyncio.Semaphore,
    stats: dict[str, int],
) -> int | None:
    """Create/update one reservation's conversion in its own transaction.

    This is the Phase 2 worker: it creates or updates records only and does
    no matching. With a SessionMaker, a fresh session is created (and closed)
    per call. Otherwise the shared ``self.session`` is used.

    Per-reservation problems never propagate. A missing or non-numeric ``id``
    attribute, or any exception while processing, is logged, counted in
    ``stats["errors"]`` and turned into a ``None`` result. This prevents one
    bad reservation from making the surrounding TaskGroup cancel all others.

    Args:
        reservation_elem: XML element for the reservation.
        semaphore: Semaphore limiting how many reservations run at once.
        stats: Shared stats dictionary. Updates are plain ``+=`` with no
            ``await`` in between, so concurrent asyncio tasks cannot
            interleave them.

    Returns:
        The integer pms_reservation_id if the conversion was committed,
        otherwise None.
    """
    raw_id = reservation_elem.get("id")
    try:
        pms_reservation_id = int(raw_id)
    except (TypeError, ValueError):
        # Handled locally: an exception escaping this task would make the
        # TaskGroup cancel every other reservation in the file.
        _LOGGER.error("Skipping reservation with invalid id: %r", raw_id)
        stats["errors"] += 1
        return None

    async with semaphore:
        # In concurrent mode, create a new session for this task
        if self.session_maker:
            session = await self.session_maker.create_session()
        else:
            session = self.session

        try:
            reservation_stats = await self._create_or_update_conversion(
                reservation_elem, session
            )
            await session.commit()
            # Count daily sales only once they are actually persisted.
            stats["total_daily_sales"] += reservation_stats["daily_sales_count"]

            _LOGGER.debug(
                "Successfully created/updated conversion for reservation %s",
                pms_reservation_id,
            )
            return pms_reservation_id

        except Exception as e:
            # A failing rollback must not escape the task either.
            try:
                await session.rollback()
            except Exception:
                _LOGGER.exception(
                    "Rollback failed for reservation %s", pms_reservation_id
                )
            _LOGGER.exception(
                "Error processing reservation %s: %s",
                pms_reservation_id,
                e,
            )
            stats["errors"] += 1
            return None

        finally:
            # Close the session if it was created by SessionMaker
            if self.session_maker:
                await session.close()
async def _handle_deleted_reservation(
    self, pms_reservation_id: int, session: AsyncSession
):
    """Delete every Conversion of this service's hotel with the given PMS id.

    Conversions are removed via ``session.delete``, not flagged as deleted.
    Nothing is committed here. Without ``self.hotel_id`` the call only logs an
    error and does nothing.

    Args:
        pms_reservation_id: PMS reservation ID whose conversions are removed.
        session: AsyncSession to run the lookup and deletions on.
    """
    if not self.hotel_id:
        _LOGGER.error(
            "Cannot delete reservation: hotel_id not set in ConversionService"
        )
        return

    _LOGGER.info(
        "Processing deleted reservation: Hotel %s, PMS ID %s",
        self.hotel_id,
        pms_reservation_id,
    )

    lookup = select(Conversion).where(
        Conversion.hotel_id == self.hotel_id,
        Conversion.pms_reservation_id == pms_reservation_id,
    )
    matches = (await session.execute(lookup)).scalars().all()
    if not matches:
        return

    for doomed in matches:
        await session.delete(doomed)

    _LOGGER.info(
        "Deleted %d conversion records for hotel %s, PMS reservation %s",
        len(matches),
        self.hotel_id,
        pms_reservation_id,
    )
async def _create_or_update_conversion(
    self, reservation_elem: ET.Element, session: AsyncSession | None = None
) -> dict[str, int]:
    """Create or update the conversion rows for one <reservation> element.

    Parses the element with ``_parse_reservation_element``. It then
    creates or updates the ``Conversion`` row identified by
    ``(hotel_id, pms_reservation_id)`` and syncs that conversion's
    ``ConversionRoom`` rows with the rooms in the XML:
    * existing rooms (same ``pms_hotel_reservation_id``) are updated in place;
    * new rooms are added;
    * stored rooms missing from the XML are deleted.

    No matching to reservations/customers happens here. The method never
    commits; it only flushes (to obtain ``conversion.id``).

    Args:
        reservation_elem: XML ``<reservation>`` element to process.
        session: AsyncSession to use. If None, ``self.session`` is used.
            In concurrent mode each task passes its own session.

    Returns:
        ``{"daily_sales_count": n}``, where ``n`` is the number of
        ``<dailySale>`` elements across all rooms. If the element cannot be
        parsed, ``n`` is 0 and nothing is written.
    """
    if session is None:
        session = self.session

    stats = {
        "daily_sales_count": 0,
    }

    parsed_reservation = self._parse_reservation_element(reservation_elem)
    if not parsed_reservation:
        return stats

    hotel_id = parsed_reservation.hotel_id
    pms_reservation_id = parsed_reservation.pms_reservation_id

    # ConversionGuests have already been bulk-upserted in Phase 1,
    # so we can safely create/update conversions now

    # Check if conversion already exists (upsert logic).
    # scalar_one_or_none() raises if more than one row matches this pair.
    existing_result = await session.execute(
        select(Conversion).where(
            Conversion.hotel_id == hotel_id,
            Conversion.pms_reservation_id == pms_reservation_id,
        )
    )
    existing_conversion = existing_result.scalar_one_or_none()

    if existing_conversion:
        # Update existing conversion - only update reservation metadata and advertising data
        # Guest info is stored in ConversionGuest table, not here
        # Don't clear reservation/customer links (matching logic will update if needed)
        # NOTE(review): guest_id is not refreshed here, so a changed guest on an
        # existing PMS reservation keeps its old ConversionGuest link. Confirm
        # this is intended.
        existing_conversion.reservation_number = (
            parsed_reservation.reservation_number
        )
        existing_conversion.reservation_date = parsed_reservation.reservation_date
        existing_conversion.creation_time = parsed_reservation.creation_time
        existing_conversion.reservation_type = parsed_reservation.reservation_type
        existing_conversion.booking_channel = parsed_reservation.booking_channel
        existing_conversion.advertising_medium = (
            parsed_reservation.advertising_medium
        )
        existing_conversion.advertising_partner = (
            parsed_reservation.advertising_partner
        )
        existing_conversion.advertising_campagne = (
            parsed_reservation.advertising_campagne
        )
        # NOTE(review): naive local time. Guest timestamps in this module use
        # datetime.now(UTC). Confirm the column's timezone expectations.
        existing_conversion.updated_at = datetime.now()
        conversion = existing_conversion
        _LOGGER.info(
            "Updated conversion %s (pms_id=%s)",
            conversion.id,
            pms_reservation_id,
        )
    else:
        # Create new conversion entry (without matching - will be done later)
        # Note: Guest information (first_name, last_name, email, etc) is stored in ConversionGuest table
        conversion_data = ConversionData(
            # Links to existing entities (nullable, will be filled in after matching)
            # Reservation metadata
            hotel_id=hotel_id,
            guest_id=parsed_reservation.guest_id,  # Links to ConversionGuest
            pms_reservation_id=pms_reservation_id,
            reservation_number=parsed_reservation.reservation_number,
            reservation_date=parsed_reservation.reservation_date,
            creation_time=parsed_reservation.creation_time,
            reservation_type=parsed_reservation.reservation_type,
            booking_channel=parsed_reservation.booking_channel,
            # Advertising data
            advertising_medium=parsed_reservation.advertising_medium,
            advertising_partner=parsed_reservation.advertising_partner,
            advertising_campagne=parsed_reservation.advertising_campagne,
            # Metadata
        )
        conversion = Conversion(**conversion_data.model_dump())
        session.add(conversion)
        _LOGGER.debug(
            "Created conversion (pms_id=%s)",
            pms_reservation_id,
        )

    # Flush to ensure conversion has an ID before creating room reservations
    await session.flush()

    # Fetch ALL existing rooms for this conversion (not just the ones in current XML)
    existing_rooms_result = await session.execute(
        select(ConversionRoom).where(ConversionRoom.conversion_id == conversion.id)
    )
    # NOTE(review): if the DB holds several rooms with the same
    # pms_hotel_reservation_id, only the last one survives in this map. The
    # others are then neither updated nor deleted below.
    existing_rooms = {
        room.pms_hotel_reservation_id: room
        for room in existing_rooms_result.scalars().all()
    }

    # Track which room IDs are present in the current XML
    current_pms_hotel_reservation_ids = set()

    # Process room reservations
    for room_reservation in parsed_reservation.room_reservations:
        current_pms_hotel_reservation_ids.add(
            room_reservation.pms_hotel_reservation_id
        )
        stats["daily_sales_count"] += room_reservation.daily_sales_count

        # Check if room reservation already exists using batch-loaded data
        # (existing_rooms is not updated inside this loop, so two XML rooms
        # sharing an id that is not yet stored both take the "create" branch)
        existing_room_reservation = existing_rooms.get(
            room_reservation.pms_hotel_reservation_id
        )

        if existing_room_reservation:
            # Update existing room reservation with all fields
            existing_room_reservation.arrival_date = room_reservation.arrival_date
            existing_room_reservation.departure_date = (
                room_reservation.departure_date
            )
            existing_room_reservation.room_status = room_reservation.room_status
            existing_room_reservation.room_type = room_reservation.room_type
            existing_room_reservation.num_adults = room_reservation.num_adults
            existing_room_reservation.rate_plan_code = (
                room_reservation.rate_plan_code
            )
            existing_room_reservation.connected_room_type = (
                room_reservation.connected_room_type
            )
            # Store None rather than an empty list when there were no sales
            existing_room_reservation.daily_sales = (
                room_reservation.daily_sales
                if room_reservation.daily_sales
                else None
            )
            existing_room_reservation.total_revenue = room_reservation.total_revenue
            existing_room_reservation.updated_at = datetime.now()
            _LOGGER.debug(
                "Updated room reservation %s (pms_id=%s, room=%s)",
                existing_room_reservation.id,
                pms_reservation_id,
                room_reservation.room_number,
            )
        else:
            # Create new room reservation
            room_reservation_record = ConversionRoom(
                conversion_id=conversion.id,
                pms_hotel_reservation_id=room_reservation.pms_hotel_reservation_id,
                arrival_date=room_reservation.arrival_date,
                departure_date=room_reservation.departure_date,
                room_status=room_reservation.room_status,
                room_type=room_reservation.room_type,
                room_number=room_reservation.room_number,
                num_adults=room_reservation.num_adults,
                rate_plan_code=room_reservation.rate_plan_code,
                connected_room_type=room_reservation.connected_room_type,
                daily_sales=(
                    room_reservation.daily_sales
                    if room_reservation.daily_sales
                    else None
                ),
                total_revenue=room_reservation.total_revenue,
                created_at=datetime.now(),
                updated_at=datetime.now(),
            )
            session.add(room_reservation_record)
            _LOGGER.debug(
                "Created room reservation (pms_id=%s, room=%s, adults=%s)",
                pms_reservation_id,
                room_reservation.room_number,
                room_reservation.num_adults,
            )

    # Delete room entries that are no longer present in the current XML
    # This handles cases where a reservation is updated and room numbers change
    rooms_to_delete = [
        room
        for pms_id, room in existing_rooms.items()
        if pms_id not in current_pms_hotel_reservation_ids
    ]

    if rooms_to_delete:
        for room in rooms_to_delete:
            await session.delete(room)
            _LOGGER.debug(
                "Deleted room reservation %s (pms_id=%s, room=%s) - no longer in current XML",
                room.id,
                room.pms_hotel_reservation_id,
                room.room_number,
            )

    return stats
async def _match_by_advertising(
self,
advertising_campagne: str,
hotel_id: str | None,
hashed_first_name: str | None,
hashed_last_name: str | None,
hashed_email: str | None,
session: AsyncSession | None = None,
) -> Reservation | None:
"""Match reservation by advertising tracking data (fbclid/gclid/md5_unique_id).
Args:
advertising_campagne: Tracking ID from PMS (typically md5_unique_id)
hotel_id: Hotel ID for filtering
hashed_first_name: Guest first name (hashed) for disambiguation
hashed_last_name: Guest last name (hashed) for disambiguation
hashed_email: Guest email (hashed) for disambiguation
session: AsyncSession to use. If None, uses self.session.
Returns:
Matched Reservation or None
"""
if session is None:
session = self.session
if not advertising_campagne:
return None
base_query = select(Reservation).options(selectinload(Reservation.customer))
if hotel_id:
base_query = base_query.where(Reservation.hotel_id == hotel_id)
# Normal flow: md5 hash matches exactly
is_md5_lookup = len(advertising_campagne or "") == 32
if is_md5_lookup:
md5_query = base_query.where(
Reservation.md5_unique_id == advertising_campagne
)
md5_result = await session.execute(md5_query)
md5_matches = md5_result.scalars().all()
if md5_matches:
return self._select_reservation_by_guest_hashes(
md5_matches,
hashed_first_name,
hashed_last_name,
hashed_email,
)
# Fallback: advertising ids (fbclid/gclid) are truncated, so only match prefix
like_pattern = f"{advertising_campagne}%"
advertising_query = base_query.where(
or_(
Reservation.fbclid.like(like_pattern),
Reservation.gclid.like(like_pattern),
)
)
advertising_result = await session.execute(advertising_query)
reservations = advertising_result.scalars().all()
if not reservations:
return None
if len(reservations) > 1:
_LOGGER.info(
(
"Ambiguous advertising match for %s (hotel=%s, candidates=%d). "
"Using hashed guest data to deduplicate."
),
advertising_campagne,
hotel_id,
len(reservations),
)
return self._select_reservation_by_guest_hashes(
reservations,
hashed_first_name,
hashed_last_name,
hashed_email,
)
def _select_reservation_by_guest_hashes(
self,
reservations: list[Reservation],
hashed_first_name: str | None,
hashed_last_name: str | None,
hashed_email: str | None,
) -> Reservation | None:
"""Select the best matching reservation using hashed guest info."""
if not reservations:
return None
def _matches_email(reservation: Reservation) -> bool:
return (
hashed_email is not None
and reservation.customer is not None
and reservation.customer.hashed_email == hashed_email
)
def _matches_full_name(reservation: Reservation) -> bool:
return (
hashed_first_name is not None
and hashed_last_name is not None
and reservation.customer is not None
and reservation.customer.hashed_given_name == hashed_first_name
and reservation.customer.hashed_surname == hashed_last_name
)
email_matches = [res for res in reservations if _matches_email(res)]
if email_matches:
if len(email_matches) > 1:
_LOGGER.warning(
"Multiple reservations matched hashed email; using first candidate"
)
return email_matches[0]
name_matches = [res for res in reservations if _matches_full_name(res)]
if name_matches:
if len(name_matches) > 1:
_LOGGER.warning(
"Multiple reservations matched hashed name; using first candidate"
)
return name_matches[0]
if len(reservations) > 1:
_LOGGER.warning(
"Unable to disambiguate %d reservations without hashed guest data; using first match",
len(reservations),
)
return reservations[0]
async def _match_conversions_from_db_sequential(
self, pms_reservation_ids: list[int], stats: dict[str, int]
) -> None:
"""Match conversions sequentially using database data only."""
semaphore = asyncio.Semaphore(1) # Process one at a time
async with asyncio.TaskGroup() as tg:
for pms_id in pms_reservation_ids:
tg.create_task(
self._match_conversion_from_db_safe(pms_id, semaphore, stats)
)
async def _match_conversions_from_db_concurrent(
self, pms_reservation_ids: list[int], stats: dict[str, int]
) -> None:
"""Match conversions concurrently using database data only."""
if not self.session_maker:
_LOGGER.error(
"Concurrent matching requested but SessionMaker not available. "
"Falling back to sequential matching."
)
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
return
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
async with asyncio.TaskGroup() as tg:
for pms_id in pms_reservation_ids:
tg.create_task(
self._match_conversion_from_db_safe(pms_id, semaphore, stats)
)
async def _match_conversion_from_db_safe(
self,
pms_reservation_id: int,
semaphore: asyncio.Semaphore,
stats: dict[str, int],
) -> None:
"""Phase 2: Safely match a conversion using only database data with transaction management.
In concurrent mode, creates its own session from SessionMaker.
In sequential mode, uses the shared session.
Args:
pms_reservation_id: PMS reservation ID to match
semaphore: Semaphore to limit concurrent operations
stats: Shared stats dictionary (thread-safe due to GIL)
"""
async with semaphore:
# In concurrent mode, create a new session for this task
if self.session_maker:
session = await self.session_maker.create_session()
else:
session = self.session
try:
# Phase 2: Match conversion using only database data
await self._match_conversion_using_db_data(
pms_reservation_id, session, stats
)
# Commit this task's transaction
await session.commit()
_LOGGER.debug(
"Successfully matched conversion for reservation %s",
pms_reservation_id,
)
except Exception as e:
# Rollback this task's transaction
await session.rollback()
_LOGGER.exception(
"Error matching conversion for reservation %s: %s",
pms_reservation_id,
e,
)
stats["errors"] += 1
finally:
# Close the session if it was created by SessionMaker
if self.session_maker:
await session.close()
async def _match_conversion_using_db_data(
self,
pms_reservation_id: int,
session: AsyncSession | None = None,
stats: dict[str, int] | None = None,
) -> None:
"""Match a conversion to an existing reservation using tracking IDs.
Uses the stored advertising tracking ID (md5_unique_id/fbclid/gclid) plus hashed
guest details for deduplication when multiple reservations share the same tracking ID.
Args:
pms_reservation_id: PMS reservation ID to match
session: AsyncSession to use
stats: Shared stats dictionary to update (optional)
"""
if session is None:
session = self.session
if not self.hotel_id:
_LOGGER.error(
"Cannot match conversion: hotel_id not set in ConversionService"
)
return
# Get the conversion from the database with related data
result = await session.execute(
select(Conversion)
.where(
Conversion.hotel_id == self.hotel_id,
Conversion.pms_reservation_id == pms_reservation_id,
)
.options(
selectinload(Conversion.guest),
selectinload(Conversion.conversion_rooms),
)
)
conversion = result.scalar_one_or_none()
if not conversion:
# This should be extremely rare since we filtered in process_conversion_xml
_LOGGER.debug(
"Conversion not found for pms_reservation_id=%s (may have been deleted)",
pms_reservation_id,
)
return
# Get conversion_guest if it exists (has the hashed data)
conversion_guest = conversion.guest
# Extract hashed and raw data from conversion_guest
hashed_first_name = None
hashed_last_name = None
hashed_email = None
if conversion_guest:
hashed_first_name = conversion_guest.hashed_first_name
hashed_last_name = conversion_guest.hashed_last_name
hashed_email = conversion_guest.hashed_email
matched_reservation = None
matched_customer = None
if conversion.advertising_campagne:
matched_reservation = await self._match_by_advertising(
conversion.advertising_campagne,
conversion.hotel_id,
hashed_first_name,
hashed_last_name,
hashed_email,
session,
)
if matched_reservation:
matched_customer = matched_reservation.customer
_LOGGER.info(
"Matched conversion by advertising ID (pms_id=%s, reservation_id=%d)",
pms_reservation_id,
matched_reservation.id,
)
# Update the conversion with matched entities if found
if matched_reservation:
conversion.reservation_id = matched_reservation.id
conversion.customer_id = matched_customer.id if matched_customer else None
# ID-based matches are always directly attributable
conversion.directly_attributable = True
conversion.guest_matched = False
conversion.updated_at = datetime.now()
# Update stats if provided
if stats is not None:
if matched_reservation:
stats["matched_to_reservation"] += 1
else:
stats["unmatched"] += 1
async def _run_guest_matching(
self,
pms_reservation_ids: list[int],
stats: dict[str, int],
run_full_guest_matching: bool,
) -> None:
"""Run guest-detail matching for conversions.
Args:
pms_reservation_ids: Reservation IDs processed in the current XML run
stats: Shared stats dictionary
run_full_guest_matching: If True, scan all guest conversions (including those already matched)
"""
if not self.hotel_id:
_LOGGER.warning("Guest matching skipped: hotel_id not set")
return
if not run_full_guest_matching and not pms_reservation_ids:
return
if self.session_maker:
session = await self.session_maker.create_session()
close_session = True
else:
session = self.session
close_session = False
if not session:
_LOGGER.warning("Guest matching skipped: no active session")
return
try:
base_conditions = [
Conversion.hotel_id == self.hotel_id,
Conversion.guest_id.isnot(None),
]
mode_label = "full-scan" if run_full_guest_matching else "new-only"
query = select(Conversion).where(*base_conditions).options(
selectinload(Conversion.guest),
selectinload(Conversion.conversion_rooms),
selectinload(Conversion.customer),
)
target_ids: set[int] = set()
if run_full_guest_matching:
_LOGGER.info(
"Guest matching (%s): scanning all conversions with guests for hotel %s",
mode_label,
self.hotel_id,
)
else:
query = query.where(Conversion.reservation_id.is_(None))
if pms_reservation_ids:
target_ids = {
int(pms_id)
for pms_id in pms_reservation_ids
if pms_id is not None
}
if not target_ids:
return
query = query.where(Conversion.pms_reservation_id.in_(target_ids))
_LOGGER.debug(
"Guest matching (%s): scanning %d recent conversions",
mode_label,
len(target_ids),
)
result = await session.execute(query)
conversions = result.scalars().all()
if not conversions:
return
for conversion in conversions:
had_reservation = conversion.reservation_id is not None
had_customer = conversion.customer_id is not None
matched_reservation, matched_customer = await self._apply_guest_matching(
conversion, session, rematch_existing=run_full_guest_matching
)
if matched_reservation and not had_reservation:
stats["matched_to_reservation"] += 1
if stats["unmatched"] > 0:
stats["unmatched"] -= 1
elif matched_customer and not had_customer:
stats["matched_to_customer"] += 1
if stats["unmatched"] > 0:
stats["unmatched"] -= 1
await session.commit()
except Exception as exc:
await session.rollback()
_LOGGER.exception("Guest matching failed: %s", exc)
finally:
if close_session:
await session.close()
async def _apply_guest_matching(
self,
conversion: Conversion,
session: AsyncSession,
*,
rematch_existing: bool,
) -> tuple[Reservation | None, Customer | None]:
"""Apply guest-detail matching to a single conversion."""
guest = conversion.guest
if not guest:
return None, None
existing_customer = conversion.customer
matched_customer = existing_customer
if rematch_existing or matched_customer is None:
candidate = await self._find_customer_for_guest(guest, session)
if candidate is not None:
matched_customer = candidate
elif matched_customer is None:
return None, None
matched_reservation = None
if matched_customer:
matched_reservation = await self._find_reservation_for_customer(
matched_customer.id, conversion, session
)
if matched_customer:
conversion.customer_id = matched_customer.id
conversion.guest_matched = True
if matched_reservation:
conversion.reservation_id = matched_reservation.id
conversion.directly_attributable = True
elif conversion.reservation_id is None:
conversion.directly_attributable = False
conversion.updated_at = datetime.now()
return matched_reservation, matched_customer
async def _find_customer_for_guest(
self, guest: ConversionGuest, session: AsyncSession
) -> Customer | None:
"""Locate the best matching customer for the given guest hashes."""
conditions = []
if guest.hashed_email:
conditions.append(Customer.hashed_email == guest.hashed_email)
if guest.hashed_first_name and guest.hashed_last_name:
conditions.append(
(Customer.hashed_given_name == guest.hashed_first_name)
& (Customer.hashed_surname == guest.hashed_last_name)
)
if not conditions:
return None
query = select(Customer).where(or_(*conditions))
result = await session.execute(query)
candidates = result.scalars().all()
if not candidates:
return None
if len(candidates) == 1:
return candidates[0]
best_customer: Customer | None = None
best_score = -1
tie = False
for candidate in candidates:
score = self._score_guest_customer_match(guest, candidate)
if score > best_score:
best_score = score
best_customer = candidate
tie = False
elif score == best_score:
tie = True
if best_customer and best_score > 0 and not tie:
return best_customer
return candidates[0]
def _score_guest_customer_match(
self, guest: ConversionGuest, customer: Customer
) -> int:
"""Score how well a guest matches a given customer using hashed data."""
score = 0
if guest.hashed_email and customer.hashed_email == guest.hashed_email:
score += 200
if guest.hashed_first_name and guest.hashed_last_name:
if (
customer.hashed_given_name == guest.hashed_first_name
and customer.hashed_surname == guest.hashed_last_name
):
score += 100
else:
if guest.hashed_first_name and customer.hashed_given_name == guest.hashed_first_name:
score += 40
if guest.hashed_last_name and customer.hashed_surname == guest.hashed_last_name:
score += 40
if guest.hashed_country_code and customer.hashed_country_code == guest.hashed_country_code:
score += 5
if guest.hashed_birth_date and customer.hashed_birth_date == guest.hashed_birth_date:
score += 2
return score
async def _find_reservation_for_customer(
self,
customer_id: int,
conversion: Conversion,
session: AsyncSession,
) -> Reservation | None:
"""Find a reservation for the customer that matches this conversion's stay dates."""
if not conversion.conversion_rooms:
return None
query = select(Reservation).where(Reservation.customer_id == customer_id)
result = await session.execute(query)
reservations = result.scalars().all()
if not reservations:
return None
for reservation in reservations:
for room in conversion.conversion_rooms:
if (
room.arrival_date
and room.departure_date
and reservation.start_date
and reservation.end_date
):
arrival_match = (
abs((room.arrival_date - reservation.start_date).days) <= 7
)
departure_match = (
abs((room.departure_date - reservation.end_date).days) <= 7
)
if arrival_match and departure_match:
return reservation
return None