"""Service for handling conversion data from hotel PMS XML files."""
|
|
|
|
import asyncio
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import UTC, datetime
|
|
from decimal import Decimal, InvalidOperation
|
|
from typing import Any
|
|
|
|
from sqlalchemy import or_, select
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from .db import (
|
|
Conversion,
|
|
ConversionGuest,
|
|
ConversionRoom,
|
|
Customer,
|
|
HashedCustomer,
|
|
Reservation,
|
|
SessionMaker,
|
|
)
|
|
from .logging_config import get_logger
|
|
from .schemas import ConversionData, ConversionGuestData
|
|
|
|
_LOGGER = get_logger(__name__)
|
|
|
|
# Limit concurrent reservation processing to avoid overwhelming the database
|
|
MAX_CONCURRENT_RESERVATIONS = 10
|
|
|
|
|
|
class ConversionService:
|
|
"""Service for processing and storing conversion/daily sales data.
|
|
|
|
Supports two modes of operation:
|
|
1. Sequential mode: Single AsyncSession passed in, uses sequential processing
|
|
2. Concurrent mode: SessionMaker passed in, creates independent sessions per task
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
session: AsyncSession | SessionMaker | None = None,
|
|
hotel_id: str | None = None,
|
|
):
|
|
"""Initialize the ConversionService.
|
|
|
|
Args:
|
|
session: Can be either:
|
|
- AsyncSession: Single session for sequential processing
|
|
- SessionMaker: Factory for creating sessions in concurrent mode
|
|
- None: Not recommended, but allowed for subclassing
|
|
hotel_id: Hotel ID for this conversion service context (from authenticated user)
|
|
|
|
"""
|
|
self.session = None
|
|
self.session_maker = None
|
|
self.supports_concurrent = False
|
|
self.hotel_id = hotel_id
|
|
|
|
# Cache for reservation and customer data within a single XML processing run
|
|
# Maps hotel_code -> list of (reservation, hashed_customer) tuples
|
|
# This significantly speeds up matching when processing large XML files
|
|
# Uses hashed data for matching to preserve privacy
|
|
self._reservation_cache: dict[
|
|
str | None, list[tuple[Reservation, HashedCustomer | None]]
|
|
] = {}
|
|
self._cache_initialized = False
|
|
|
|
if isinstance(session, SessionMaker):
|
|
self.session_maker = session
|
|
self.supports_concurrent = True
|
|
_LOGGER.info(
|
|
"ConversionService initialized in concurrent mode with SessionMaker"
|
|
)
|
|
elif isinstance(session, AsyncSession):
|
|
self.session = session
|
|
self.supports_concurrent = False
|
|
_LOGGER.info(
|
|
"ConversionService initialized in sequential mode with single session"
|
|
)
|
|
elif session is not None:
|
|
raise TypeError(
|
|
f"session must be AsyncSession or SessionMaker, got {type(session)}"
|
|
)
|
|
|
|
async def _extract_unique_guests_from_xml(
|
|
self, reservations: list
|
|
) -> dict[tuple[str, int], ConversionGuestData]:
|
|
"""Extract and deduplicate all guest data from XML reservations.
|
|
|
|
Phase 0: Single pass through XML to collect all unique guests.
|
|
Uses (hotel_id, guest_id) as the key for deduplication.
|
|
Validates each guest using Pydantic before storing.
|
|
|
|
Args:
|
|
reservations: List of XML reservation elements
|
|
|
|
Returns:
|
|
Dictionary mapping (hotel_id, guest_id) to validated ConversionGuestData
|
|
|
|
"""
|
|
guest_data_by_key = {}
|
|
now = datetime.now(UTC)
|
|
|
|
for reservation_elem in reservations:
|
|
hotel_id = reservation_elem.get("hotelID")
|
|
guest_elem = reservation_elem.find("guest")
|
|
|
|
if guest_elem is None:
|
|
_LOGGER.debug(
|
|
"No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)",
|
|
reservation_elem.get("id"),
|
|
)
|
|
continue
|
|
|
|
guest_id = guest_elem.get("id")
|
|
guest_first_name = guest_elem.get("firstName")
|
|
guest_last_name = guest_elem.get("lastName")
|
|
guest_email = guest_elem.get("email")
|
|
guest_country_code = guest_elem.get("countryCode")
|
|
guest_birth_date_str = guest_elem.get("dateOfBirth")
|
|
|
|
guest_birth_date = None
|
|
if guest_birth_date_str:
|
|
try:
|
|
guest_birth_date = datetime.strptime(
|
|
guest_birth_date_str, "%Y-%m-%d"
|
|
).date()
|
|
except ValueError:
|
|
_LOGGER.warning(
|
|
"Invalid birth date format: %s", guest_birth_date_str
|
|
)
|
|
|
|
# Validate guest data with Pydantic during extraction
|
|
try:
|
|
validated_guest = ConversionGuestData(
|
|
hotel_id=hotel_id,
|
|
guest_id=guest_id, # Will be validated and converted to int
|
|
guest_first_name=guest_first_name,
|
|
guest_last_name=guest_last_name,
|
|
guest_email=guest_email,
|
|
guest_country_code=guest_country_code,
|
|
guest_birth_date=guest_birth_date,
|
|
first_seen=now,
|
|
last_seen=now,
|
|
)
|
|
|
|
# Use validated guest_id (now an int) for the key
|
|
key = (hotel_id, validated_guest.guest_id)
|
|
|
|
# Store validated guest data (will keep the last occurrence from XML)
|
|
guest_data_by_key[key] = validated_guest
|
|
|
|
except ValueError:
|
|
_LOGGER.exception(
|
|
"Failed to validate guest data for reservation %s",
|
|
reservation_elem.get("id"),
|
|
)
|
|
continue
|
|
|
|
return guest_data_by_key
|
|
|
|
async def _bulk_upsert_guests(
|
|
self,
|
|
session: AsyncSession,
|
|
guest_data_by_key: dict[tuple[str, int], ConversionGuestData],
|
|
) -> None:
|
|
"""Bulk upsert all unique guests to database using PostgreSQL ON CONFLICT.
|
|
|
|
Phase 1: Atomic upsert of all guests extracted in Phase 0.
|
|
Uses ON CONFLICT DO UPDATE to handle duplicates safely without race conditions.
|
|
Processes in batches to avoid overwhelming the database with large XML files.
|
|
|
|
Args:
|
|
session: AsyncSession to use
|
|
guest_data_by_key: Dictionary mapping (hotel_id, guest_id) to
|
|
validated ConversionGuestData
|
|
|
|
"""
|
|
if not guest_data_by_key:
|
|
return
|
|
|
|
# Process in batches to avoid overwhelming the database
|
|
batch_size = 1000
|
|
items = list(guest_data_by_key.items())
|
|
|
|
for batch_start in range(0, len(items), batch_size):
|
|
batch_end = min(batch_start + batch_size, len(items))
|
|
batch_items = items[batch_start:batch_end]
|
|
|
|
# Prepare list of values for this batch (already validated)
|
|
values_list = []
|
|
for (hotel_id, guest_id), validated_guest in batch_items:
|
|
# Convert validated Pydantic model to dict for insertion
|
|
# (all validations and hash calculations are already done)
|
|
values_list.append(validated_guest.model_dump())
|
|
|
|
# Use PostgreSQL ON CONFLICT DO UPDATE for atomic upsert
|
|
stmt = pg_insert(ConversionGuest).values(values_list)
|
|
stmt = stmt.on_conflict_do_update(
|
|
index_elements=["hotel_id", "guest_id"],
|
|
set_={
|
|
"guest_first_name": stmt.excluded.guest_first_name,
|
|
"guest_last_name": stmt.excluded.guest_last_name,
|
|
"guest_email": stmt.excluded.guest_email,
|
|
"guest_country_code": stmt.excluded.guest_country_code,
|
|
"guest_birth_date": stmt.excluded.guest_birth_date,
|
|
"hashed_first_name": stmt.excluded.hashed_first_name,
|
|
"hashed_last_name": stmt.excluded.hashed_last_name,
|
|
"hashed_email": stmt.excluded.hashed_email,
|
|
"hashed_country_code": stmt.excluded.hashed_country_code,
|
|
"hashed_birth_date": stmt.excluded.hashed_birth_date,
|
|
"last_seen": stmt.excluded.last_seen,
|
|
},
|
|
)
|
|
|
|
await session.execute(stmt)
|
|
|
|
_LOGGER.info(
|
|
"Phase 1: Upserted batch %d-%d of %d guests",
|
|
batch_start + 1,
|
|
batch_end,
|
|
len(items),
|
|
)
|
|
|
|
async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]:
|
|
"""Parse conversion XML and save daily sales data to database.
|
|
|
|
Args:
|
|
xml_content: XML string containing reservation and daily sales data
|
|
|
|
Returns:
|
|
Dictionary with processing statistics
|
|
|
|
"""
|
|
try:
|
|
root = ET.fromstring(xml_content)
|
|
except ET.ParseError as e:
|
|
_LOGGER.error("Failed to parse conversion XML: %s", e)
|
|
raise ValueError(f"Invalid XML content: {e}") from e
|
|
|
|
# Initialize cache for this XML processing run
|
|
await self._load_reservation_cache()
|
|
|
|
stats = {
|
|
"total_reservations": 0,
|
|
"deleted_reservations": 0,
|
|
"total_daily_sales": 0,
|
|
"matched_to_reservation": 0,
|
|
"matched_to_customer": 0,
|
|
"matched_to_hashed_customer": 0,
|
|
"unmatched": 0,
|
|
"errors": 0,
|
|
}
|
|
|
|
# Get a session for deleted reservations processing
|
|
if self.session_maker:
|
|
session = await self.session_maker.create_session()
|
|
else:
|
|
session = self.session
|
|
|
|
# Process deleted reservations
|
|
for deleted_res in root.findall("Deletedreservation"):
|
|
stats["deleted_reservations"] += 1
|
|
pms_reservation_id_str = deleted_res.get("ID")
|
|
try:
|
|
pms_reservation_id = (
|
|
int(pms_reservation_id_str) if pms_reservation_id_str else None
|
|
)
|
|
if pms_reservation_id is None:
|
|
_LOGGER.warning(
|
|
"Deleted reservation missing ID attribute, skipping"
|
|
)
|
|
continue
|
|
await self._handle_deleted_reservation(pms_reservation_id, session)
|
|
await session.commit()
|
|
except Exception as e:
|
|
await session.rollback()
|
|
_LOGGER.exception(
|
|
"Error deleting reservation %s: %s",
|
|
                    pms_reservation_id_str,
|
|
e,
|
|
)
|
|
stats["errors"] += 1
|
|
|
|
# Close session if created by SessionMaker
|
|
if self.session_maker:
|
|
await session.close()
|
|
|
|
# Process active reservations in four phases:
|
|
# Phase 0: Extract and deduplicate all guest data from XML
|
|
# Phase 1: Bulk upsert all unique guests to DB (no race conditions)
|
|
# Phase 2: Create/update all conversion records (ConversionGuests already exist)
|
|
# Phase 3: Match them to existing reservations/customers
|
|
reservations = root.findall("reservation")
|
|
stats["total_reservations"] = len(reservations)
|
|
|
|
if not reservations:
|
|
return stats
|
|
|
|
_LOGGER.info("Processing %d reservations in xml", len(reservations))
|
|
|
|
# Phase 0: Extract and deduplicate all guest data from XML
|
|
_LOGGER.debug("Phase 0: Extracting and deduplicating guest data from XML")
|
|
guest_data_by_key = await self._extract_unique_guests_from_xml(reservations)
|
|
_LOGGER.info(
|
|
"Phase 0: Extracted %d unique guests from %d reservations",
|
|
len(guest_data_by_key),
|
|
len(reservations),
|
|
)
|
|
|
|
# Phase 1: Bulk upsert all unique guests to database
|
|
if guest_data_by_key:
|
|
_LOGGER.debug(
|
|
"Phase 1: Bulk upserting %d unique guests to database",
|
|
len(guest_data_by_key),
|
|
)
|
|
if self.session_maker:
|
|
session = await self.session_maker.create_session()
|
|
else:
|
|
session = self.session
|
|
|
|
try:
|
|
await self._bulk_upsert_guests(session, guest_data_by_key)
|
|
await session.commit()
|
|
_LOGGER.info(
|
|
"Phase 1: Successfully upserted %d guests", len(guest_data_by_key)
|
|
)
|
|
except Exception as e:
|
|
await session.rollback()
|
|
_LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e)
|
|
stats["errors"] += len(guest_data_by_key)
|
|
return stats
|
|
finally:
|
|
if self.session_maker:
|
|
await session.close()
|
|
|
|
# Phase 2: Create/update all conversions (no matching, ConversionGuests already exist)
|
|
# Returns list of successfully created pms_reservation_ids
|
|
_LOGGER.debug("Phase 2: Creating/updating conversions")
|
|
if self.supports_concurrent:
|
|
pms_reservation_ids = await self._process_reservations_concurrent(
|
|
reservations, stats
|
|
)
|
|
else:
|
|
pms_reservation_ids = await self._process_reservations_sequential(
|
|
reservations, stats
|
|
)
|
|
|
|
_LOGGER.debug(
|
|
"Phase 3: Found %d successfully created conversions out of %d total reservations",
|
|
len(pms_reservation_ids),
|
|
len(reservations),
|
|
)
|
|
|
|
# Phase 3: Match all conversions using database data only
|
|
# Only match conversions that actually exist in the database
|
|
# No XML parsing, no re-hashing - complete separation of concerns
|
|
# This also enables matching historical data that wasn't just created
|
|
if pms_reservation_ids:
|
|
_LOGGER.debug("Phase 3: Matching conversions to reservations/customers")
|
|
if self.supports_concurrent:
|
|
await self._match_conversions_from_db_concurrent(
|
|
pms_reservation_ids, stats
|
|
)
|
|
else:
|
|
await self._match_conversions_from_db_sequential(
|
|
pms_reservation_ids, stats
|
|
)
|
|
|
|
return stats
|
|
|
|
async def _load_reservation_cache(self) -> None:
|
|
"""Load all reservations and hashed customers into cache for fast matching.
|
|
|
|
This method is called once at the start of processing a large XML file.
|
|
It loads all reservations with their associated hashed customers into an in-memory
|
|
cache organized by hotel_code. This avoids repeated database queries during
|
|
matching operations and uses hashed data for privacy-preserving matching.
|
|
|
|
The cache structure:
|
|
- Key: hotel_code (str or None)
|
|
- Value: List of (reservation, hashed_customer) tuples
|
|
|
|
This is especially beneficial for large XML files with many reservations
|
|
where matching criteria is the same across multiple reservations.
|
|
"""
|
|
if self._cache_initialized:
|
|
_LOGGER.debug("Reservation cache already initialized, skipping reload")
|
|
return
|
|
|
|
# Get a session for loading the cache
|
|
if self.session_maker:
|
|
session = await self.session_maker.create_session()
|
|
close_session = True
|
|
else:
|
|
session = self.session
|
|
close_session = False
|
|
|
|
if not session:
|
|
_LOGGER.warning("No session available for cache loading")
|
|
return
|
|
|
|
try:
|
|
# Load all reservations with their hashed customers in one query
|
|
|
|
|
|
query = select(Reservation).options(
|
|
selectinload(Reservation.customer).selectinload(
|
|
Customer.hashed_version
|
|
),
|
|
selectinload(Reservation.hashed_customer),
|
|
)
|
|
result = await session.execute(query)
|
|
reservations = result.scalars().all()
|
|
|
|
_LOGGER.info("Loaded %d reservations into cache", len(reservations))
|
|
|
|
# Organize by hotel_code for efficient lookup
|
|
for reservation in reservations:
|
|
hotel_code = reservation.hotel_code
|
|
if hotel_code not in self._reservation_cache:
|
|
self._reservation_cache[hotel_code] = []
|
|
# Cache the hashed customer - prefer direct relationship, fall back to customer relationship
|
|
hashed_customer = None
|
|
if reservation.hashed_customer:
|
|
hashed_customer = reservation.hashed_customer
|
|
elif reservation.customer and reservation.customer.hashed_version:
|
|
hashed_customer = reservation.customer.hashed_version
|
|
self._reservation_cache[hotel_code].append(
|
|
(reservation, hashed_customer)
|
|
)
|
|
|
|
self._cache_initialized = True
|
|
_LOGGER.info(
|
|
"Reservation cache initialized with %d hotel codes",
|
|
len(self._reservation_cache),
|
|
)
|
|
except Exception as e:
|
|
_LOGGER.error("Failed to load reservation cache: %s", e)
|
|
# Cache remains empty, fall back to direct queries
|
|
self._cache_initialized = True
|
|
finally:
|
|
# Close session if we created it
|
|
if close_session:
|
|
await session.close()
|
|
|
|
async def _process_reservations_sequential(
|
|
self, reservations: list, stats: dict[str, int]
|
|
    ) -> list[int]:
|
|
"""Process reservations one at a time (original behavior).
|
|
|
|
Returns:
|
|
List of pms_reservation_ids that were successfully created/updated
|
|
|
|
"""
|
|
semaphore = asyncio.Semaphore(1) # Process one at a time
|
|
results = []
|
|
tasks = []
|
|
|
|
try:
|
|
async with asyncio.TaskGroup() as tg:
|
|
for reservation in reservations:
|
|
task = tg.create_task(
|
|
self._process_reservation_safe(reservation, semaphore, stats)
|
|
)
|
|
tasks.append(task)
|
|
except ExceptionGroup as eg:
|
|
# Fail fast: cancel all remaining tasks and re-raise the first exception
|
|
for task in tasks:
|
|
if not task.done():
|
|
task.cancel()
|
|
# Re-raise the first exception from the group
|
|
if eg.exceptions:
|
|
raise eg.exceptions[0] from eg
|
|
raise
|
|
|
|
# Collect results from all tasks
|
|
for task in tasks:
|
|
pms_id = task.result()
|
|
if pms_id is not None:
|
|
results.append(pms_id)
|
|
|
|
return results
|
|
|
|
async def _process_reservations_concurrent(
|
|
self, reservations: list, stats: dict[str, int]
|
|
    ) -> list[int]:
|
|
"""Process reservations concurrently with semaphore limiting.
|
|
|
|
Each concurrent task gets its own independent database session
|
|
from the SessionMaker.
|
|
|
|
Returns:
|
|
List of pms_reservation_ids that were successfully created/updated
|
|
|
|
"""
|
|
if not self.session_maker:
|
|
_LOGGER.error(
|
|
"Concurrent processing requested but SessionMaker not available. "
|
|
"Falling back to sequential processing."
|
|
)
|
|
return await self._process_reservations_sequential(reservations, stats)
|
|
|
|
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
|
|
results = []
|
|
tasks = []
|
|
|
|
try:
|
|
async with asyncio.TaskGroup() as tg:
|
|
for reservation in reservations:
|
|
task = tg.create_task(
|
|
self._process_reservation_safe(reservation, semaphore, stats)
|
|
)
|
|
tasks.append(task)
|
|
except ExceptionGroup as eg:
|
|
# Fail fast: cancel all remaining tasks and re-raise the first exception
|
|
for task in tasks:
|
|
if not task.done():
|
|
task.cancel()
|
|
# Re-raise the first exception from the group
|
|
if eg.exceptions:
|
|
raise eg.exceptions[0] from eg
|
|
raise
|
|
|
|
# Collect results from all tasks
|
|
for task in tasks:
|
|
pms_id = task.result()
|
|
if pms_id is not None:
|
|
results.append(pms_id)
|
|
|
|
return results
|
|
|
|
async def _process_reservation_safe(
|
|
self,
|
|
reservation_elem: Any,
|
|
semaphore: asyncio.Semaphore,
|
|
stats: dict[str, int],
|
|
) -> str | None:
|
|
"""Safely process a single reservation with semaphore and transaction management.
|
|
|
|
        Phase 2: Creation/update only (no matching).
|
|
In concurrent mode, creates its own session from SessionMaker.
|
|
In sequential mode, uses the shared session.
|
|
|
|
Args:
|
|
reservation_elem: XML element for the reservation
|
|
semaphore: Semaphore to limit concurrent operations
|
|
            stats: Shared stats dictionary (safe to mutate since all tasks run on one event loop)
|
|
|
|
Returns:
|
|
pms_reservation_id if successfully created/updated, None if error occurred
|
|
|
|
"""
|
|
pms_reservation_id = int(reservation_elem.get("id"))
|
|
|
|
async with semaphore:
|
|
# In concurrent mode, create a new session for this task
|
|
if self.session_maker:
|
|
session = await self.session_maker.create_session()
|
|
else:
|
|
session = self.session
|
|
|
|
try:
|
|
                # Phase 2: Create/update conversion record (no matching)
|
|
reservation_stats = await self._create_or_update_conversion(
|
|
reservation_elem, session
|
|
)
|
|
stats["total_daily_sales"] += reservation_stats["daily_sales_count"]
|
|
|
|
# Commit this task's transaction
|
|
await session.commit()
|
|
_LOGGER.debug(
|
|
"Successfully created/updated conversion for reservation %s",
|
|
pms_reservation_id,
|
|
)
|
|
return pms_reservation_id
|
|
|
|
except Exception as e:
|
|
# Rollback this task's transaction
|
|
await session.rollback()
|
|
_LOGGER.exception(
|
|
"Error processing reservation %s: %s",
|
|
pms_reservation_id,
|
|
e,
|
|
)
|
|
stats["errors"] += 1
|
|
return None
|
|
finally:
|
|
# Close the session if it was created by SessionMaker
|
|
if self.session_maker:
|
|
await session.close()
|
|
|
|
async def _handle_deleted_reservation(
|
|
self, pms_reservation_id: int, session: AsyncSession
|
|
):
|
|
"""Handle deleted reservation by marking conversions as deleted or removing them.
|
|
|
|
Args:
|
|
pms_reservation_id: PMS reservation ID to delete
|
|
session: AsyncSession to use for the operation
|
|
|
|
"""
|
|
if not self.hotel_id:
|
|
_LOGGER.error(
|
|
"Cannot delete reservation: hotel_id not set in ConversionService"
|
|
)
|
|
return
|
|
|
|
_LOGGER.info(
|
|
"Processing deleted reservation: Hotel %s, PMS ID %s",
|
|
self.hotel_id,
|
|
pms_reservation_id,
|
|
)
|
|
|
|
# Delete conversion records for this hotel + pms_reservation_id
|
|
result = await session.execute(
|
|
select(Conversion).where(
|
|
Conversion.hotel_id == self.hotel_id,
|
|
Conversion.pms_reservation_id == pms_reservation_id,
|
|
)
|
|
)
|
|
conversions = result.scalars().all()
|
|
|
|
for conversion in conversions:
|
|
await session.delete(conversion)
|
|
|
|
if conversions:
|
|
_LOGGER.info(
|
|
"Deleted %d conversion records for hotel %s, PMS reservation %s",
|
|
len(conversions),
|
|
self.hotel_id,
|
|
pms_reservation_id,
|
|
)
|
|
|
|
async def _create_or_update_conversion(
|
|
self, reservation_elem: ET.Element, session: AsyncSession | None = None
|
|
) -> dict[str, int]:
|
|
"""Create or update a conversion record from XML (Phase 1: no matching).
|
|
|
|
Args:
|
|
reservation_elem: XML element to process
|
|
session: AsyncSession to use. If None, uses self.session.
|
|
In concurrent mode, each task passes its own session.
|
|
|
|
Returns statistics about daily sales processed.
|
|
|
|
"""
|
|
if session is None:
|
|
session = self.session
|
|
stats = {
|
|
"daily_sales_count": 0,
|
|
}
|
|
|
|
hotel_id = reservation_elem.get("hotelID")
|
|
try:
|
|
# Extract reservation metadata
|
|
|
|
pms_reservation_id = int(reservation_elem.get("id"))
|
|
        except (TypeError, ValueError) as e:
|
|
_LOGGER.error("Invalid reservation metadata in reservation element: %s", e)
|
|
return stats
|
|
|
|
reservation_number = reservation_elem.get("number")
|
|
reservation_date_str = reservation_elem.get("date")
|
|
creation_time_str = reservation_elem.get("creationTime")
|
|
reservation_type = reservation_elem.get("type")
|
|
booking_channel = reservation_elem.get("bookingChannel")
|
|
|
|
# Extract guest information from guest element
|
|
guest_elem = reservation_elem.find("guest")
|
|
guest_birth_date_str = None
|
|
guest_id = None
|
|
|
|
if guest_elem is not None:
|
|
guest_id = guest_elem.get("id")
|
|
|
|
# Advertising/tracking data
|
|
advertising_medium = reservation_elem.get("advertisingMedium")
|
|
advertising_partner = reservation_elem.get("advertisingPartner")
|
|
advertising_campagne = reservation_elem.get("advertisingCampagne")
|
|
|
|
# Parse dates
|
|
reservation_date = None
|
|
if reservation_date_str:
|
|
try:
|
|
reservation_date = datetime.strptime(
|
|
reservation_date_str, "%Y-%m-%d"
|
|
).date()
|
|
except ValueError:
|
|
_LOGGER.warning(
|
|
"Invalid reservation date format: %s", reservation_date_str
|
|
)
|
|
|
|
creation_time = None
|
|
if creation_time_str:
|
|
try:
|
|
creation_time = datetime.fromisoformat(
|
|
creation_time_str.replace("Z", "+00:00")
|
|
)
|
|
except ValueError:
|
|
_LOGGER.warning("Invalid creation time format: %s", creation_time_str)
|
|
|
|
# Process all room reservations
|
|
room_reservations = reservation_elem.find("roomReservations")
|
|
if room_reservations is None:
|
|
_LOGGER.debug(
|
|
"No roomReservations found for reservation %s", pms_reservation_id
|
|
)
|
|
return stats
|
|
|
|
# ConversionGuests have already been bulk-upserted in Phase 1,
|
|
# so we can safely create/update conversions now
|
|
# Check if conversion already exists (upsert logic)
|
|
existing_result = await session.execute(
|
|
select(Conversion).where(
|
|
Conversion.hotel_id == hotel_id,
|
|
Conversion.pms_reservation_id == pms_reservation_id,
|
|
)
|
|
)
|
|
existing_conversion = existing_result.scalar_one_or_none()
|
|
|
|
if existing_conversion:
|
|
# Update existing conversion - only update reservation metadata and advertising data
|
|
# Guest info is stored in ConversionGuest table, not here
|
|
# Don't clear reservation/customer links (matching logic will update if needed)
|
|
existing_conversion.reservation_number = reservation_number
|
|
existing_conversion.reservation_date = reservation_date
|
|
existing_conversion.creation_time = creation_time
|
|
existing_conversion.reservation_type = reservation_type
|
|
existing_conversion.booking_channel = booking_channel
|
|
existing_conversion.advertising_medium = advertising_medium
|
|
existing_conversion.advertising_partner = advertising_partner
|
|
existing_conversion.advertising_campagne = advertising_campagne
|
|
existing_conversion.updated_at = datetime.now()
|
|
conversion = existing_conversion
|
|
_LOGGER.info(
|
|
"Updated conversion %s (pms_id=%s)",
|
|
conversion.id,
|
|
pms_reservation_id,
|
|
)
|
|
else:
|
|
# Create new conversion entry (without matching - will be done later)
|
|
# Note: Guest information (first_name, last_name, email, etc) is stored in ConversionGuest table
|
|
conversion_data = ConversionData(
|
|
# Links to existing entities (nullable, will be filled in after matching)
|
|
# Reservation metadata
|
|
hotel_id=hotel_id,
|
|
guest_id=guest_id, # Links to ConversionGuest
|
|
pms_reservation_id=pms_reservation_id,
|
|
reservation_number=reservation_number,
|
|
reservation_date=reservation_date,
|
|
creation_time=creation_time,
|
|
reservation_type=reservation_type,
|
|
booking_channel=booking_channel,
|
|
# Advertising data
|
|
advertising_medium=advertising_medium,
|
|
advertising_partner=advertising_partner,
|
|
advertising_campagne=advertising_campagne,
|
|
# Metadata
|
|
)
|
|
conversion = Conversion(**conversion_data.model_dump())
|
|
session.add(conversion)
|
|
_LOGGER.debug(
|
|
"Created conversion (pms_id=%s)",
|
|
pms_reservation_id,
|
|
)
|
|
|
|
# Flush to ensure conversion has an ID before creating room reservations
|
|
await session.flush()
|
|
|
|
# Fetch ALL existing rooms for this conversion (not just the ones in current XML)
|
|
existing_rooms_result = await session.execute(
|
|
select(ConversionRoom).where(ConversionRoom.conversion_id == conversion.id)
|
|
)
|
|
existing_rooms = {
|
|
room.pms_hotel_reservation_id: room
|
|
for room in existing_rooms_result.scalars().all()
|
|
}
|
|
|
|
# Track which room IDs are present in the current XML
|
|
current_pms_hotel_reservation_ids = set()
|
|
|
|
# Process room reservations
|
|
for room_reservation in room_reservations.findall("roomReservation"):
|
|
# Extract room reservation details
|
|
arrival_str = room_reservation.get("arrival")
|
|
departure_str = room_reservation.get("departure")
|
|
room_status = room_reservation.get("status")
|
|
room_type = room_reservation.get("roomType")
|
|
room_number = room_reservation.get("roomNumber")
|
|
adults_str = room_reservation.get("adults")
|
|
rate_plan_code = room_reservation.get("ratePlanCode")
|
|
connected_room_type = room_reservation.get("connectedRoomType")
|
|
|
|
arrival_date = None
|
|
if arrival_str:
|
|
try:
|
|
arrival_date = datetime.strptime(arrival_str, "%Y-%m-%d").date()
|
|
except ValueError:
|
|
_LOGGER.warning("Invalid arrival date format: %s", arrival_str)
|
|
|
|
departure_date = None
|
|
if departure_str:
|
|
try:
|
|
departure_date = datetime.strptime(departure_str, "%Y-%m-%d").date()
|
|
except ValueError:
|
|
_LOGGER.warning("Invalid departure date format: %s", departure_str)
|
|
|
|
num_adults = None
|
|
if adults_str:
|
|
try:
|
|
num_adults = int(adults_str)
|
|
except ValueError:
|
|
_LOGGER.warning("Invalid adults value: %s", adults_str)
|
|
|
|
# Create composite ID for upsert: pms_reservation_id + room_number
|
|
# This allows updating the same room reservation if it appears again
|
|
pms_hotel_reservation_id = f"{pms_reservation_id}_{room_number}"
|
|
|
|
# Track this room as present in current XML
|
|
current_pms_hotel_reservation_ids.add(pms_hotel_reservation_id)
|
|
|
|
# Process daily sales and extract total revenue
|
|
daily_sales_elem = room_reservation.find("dailySales")
|
|
daily_sales_list = []
|
|
total_revenue = Decimal(0)
|
|
|
|
if daily_sales_elem is not None:
|
|
for daily_sale in daily_sales_elem.findall("dailySale"):
|
|
stats["daily_sales_count"] += 1
|
|
|
|
# Extract daily sale data
|
|
sale_date_str = daily_sale.get("date")
|
|
daily_sale_obj = {}
|
|
|
|
if sale_date_str:
|
|
daily_sale_obj["date"] = sale_date_str
|
|
|
|
# Extract all revenue fields
|
|
revenue_total_str = daily_sale.get("revenueTotal")
|
|
if revenue_total_str:
|
|
daily_sale_obj["revenueTotal"] = revenue_total_str
|
|
try:
|
|
total_revenue += Decimal(revenue_total_str)
|
|
                        except (InvalidOperation, ValueError, TypeError):
                            # Decimal raises InvalidOperation (not ValueError) for malformed strings
|
|
_LOGGER.warning(
|
|
"Invalid revenueTotal value: %s", revenue_total_str
|
|
)
|
|
|
|
# Add other revenue fields if present
|
|
if daily_sale.get("revenueLogis"):
|
|
daily_sale_obj["revenueLogis"] = daily_sale.get("revenueLogis")
|
|
if daily_sale.get("revenueBoard"):
|
|
daily_sale_obj["revenueBoard"] = daily_sale.get("revenueBoard")
|
|
if daily_sale.get("revenueFB"):
|
|
daily_sale_obj["revenueFB"] = daily_sale.get("revenueFB")
|
|
if daily_sale.get("revenueSpa"):
|
|
daily_sale_obj["revenueSpa"] = daily_sale.get("revenueSpa")
|
|
if daily_sale.get("revenueOther"):
|
|
daily_sale_obj["revenueOther"] = daily_sale.get("revenueOther")
|
|
|
|
if daily_sale_obj: # Only add if has data
|
|
daily_sales_list.append(daily_sale_obj)
|
|
|
|
# Check if room reservation already exists using batch-loaded data
|
|
existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id)
|
|
|
|
if existing_room_reservation:
|
|
# Update existing room reservation with all fields
|
|
existing_room_reservation.arrival_date = arrival_date
|
|
existing_room_reservation.departure_date = departure_date
|
|
existing_room_reservation.room_status = room_status
|
|
existing_room_reservation.room_type = room_type
|
|
existing_room_reservation.num_adults = num_adults
|
|
existing_room_reservation.rate_plan_code = rate_plan_code
|
|
existing_room_reservation.connected_room_type = connected_room_type
|
|
existing_room_reservation.daily_sales = (
|
|
daily_sales_list if daily_sales_list else None
|
|
)
|
|
existing_room_reservation.total_revenue = (
|
|
total_revenue if total_revenue > 0 else None
|
|
)
|
|
existing_room_reservation.updated_at = datetime.now()
|
|
_LOGGER.debug(
|
|
"Updated room reservation %s (pms_id=%s, room=%s)",
|
|
existing_room_reservation.id,
|
|
pms_reservation_id,
|
|
room_number,
|
|
)
|
|
else:
|
|
# Create new room reservation
|
|
room_reservation_record = ConversionRoom(
|
|
conversion_id=conversion.id,
|
|
pms_hotel_reservation_id=pms_hotel_reservation_id,
|
|
arrival_date=arrival_date,
|
|
departure_date=departure_date,
|
|
room_status=room_status,
|
|
room_type=room_type,
|
|
room_number=room_number,
|
|
num_adults=num_adults,
|
|
rate_plan_code=rate_plan_code,
|
|
connected_room_type=connected_room_type,
|
|
daily_sales=daily_sales_list if daily_sales_list else None,
|
|
total_revenue=total_revenue if total_revenue > 0 else None,
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now(),
|
|
)
|
|
session.add(room_reservation_record)
|
|
_LOGGER.debug(
|
|
"Created room reservation (pms_id=%s, room=%s, adults=%s)",
|
|
pms_reservation_id,
|
|
room_number,
|
|
num_adults,
|
|
)
|
|
|
|
# Delete room entries that are no longer present in the current XML
|
|
# This handles cases where a reservation is updated and room numbers change
|
|
rooms_to_delete = [
|
|
room
|
|
for pms_id, room in existing_rooms.items()
|
|
if pms_id not in current_pms_hotel_reservation_ids
|
|
]
|
|
|
|
if rooms_to_delete:
|
|
for room in rooms_to_delete:
|
|
await session.delete(room)
|
|
_LOGGER.debug(
|
|
"Deleted room reservation %s (pms_id=%s, room=%s) - no longer in current XML",
|
|
room.id,
|
|
room.pms_hotel_reservation_id,
|
|
room.room_number,
|
|
)
|
|
|
|
return stats
|
|
|
|
async def _match_by_advertising(
|
|
self,
|
|
advertising_campagne: str,
|
|
hotel_id: str | None,
|
|
guest_first_name: str | None,
|
|
guest_last_name: str | None,
|
|
guest_email: str | None,
|
|
advertising_partner: str | None,
|
|
session: AsyncSession | None = None,
|
|
) -> Reservation | None:
|
|
"""Match reservation by advertising tracking data (fbclid/gclid/md5_unique_id).
|
|
|
|
Args:
|
|
advertising_campagne: Tracking ID from PMS (could be truncated click_id or md5_unique_id)
|
|
hotel_id: Hotel ID for filtering
|
|
guest_first_name: Guest first name for disambiguation
|
|
guest_last_name: Guest last name for disambiguation
|
|
guest_email: Guest email for disambiguation
|
|
advertising_partner: Partner info (matches utm_medium)
|
|
session: AsyncSession to use. If None, uses self.session.
|
|
|
|
Returns:
|
|
Matched Reservation or None
|
|
|
|
"""
|
|
if session is None:
|
|
session = self.session
|
|
# Find reservations where:
|
|
# - fbclid/gclid starts with the advertising_campagne value, OR
|
|
# - md5_unique_id matches exactly (for direct ID matching)
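        # Click IDs may arrive truncated from the PMS (see the docstring above), so
        # prefix matching via LIKE '<advertising_campagne>%' covers the case where the
        # stored fbclid/gclid is longer than the value reported back to us.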
|
|
query = select(Reservation).where(
|
|
or_(
|
|
Reservation.fbclid.like(f"{advertising_campagne}%"),
|
|
Reservation.gclid.like(f"{advertising_campagne}%"),
|
|
Reservation.md5_unique_id == advertising_campagne,
|
|
)
|
|
)
|
|
|
|
# Eagerly load the customer relationship
|
|
query = query.options(selectinload(Reservation.customer))
|
|
|
|
# Add hotel filter if available
|
|
if hotel_id:
|
|
query = query.where(Reservation.hotel_code == hotel_id)
|
|
|
|
# Execute query
|
|
db_result = await session.execute(query)
|
|
reservations = db_result.scalars().all()
|
|
|
|
if not reservations:
|
|
return None
|
|
|
|
# If single match, return it
|
|
if len(reservations) == 1:
|
|
return reservations[0]
|
|
|
|
# If multiple matches, try to narrow down using guest details
|
|
_LOGGER.debug(
|
|
"Multiple reservations match advertisingCampagne %s (hotel=%s): found %d matches. "
|
|
"Attempting to narrow down using guest details.",
|
|
advertising_campagne,
|
|
hotel_id,
|
|
len(reservations),
|
|
)
|
|
|
|
matched_reservation = self._filter_reservations_by_guest_details(
|
|
reservations,
|
|
guest_first_name,
|
|
guest_last_name,
|
|
guest_email,
|
|
advertising_partner,
|
|
)
|
|
|
|
if matched_reservation is None:
|
|
# If we still can't narrow it down, use the first match and log warning
|
|
_LOGGER.warning(
|
|
"Could not narrow down multiple reservations for advertisingCampagne %s "
|
|
"(hotel=%s, guest=%s %s, email=%s). Using first match.",
|
|
advertising_campagne,
|
|
hotel_id,
|
|
guest_first_name,
|
|
guest_last_name,
|
|
guest_email,
|
|
)
|
|
matched_reservation = reservations[0]
|
|
|
|
return matched_reservation
|
|
|
|
async def _match_by_guest_details_hashed(
|
|
self,
|
|
guest_first_name: str | None,
|
|
guest_last_name: str | None,
|
|
guest_email: str | None,
|
|
session: AsyncSession | None = None,
|
|
) -> HashedCustomer | None:
|
|
"""Match guest by name and email directly to HashedCustomer (no Reservation needed).
|
|
|
|
This method bypasses the Reservation table entirely and matches directly against
|
|
hashed customer data. Used for guest-detail matching where we don't need to link
|
|
to a specific reservation.
|
|
|
|
Args:
|
|
guest_first_name: Guest first name (pre-hashed)
|
|
guest_last_name: Guest last name (pre-hashed)
|
|
guest_email: Guest email (pre-hashed)
|
|
session: AsyncSession to use. If None, uses self.session.
|
|
|
|
Returns:
|
|
Matched HashedCustomer or None
|
|
|
|
"""
|
|
if session is None:
|
|
session = self.session
|
|
|
|
# Query all hashed customers that match the guest details
|
|
query = select(HashedCustomer).options(selectinload(HashedCustomer.customer))
|
|
|
|
# Build filter conditions
|
|
conditions = []
|
|
if guest_email:
|
|
conditions.append(HashedCustomer.hashed_email == guest_email)
|
|
if guest_first_name and guest_last_name:
|
|
conditions.append(
|
|
(HashedCustomer.hashed_given_name == guest_first_name)
|
|
& (HashedCustomer.hashed_surname == guest_last_name)
|
|
)
|
|
|
|
if not conditions:
|
|
return None
|
|
|
|
# Combine conditions with OR (match if email matches OR name matches)
|
|
query = query.where(or_(*conditions))
|
|
|
|
db_result = await session.execute(query)
|
|
matches = db_result.scalars().all()
|
|
|
|
if not matches:
|
|
return None
|
|
|
|
# If single match, return it
|
|
if len(matches) == 1:
|
|
return matches[0]
|
|
|
|
# If multiple matches, prefer email match over name match
|
|
for match in matches:
|
|
if guest_email and match.hashed_email == guest_email:
|
|
_LOGGER.debug(
|
|
"Multiple hashed customer matches, preferring email match"
|
|
)
|
|
return match
|
|
|
|
# Otherwise return first match
|
|
_LOGGER.warning(
|
|
"Multiple hashed customer matches found for guest details, using first match"
|
|
)
|
|
return matches[0]
|
|
|
|
def _filter_reservations_by_guest_details(
|
|
self,
|
|
reservations: list[Reservation],
|
|
guest_first_name: str | None,
|
|
guest_last_name: str | None,
|
|
guest_email: str | None,
|
|
advertising_partner: str | None,
|
|
) -> Reservation | None:
|
|
"""Filter reservations using guest details to find a single match.
|
|
|
|
First tries to match by guest name and email. If that doesn't yield a single match,
|
|
tries matching by advertising_partner against utm_medium.
|
|
|
|
Args:
|
|
reservations: List of candidate reservations
|
|
guest_first_name: Guest first name
|
|
guest_last_name: Guest last name
|
|
guest_email: Guest email
|
|
advertising_partner: Partner info (e.g., "Facebook_Mobile_Feed")
|
|
|
|
Returns:
|
|
Single best-match Reservation, or None if no good match found
|
|
|
|
"""
|
|
candidates = reservations
|
|
|
|
# Try to narrow down by guest name and email
|
|
if guest_first_name or guest_last_name or guest_email:
|
|
# First try exact match on all available fields
|
|
for reservation in candidates:
|
|
customer = reservation.customer
|
|
if customer:
|
|
name_match = True
|
|
email_match = True
|
|
|
|
if guest_first_name:
|
|
name_match = name_match and (
|
|
customer.given_name
|
|
and customer.given_name.lower() == guest_first_name.lower()
|
|
)
|
|
|
|
if guest_last_name:
|
|
name_match = name_match and (
|
|
customer.surname
|
|
and customer.surname.lower() == guest_last_name.lower()
|
|
)
|
|
|
|
if guest_email:
|
|
email_match = (
|
|
customer.email_address
|
|
and customer.email_address.lower() == guest_email.lower()
|
|
)
|
|
|
|
if name_match and email_match:
|
|
_LOGGER.debug(
|
|
"Found exact match on guest name/email for %s %s",
|
|
guest_first_name,
|
|
guest_last_name,
|
|
)
|
|
return reservation
|
|
|
|
# Try to narrow down by advertising_partner matching utm_medium
|
|
if advertising_partner:
|
|
for reservation in candidates:
|
|
if (
|
|
reservation.utm_medium
|
|
and reservation.utm_medium.lower() == advertising_partner.lower()
|
|
):
|
|
_LOGGER.debug(
|
|
"Found match on advertising_partner=%s matching utm_medium",
|
|
advertising_partner,
|
|
)
|
|
return reservation
|
|
|
|
# No single clear match found
|
|
return None
|
|
|
|
async def _extract_unmatched_guests(
|
|
self, session: AsyncSession
|
|
    ) -> dict[str, HashedCustomer | None]:
|
|
"""Phase 3b: Extract unique guests from unmatched conversions and match them to customers.
|
|
|
|
Returns a mapping of guest_id -> HashedCustomer for all unique guests found in
|
|
unmatched conversions. Only processes each guest once.
|
|
|
|
This includes:
|
|
1. Conversions with no customer match at all (reservation_id IS NULL AND customer_id IS NULL)
|
|
2. Conversions matched to a customer but not a reservation (reservation_id IS NULL AND customer_id IS NOT NULL)
|
|
- These may have been matched in a previous run and need re-evaluation for reservation linking
|
|
|
|
Args:
|
|
session: AsyncSession for database queries
|
|
|
|
Returns:
|
|
Dictionary mapping guest_id to matched HashedCustomer (or None if no match)
|
|
|
|
"""
|
|
# Find all conversions that either:
|
|
# - Have no match at all (reservation_id IS NULL AND customer_id IS NULL), OR
|
|
# - Have a customer but no reservation (for re-linking in case new reservations were added)
|
|
result = await session.execute(
|
|
select(Conversion)
|
|
.where(
|
|
(Conversion.reservation_id.is_(None))
|
|
& (Conversion.guest_id.isnot(None))
|
|
)
|
|
.options(selectinload(Conversion.guest))
|
|
)
|
|
unmatched_conversions = result.scalars().all()
|
|
|
|
if not unmatched_conversions:
|
|
_LOGGER.debug("Phase 3b: No unmatched conversions with guests")
|
|
return {}
|
|
|
|
# Extract unique guests (by guest_id)
|
|
unique_guests: dict[str, Conversion] = {}
|
|
for conversion in unmatched_conversions:
|
|
if conversion.guest_id not in unique_guests:
|
|
unique_guests[conversion.guest_id] = conversion
|
|
|
|
_LOGGER.info(
|
|
"Phase 3b: Found %d unique guests from %d unmatched conversions",
|
|
len(unique_guests),
|
|
len(unmatched_conversions),
|
|
)
|
|
|
|
# Match each unique guest to a hashed customer
|
|
        guest_to_hashed_customer: dict[str, HashedCustomer | None] = {}
|
|
for guest_id, conversion in unique_guests.items():
|
|
conversion_guest = conversion.guest
|
|
if not conversion_guest:
|
|
continue
|
|
|
|
# Try to match by guest details
|
|
matched_hashed_customer = await self._match_by_guest_details_hashed(
|
|
conversion_guest.hashed_first_name,
|
|
conversion_guest.hashed_last_name,
|
|
conversion_guest.hashed_email,
|
|
session,
|
|
)
|
|
|
|
if matched_hashed_customer:
|
|
guest_to_hashed_customer[guest_id] = matched_hashed_customer
|
|
_LOGGER.debug(
|
|
"Phase 3b: Matched guest %s to hashed_customer %d",
|
|
guest_id,
|
|
matched_hashed_customer.id,
|
|
)
|
|
else:
|
|
guest_to_hashed_customer[guest_id] = None
|
|
_LOGGER.debug("Phase 3b: No match found for guest %s", guest_id)
|
|
|
|
return guest_to_hashed_customer
|
|
|
|
async def _link_matched_guests_to_reservations(
|
|
self,
|
|
        guest_to_hashed_customer: dict[str, HashedCustomer | None],
|
|
session: AsyncSession,
|
|
stats: dict[str, int],
|
|
) -> None:
|
|
"""Phase 3c: Link conversions from matched guests to reservations based on dates.
|
|
|
|
For each guest matched to a hashed_customer, find all conversions from that guest
|
|
that don't have a reservation yet, and try to link them to reservations based on
|
|
date matching.
|
|
|
|
This includes:
|
|
1. Conversions with no customer match (will link customer first)
|
|
2. Conversions already linked to a customer from a previous run (will try to link to reservation)
|
|
|
|
After all conversions for a guest are processed, check if the guest is a regular
|
|
by looking at whether they have paying conversions that predate any reservations.
|
|
|
|
Args:
|
|
guest_to_hashed_customer: Mapping from guest_id to matched HashedCustomer
|
|
session: AsyncSession for database queries
|
|
stats: Shared stats dictionary to update
|
|
|
|
"""
|
|
for guest_id, matched_hashed_customer in guest_to_hashed_customer.items():
|
|
if not matched_hashed_customer or not matched_hashed_customer.customer_id:
|
|
continue
|
|
|
|
# Find all conversions from this guest that don't have a reservation
|
|
# (whether or not they have a customer match - we might be re-running after new reservations added)
|
|
result = await session.execute(
|
|
select(Conversion)
|
|
.where(
|
|
(Conversion.guest_id == guest_id)
|
|
& (Conversion.reservation_id.is_(None))
|
|
)
|
|
.options(
|
|
selectinload(Conversion.conversion_rooms),
|
|
selectinload(Conversion.guest),
|
|
)
|
|
)
|
|
conversions = result.scalars().all()
|
|
|
|
if not conversions:
|
|
continue
|
|
|
|
_LOGGER.debug(
|
|
"Phase 3c: Processing %d conversions for guest %s (customer_id=%d)",
|
|
len(conversions),
|
|
guest_id,
|
|
matched_hashed_customer.customer_id,
|
|
)
|
|
|
|
# Try to link each conversion to a reservation for this customer
|
|
for conversion in conversions:
|
|
(
|
|
matched_reservation,
|
|
is_attributable,
|
|
) = await self._check_if_attributable(
|
|
matched_hashed_customer.customer_id, conversion, session
|
|
)
|
|
|
|
if matched_reservation and is_attributable:
|
|
# Only update stats if this is a NEW match (wasn't matched before)
|
|
was_previously_matched = conversion.customer_id is not None
|
|
|
|
conversion.reservation_id = matched_reservation.id
|
|
conversion.customer_id = matched_hashed_customer.customer_id
|
|
conversion.hashed_customer_id = matched_hashed_customer.id
|
|
conversion.directly_attributable = True
|
|
conversion.guest_matched = True
|
|
conversion.updated_at = datetime.now()
|
|
|
|
if not was_previously_matched:
|
|
stats["matched_to_reservation"] += 1
|
|
|
|
_LOGGER.info(
|
|
"Phase 3c: Linked conversion (pms_id=%s) to reservation %d via guest matching",
|
|
conversion.pms_reservation_id,
|
|
matched_reservation.id,
|
|
)
|
|
elif matched_hashed_customer and conversion.customer_id is None:
|
|
# Only count new customer matches (conversions that didn't have a customer before)
|
|
conversion.customer_id = matched_hashed_customer.customer_id
|
|
conversion.hashed_customer_id = matched_hashed_customer.id
|
|
conversion.directly_attributable = False
|
|
conversion.guest_matched = True
|
|
conversion.updated_at = datetime.now()
|
|
stats["matched_to_customer"] += 1
|
|
|
|
# After all conversions for this guest are processed, check if guest is regular
|
|
# Look at ALL conversions from this guest to see if there are pre-dated payments
|
|
if conversions and conversions[0].guest:
|
|
await self._check_if_guest_is_regular(
|
|
guest_id, matched_hashed_customer.customer_id, session
|
|
)
|
|
|
|
async def _check_regularity_for_all_matched_guests(
|
|
self, session: AsyncSession
|
|
) -> None:
|
|
"""Phase 3d: Check regularity for ALL matched guests (both ID-matched and guest-detail-matched).
|
|
|
|
This is called after all matching is complete to evaluate every guest that has been
|
|
matched to a customer, regardless of match type. This ensures consistent regularity
|
|
evaluation across all matched conversions.
|
|
|
|
This is run on ALL matched guests, not just newly matched ones, to ensure that if
|
|
the regularity logic changes, it gets re-applied to all guests on the next run.
|
|
This maintains idempotency of the matching process.
|
|
|
|
Args:
|
|
session: AsyncSession for database queries
|
|
|
|
"""
|
|
# Get all ConversionGuests that have ANY customer link
|
|
# This includes:
|
|
# 1. Guests matched via guest-details (hashed_customer_id is not null)
|
|
# 2. Guests matched via ID-based matching (customer_id is not null via conversion)
|
|
result = await session.execute(
|
|
select(ConversionGuest).where(
|
|
ConversionGuest.hashed_customer_id.isnot(None)
|
|
)
|
|
)
|
|
matched_guests = result.scalars().all()
|
|
|
|
if not matched_guests:
|
|
_LOGGER.debug("Phase 3d: No matched guests to check for regularity")
|
|
return
|
|
|
|
_LOGGER.debug(
|
|
"Phase 3d: Checking regularity for %d matched guests", len(matched_guests)
|
|
)
|
|
|
|
for conversion_guest in matched_guests:
|
|
if not conversion_guest.hashed_customer_id:
|
|
continue
|
|
|
|
# Get the customer ID from the hashed_customer
|
|
hashed_customer_result = await session.execute(
|
|
select(HashedCustomer).where(
|
|
HashedCustomer.id == conversion_guest.hashed_customer_id
|
|
)
|
|
)
|
|
hashed_customer = hashed_customer_result.scalar_one_or_none()
|
|
|
|
if hashed_customer and hashed_customer.customer_id:
|
|
await self._check_if_guest_is_regular(
|
|
conversion_guest.guest_id, hashed_customer.customer_id, session
|
|
)
|
|
|
|
async def _match_conversions_from_db_sequential(
|
|
        self, pms_reservation_ids: list[int], stats: dict[str, int]
|
|
) -> None:
|
|
"""Phase 3a: Match conversions sequentially using database data only.
|
|
|
|
Performs ID-based matching for all conversions. Then extracts unique guests
|
|
and performs guest detail matching in phases 3b and 3c.
|
|
"""
|
|
semaphore = asyncio.Semaphore(1) # Process one at a time
|
|
async with asyncio.TaskGroup() as tg:
|
|
for pms_id in pms_reservation_ids:
|
|
tg.create_task(
|
|
self._match_conversion_from_db_safe(pms_id, semaphore, stats)
|
|
)
|
|
|
|
# After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching)
|
|
if self.session_maker:
|
|
session = await self.session_maker.create_session()
|
|
else:
|
|
session = self.session
|
|
|
|
try:
|
|
# Phase 3b: Extract and cache unique guests from unmatched conversions
|
|
guest_to_hashed_customer = await self._extract_unmatched_guests(session)
|
|
|
|
# Phase 3c: Link matched guests to reservations
|
|
if guest_to_hashed_customer:
|
|
await self._link_matched_guests_to_reservations(
|
|
guest_to_hashed_customer, session, stats
|
|
)
|
|
|
|
# Phase 3d: Check regularity for all matched guests (both ID and guest-detail matched)
|
|
await self._check_regularity_for_all_matched_guests(session)
|
|
|
|
await session.commit()
|
|
except Exception as e:
|
|
await session.rollback()
|
|
_LOGGER.exception("Error in Phase 3b/3c/3d guest matching: %s", e)
|
|
finally:
|
|
if self.session_maker:
|
|
await session.close()
|
|
|
|
async def _match_conversions_from_db_concurrent(
|
|
        self, pms_reservation_ids: list[int], stats: dict[str, int]
|
|
) -> None:
|
|
"""Phase 3a: Match conversions concurrently using database data only.
|
|
|
|
Performs ID-based matching for all conversions concurrently. Then extracts unique guests
|
|
and performs guest detail matching sequentially in phases 3b and 3c.
|
|
|
|
Each concurrent task gets its own independent database session
|
|
from the SessionMaker.
|
|
"""
|
|
if not self.session_maker:
|
|
_LOGGER.error(
|
|
"Concurrent matching requested but SessionMaker not available. "
|
|
"Falling back to sequential matching."
|
|
)
|
|
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
|
|
return
|
|
|
|
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
|
|
async with asyncio.TaskGroup() as tg:
|
|
for pms_id in pms_reservation_ids:
|
|
tg.create_task(
|
|
self._match_conversion_from_db_safe(pms_id, semaphore, stats)
|
|
)
|
|
|
|
# After Phase 3a (ID-based matching), do Phase 3b and 3c (guest detail matching)
|
|
# Use sequential processing for guest matching to avoid duplicating work
|
|
session = await self.session_maker.create_session()
|
|
|
|
try:
|
|
# Phase 3b: Extract and cache unique guests from unmatched conversions
|
|
guest_to_hashed_customer = await self._extract_unmatched_guests(session)
|
|
|
|
# Phase 3c: Link matched guests to reservations
|
|
if guest_to_hashed_customer:
|
|
await self._link_matched_guests_to_reservations(
|
|
guest_to_hashed_customer, session, stats
|
|
)
|
|
|
|
# Phase 3d: Check regularity for all matched guests (both ID and guest-detail matched)
|
|
await self._check_regularity_for_all_matched_guests(session)
|
|
|
|
await session.commit()
|
|
except Exception as e:
|
|
await session.rollback()
|
|
_LOGGER.exception("Error in Phase 3b/3c/3d guest matching: %s", e)
|
|
finally:
|
|
await session.close()
|
|
|
|
async def _match_conversion_from_db_safe(
|
|
self,
|
|
pms_reservation_id: int,
|
|
semaphore: asyncio.Semaphore,
|
|
stats: dict[str, int],
|
|
) -> None:
|
|
"""Phase 2: Safely match a conversion using only database data with transaction management.
|
|
|
|
In concurrent mode, creates its own session from SessionMaker.
|
|
In sequential mode, uses the shared session.
|
|
|
|
Args:
|
|
pms_reservation_id: PMS reservation ID to match
|
|
semaphore: Semaphore to limit concurrent operations
|
|
            stats: Shared stats dictionary (safe to mutate since all tasks run on one event loop)
|
|
|
|
"""
|
|
async with semaphore:
|
|
# In concurrent mode, create a new session for this task
|
|
if self.session_maker:
|
|
session = await self.session_maker.create_session()
|
|
else:
|
|
session = self.session
|
|
|
|
try:
|
|
                # Phase 3a: Match conversion using only database data
|
|
await self._match_conversion_using_db_data(
|
|
pms_reservation_id, session, stats
|
|
)
|
|
|
|
# Commit this task's transaction
|
|
await session.commit()
|
|
_LOGGER.debug(
|
|
"Successfully matched conversion for reservation %s",
|
|
pms_reservation_id,
|
|
)
|
|
|
|
except Exception as e:
|
|
# Rollback this task's transaction
|
|
await session.rollback()
|
|
_LOGGER.exception(
|
|
"Error matching conversion for reservation %s: %s",
|
|
pms_reservation_id,
|
|
e,
|
|
)
|
|
stats["errors"] += 1
|
|
finally:
|
|
# Close the session if it was created by SessionMaker
|
|
if self.session_maker:
|
|
await session.close()
|
|
|
|
async def _match_conversion_using_db_data(
|
|
self,
|
|
pms_reservation_id: int,
|
|
session: AsyncSession | None = None,
|
|
stats: dict[str, int] | None = None,
|
|
) -> None:
|
|
"""Phase 3a: Match a conversion using ID-based matching only.
|
|
|
|
This method reads both the conversion and conversion_guest from the database
|
|
and uses their stored hashed data to match to existing reservations via
|
|
advertising data (fbclid/gclid/md5_unique_id).
|
|
|
|
Guest detail matching is deferred to Phase 3b/3c to avoid reprocessing the same
|
|
guest multiple times.
|
|
|
|
Updates stats dictionary in-place if provided.
|
|
|
|
Args:
|
|
pms_reservation_id: PMS reservation ID to match
|
|
session: AsyncSession to use
|
|
stats: Shared stats dictionary to update (optional)
|
|
|
|
"""
|
|
if session is None:
|
|
session = self.session
|
|
|
|
if not self.hotel_id:
|
|
_LOGGER.error(
|
|
"Cannot match conversion: hotel_id not set in ConversionService"
|
|
)
|
|
return
|
|
|
|
# Get the conversion from the database with related data
|
|
result = await session.execute(
|
|
select(Conversion)
|
|
.where(
|
|
Conversion.hotel_id == self.hotel_id,
|
|
Conversion.pms_reservation_id == pms_reservation_id,
|
|
)
|
|
.options(
|
|
selectinload(Conversion.guest),
|
|
selectinload(Conversion.conversion_rooms),
|
|
)
|
|
)
|
|
conversion = result.scalar_one_or_none()
|
|
|
|
if not conversion:
|
|
# This should be extremely rare since we filtered in process_conversion_xml
|
|
_LOGGER.debug(
|
|
"Conversion not found for pms_reservation_id=%s (may have been deleted)",
|
|
pms_reservation_id,
|
|
)
|
|
return
|
|
|
|
# Get conversion_guest if it exists (has the hashed data)
|
|
conversion_guest = conversion.guest
|
|
|
|
# Extract hashed data from conversion_guest (already hashed)
|
|
hashed_first_name = None
|
|
hashed_last_name = None
|
|
hashed_email = None
|
|
|
|
if conversion_guest:
|
|
hashed_first_name = conversion_guest.hashed_first_name
|
|
hashed_last_name = conversion_guest.hashed_last_name
|
|
hashed_email = conversion_guest.hashed_email
|
|
|
|
# Phase 3a: Only try ID-based matching (fbclid/gclid/md5_unique_id)
|
|
# Guest detail matching is deferred to Phase 3b/3c
|
|
matched_reservation = None
|
|
matched_customer = None
|
|
matched_hashed_customer = None
|
|
|
|
if conversion.advertising_campagne:
|
|
matched_reservation = await self._match_by_advertising(
|
|
conversion.advertising_campagne,
|
|
conversion.hotel_id,
|
|
hashed_first_name,
|
|
hashed_last_name,
|
|
hashed_email,
|
|
conversion.advertising_partner,
|
|
session,
|
|
)
|
|
|
|
if matched_reservation:
|
|
matched_customer = matched_reservation.customer
|
|
if matched_customer and matched_customer.hashed_version:
|
|
matched_hashed_customer = matched_customer.hashed_version
|
|
|
|
_LOGGER.info(
|
|
"Phase 3a: Matched conversion by advertising ID (pms_id=%s, reservation_id=%d)",
|
|
pms_reservation_id,
|
|
matched_reservation.id,
|
|
)
|
|
|
|
# Update the conversion with matched entities if found
|
|
if matched_reservation or matched_customer or matched_hashed_customer:
|
|
conversion.reservation_id = (
|
|
matched_reservation.id if matched_reservation else None
|
|
)
|
|
conversion.customer_id = matched_customer.id if matched_customer else None
|
|
conversion.hashed_customer_id = (
|
|
matched_hashed_customer.id if matched_hashed_customer else None
|
|
)
|
|
|
|
# ID-based matches are always directly attributable
|
|
conversion.directly_attributable = True
|
|
conversion.guest_matched = False
|
|
|
|
# Update conversion_guest with hashed_customer reference if matched
|
|
if conversion_guest and matched_hashed_customer:
|
|
conversion_guest.hashed_customer_id = matched_hashed_customer.id
|
|
|
|
conversion.updated_at = datetime.now()
|
|
|
|
# Update stats if provided
|
|
if stats is not None:
|
|
if matched_reservation:
|
|
stats["matched_to_reservation"] += 1
|
|
elif matched_customer:
|
|
stats["matched_to_customer"] += 1
|
|
elif matched_hashed_customer:
|
|
stats["matched_to_hashed_customer"] += 1
|
|
else:
|
|
stats["unmatched"] += 1
|
|
|
|
async def _check_if_guest_is_regular(
|
|
self,
|
|
guest_id: str,
|
|
customer_id: int,
|
|
session: AsyncSession,
|
|
) -> None:
|
|
"""Check if a guest is a regular customer based on conversion and reservation history.
|
|
|
|
A guest is regular if they have conversions with paying bookings that predate their first
|
|
reservation sent by us. This indicates they were already a customer before we started tracking.
|
|
|
|
This check is done AFTER all conversions for a guest have been processed and matched,
|
|
so it can evaluate the complete picture of their payment history vs our reservation history.
|
|
|
|
Args:
|
|
guest_id: The guest ID to evaluate
|
|
customer_id: The matched customer ID
|
|
session: AsyncSession for database queries
|
|
|
|
"""
|
|
# Get the ConversionGuest record
|
|
guest_result = await session.execute(
|
|
select(ConversionGuest).where(ConversionGuest.guest_id == guest_id)
|
|
)
|
|
conversion_guest = guest_result.scalar_one_or_none()
|
|
|
|
if not conversion_guest:
|
|
return
|
|
|
|
# Find the earliest paying conversion for this guest (across all hotels)
|
|
# Look for conversions with actual revenue
|
|
earliest_paying_conversion_result = await session.execute(
|
|
select(Conversion)
|
|
.join(ConversionRoom, Conversion.id == ConversionRoom.conversion_id)
|
|
.where(
|
|
Conversion.guest_id == guest_id,
|
|
ConversionRoom.total_revenue.isnot(None),
|
|
ConversionRoom.total_revenue > Decimal(0),
|
|
)
|
|
.order_by(Conversion.reservation_date.asc())
|
|
.limit(1)
|
|
)
|
|
earliest_paying_conversion = (
|
|
earliest_paying_conversion_result.scalar_one_or_none()
|
|
)
|
|
|
|
if not earliest_paying_conversion:
|
|
# No paying conversions found for this guest
|
|
conversion_guest.is_regular = False
|
|
return
|
|
|
|
# Find the earliest reservation sent to this customer
|
|
earliest_reservation_result = await session.execute(
|
|
select(Reservation)
|
|
.where(Reservation.customer_id == customer_id)
|
|
.order_by(Reservation.start_date.asc())
|
|
.limit(1)
|
|
)
|
|
earliest_reservation = earliest_reservation_result.scalar_one_or_none()
|
|
|
|
if not earliest_reservation:
|
|
# No reservations for this customer yet, can't determine regularity
|
|
conversion_guest.is_regular = False
|
|
return
|
|
|
|
# Guest is regular if their earliest paying conversion predates our first reservation
|
|
# (meaning they were already a customer before we sent them a reservation)
|
|
# Compare against the reservation's creation date (when WE created/sent it), not check-in date
|
|
# Convert created_at to date for comparison with reservation_date (both are dates)
|
|
is_regular = (
|
|
earliest_paying_conversion.reservation_date
|
|
< earliest_reservation.created_at.date()
|
|
)
|
|
conversion_guest.is_regular = is_regular
|
|
|
|
if is_regular:
|
|
_LOGGER.info(
|
|
"Marking guest %s as regular: earliest paying conversion %s predates first reservation created at %s",
|
|
guest_id,
|
|
earliest_paying_conversion.reservation_date,
|
|
earliest_reservation.created_at,
|
|
)
|
|
else:
|
|
_LOGGER.debug(
|
|
"Guest %s is not regular: first paying conversion %s is after/equal to first reservation created at %s",
|
|
guest_id,
|
|
earliest_paying_conversion.reservation_date,
|
|
earliest_reservation.created_at,
|
|
)
|
|
|
|
async def _check_if_attributable(
|
|
self,
|
|
matched_customer_id: int,
|
|
conversion: Conversion,
|
|
session: AsyncSession,
|
|
exclude_reservation_id: int | None = None,
|
|
) -> tuple[Reservation | None, bool]:
|
|
"""Search for and check if a customer has an attributable reservation for this conversion.
|
|
|
|
Finds reservations linked to the customer and checks if any have dates matching this conversion.
|
|
A conversion is attributable ONLY if the conversion_room dates match a reservation's dates closely.
|
|
|
|
Args:
|
|
matched_customer_id: The customer ID to search reservations for
|
|
conversion: The Conversion record being evaluated
|
|
session: AsyncSession for database queries
|
|
exclude_reservation_id: Reservation ID to exclude from search (if already matched)
|
|
|
|
Returns:
|
|
Tuple of (matched_reservation, is_attributable) where:
|
|
- matched_reservation: The Reservation that matches (if any)
|
|
- is_attributable: True if the reservation's dates match this conversion
|
|
|
|
"""
|
|
# Check if conversion_room dates exist (criterion for attributability)
|
|
if not conversion.conversion_rooms:
|
|
return None, False
|
|
|
|
# Find reservations for this customer
|
|
query = select(Reservation).where(
|
|
Reservation.customer_id == matched_customer_id
|
|
)
|
|
if exclude_reservation_id:
|
|
query = query.where(Reservation.id != exclude_reservation_id)
|
|
|
|
db_result = await session.execute(query)
|
|
reservations = db_result.scalars().all()
|
|
|
|
if not reservations:
|
|
return None, False
|
|
|
|
# Check each reservation for date match
|
|
for reservation in reservations:
|
|
for room in conversion.conversion_rooms:
|
|
if (
|
|
room.arrival_date
|
|
and room.departure_date
|
|
and reservation.start_date
|
|
and reservation.end_date
|
|
):
|
|
# Check if dates match or mostly match (within 7 day tolerance)
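                    # e.g. room 2024-05-01 to 2024-05-08 vs reservation 2024-05-03 to 2024-05-10:
                    # both offsets are 2 days, within the 7-day tolerance, so it matches
                    # (dates illustrative).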
|
|
arrival_match = (
|
|
abs((room.arrival_date - reservation.start_date).days) <= 7
|
|
)
|
|
departure_match = (
|
|
abs((room.departure_date - reservation.end_date).days) <= 7
|
|
)
|
|
|
|
if arrival_match and departure_match:
|
|
_LOGGER.info(
|
|
"Found attributable reservation for customer %d: "
|
|
"room dates %s-%s match reservation dates %s-%s",
|
|
matched_customer_id,
|
|
room.arrival_date,
|
|
room.departure_date,
|
|
reservation.start_date,
|
|
reservation.end_date,
|
|
)
|
|
return reservation, True
|
|
|
|
return None, False
|