2286 lines
88 KiB
Python
2286 lines
88 KiB
Python
"""Service for handling conversion data from hotel PMS XML files."""
|
|
|
|
import asyncio
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, date, datetime
|
|
from decimal import Decimal
|
|
from typing import Any
|
|
|
|
from sqlalchemy import or_, select
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from .db import (
|
|
Conversion,
|
|
ConversionGuest,
|
|
ConversionRoom,
|
|
Customer,
|
|
Reservation,
|
|
SessionMaker,
|
|
)
|
|
from .logging_config import get_logger
|
|
from .schemas import ConversionData, ConversionGuestData
|
|
|
|
_LOGGER = get_logger(__name__)

# Upper bound on reservations handled in parallel in concurrent mode. Each
# in-flight reservation opens its own database session, so this also bounds
# how many sessions the import holds open at once.
MAX_CONCURRENT_RESERVATIONS: int = 10
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ParsedRoomReservation:
|
|
"""Typed representation of a single <roomReservation> entry."""
|
|
|
|
pms_hotel_reservation_id: str
|
|
room_number: str | None
|
|
arrival_date: date | None
|
|
departure_date: date | None
|
|
room_status: str | None
|
|
room_type: str | None
|
|
num_adults: int | None
|
|
rate_plan_code: str | None
|
|
connected_room_type: str | None
|
|
daily_sales: list[dict[str, str]] = field(default_factory=list)
|
|
total_revenue: Decimal | None = None
|
|
daily_sales_count: int = 0
|
|
|
|
|
|
@dataclass(slots=True)
class ParsedReservationData:
    """Metadata of one ``<reservation>`` element together with its rooms.

    Built by ``ConversionService._parse_reservation_element``. There the
    reservation id is required and at least one room is present. The
    remaining metadata fields may be ``None``.
    """

    # "hotelID" attribute of the <reservation> element.
    hotel_id: str | None
    # Mandatory integer "id" attribute.
    pms_reservation_id: int
    # Integer "id" of the nested <guest> element, if any.
    guest_id: int | None
    reservation_number: str | None
    reservation_date: date | None
    # Parsed from the ISO "creationTime" attribute.
    creation_time: datetime | None
    reservation_type: str | None
    booking_channel: str | None
    advertising_medium: str | None
    advertising_partner: str | None
    # Spelling mirrors the PMS attribute "advertisingCampagne".
    advertising_campagne: str | None
    room_reservations: list[ParsedRoomReservation] = field(default_factory=list)
|
|
|
|
|
|
class ConversionService:
|
|
"""Service for processing and storing conversion/daily sales data.
|
|
|
|
Supports two modes of operation:
|
|
1. Sequential mode: Single AsyncSession passed in, uses sequential processing
|
|
2. Concurrent mode: SessionMaker passed in, creates independent sessions per task
|
|
"""
|
|
|
|
def __init__(
    self,
    session: AsyncSession | SessionMaker | None = None,
    hotel_id: str | None = None,
):
    """Configure the service for sequential or concurrent processing.

    Args:
        session: Selects the processing mode:
            - ``SessionMaker``: concurrent mode; each task opens its own session.
            - ``AsyncSession``: sequential mode; all work shares this session.
            - ``None``: not recommended, but allowed for subclassing.
        hotel_id: Hotel ID for this conversion service context (from the
            authenticated user). Used to scope cache loading and deletions.

    Raises:
        TypeError: If ``session`` has any other type.
    """
    self.hotel_id = hotel_id

    # Per-run cache of (reservation, customer) tuples keyed by hotel code,
    # filled by _load_reservation_cache() so matching large XML files does
    # not have to re-query the database for every reservation.
    self._reservation_cache: dict[
        str | None, list[tuple[Reservation, Customer | None]]
    ] = {}
    self._cache_initialized = False

    # Start from "no database access"; the branches below fill in one mode.
    self.session = None
    self.session_maker = None
    self.supports_concurrent = False

    if isinstance(session, SessionMaker):
        self.session_maker = session
        self.supports_concurrent = True
        _LOGGER.debug(
            "ConversionService initialized in concurrent mode with SessionMaker"
        )
        return

    if isinstance(session, AsyncSession):
        self.session = session
        _LOGGER.debug(
            "ConversionService initialized in sequential mode with single session"
        )
        return

    if session is not None:
        raise TypeError(
            f"session must be AsyncSession or SessionMaker, got {type(session)}"
        )
|
|
|
|
@staticmethod
|
|
def _parse_required_int(value: str | None, field_name: str) -> int:
|
|
"""Parse an integer attribute that must be present."""
|
|
if value in (None, ""):
|
|
raise ValueError(f"{field_name} is required")
|
|
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError) as exc:
|
|
raise ValueError(
|
|
f"{field_name} must be an integer (value={value})"
|
|
) from exc
|
|
|
|
@staticmethod
|
|
def _parse_optional_int(value: str | None, field_name: str) -> int | None:
|
|
"""Parse an optional integer attribute, logging on failure."""
|
|
if value in (None, ""):
|
|
return None
|
|
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
_LOGGER.warning("Invalid %s value: %s", field_name, value)
|
|
return None
|
|
|
|
@staticmethod
|
|
def _parse_date(value: str | None, field_name: str) -> date | None:
|
|
"""Parse a YYYY-MM-DD formatted date string."""
|
|
if not value:
|
|
return None
|
|
|
|
try:
|
|
return datetime.strptime(value, "%Y-%m-%d").date()
|
|
except ValueError:
|
|
_LOGGER.warning("Invalid %s format: %s", field_name, value)
|
|
return None
|
|
|
|
@staticmethod
|
|
def _parse_datetime(value: str | None, field_name: str) -> datetime | None:
|
|
"""Parse an ISO timestamp string."""
|
|
if not value:
|
|
return None
|
|
|
|
try:
|
|
normalized = value.replace("Z", "+00:00")
|
|
return datetime.fromisoformat(normalized)
|
|
except ValueError:
|
|
_LOGGER.warning("Invalid %s format: %s", field_name, value)
|
|
return None
|
|
|
|
def _parse_daily_sales(
|
|
self, daily_sales_elem: ET.Element | None
|
|
) -> tuple[list[dict[str, str]], Decimal | None, int]:
|
|
"""Extract the list of sale dictionaries and aggregate revenue information."""
|
|
if daily_sales_elem is None:
|
|
return [], None, 0
|
|
|
|
daily_sales_list: list[dict[str, str]] = []
|
|
total_revenue = Decimal(0)
|
|
sale_count = 0
|
|
|
|
for daily_sale in daily_sales_elem.findall("dailySale"):
|
|
sale_count += 1
|
|
sale_data: dict[str, str] = {}
|
|
|
|
sale_date_str = daily_sale.get("date")
|
|
if sale_date_str:
|
|
sale_data["date"] = sale_date_str
|
|
|
|
revenue_total_str = daily_sale.get("revenueTotal")
|
|
if revenue_total_str:
|
|
sale_data["revenueTotal"] = revenue_total_str
|
|
try:
|
|
total_revenue += Decimal(revenue_total_str)
|
|
except (ValueError, TypeError):
|
|
_LOGGER.warning("Invalid revenueTotal value: %s", revenue_total_str)
|
|
|
|
# Copy the remaining optional revenue buckets if present
|
|
for field_name in (
|
|
"revenueLogis",
|
|
"revenueBoard",
|
|
"revenueFB",
|
|
"revenueSpa",
|
|
"revenueOther",
|
|
):
|
|
value = daily_sale.get(field_name)
|
|
if value:
|
|
sale_data[field_name] = value
|
|
|
|
if sale_data:
|
|
daily_sales_list.append(sale_data)
|
|
|
|
total_revenue_value = total_revenue if total_revenue > 0 else None
|
|
return daily_sales_list, total_revenue_value, sale_count
|
|
|
|
def _parse_room_reservation(
    self, room_elem: ET.Element, pms_reservation_id: int, room_index: int
) -> ParsedRoomReservation:
    """Build a ``ParsedRoomReservation`` from one ``<roomReservation>`` element.

    Args:
        room_elem: The ``<roomReservation>`` element.
        pms_reservation_id: Id of the enclosing ``<reservation>``. It is
            combined with the room number into ``pms_hotel_reservation_id``.
        room_index: Position of this room within the reservation. It is only
            used in the debug message for a missing ``roomNumber``.

    Returns:
        The parsed room, including its daily sales and revenue aggregate.
    """
    attr = room_elem.get

    arrival_date = self._parse_date(attr("arrival"), "arrival date")
    departure_date = self._parse_date(attr("departure"), "departure date")
    num_adults = self._parse_optional_int(attr("adults"), "adults")

    room_number = attr("roomNumber")
    if room_number is None:
        # NOTE(review): every room without a roomNumber gets the key
        # "<id>_None" below, so two such rooms in one reservation collide.
        _LOGGER.debug(
            "Room reservation %s #%d has no roomNumber",
            pms_reservation_id,
            room_index,
        )

    sales, revenue, sale_count = self._parse_daily_sales(
        room_elem.find("dailySales")
    )

    return ParsedRoomReservation(
        pms_hotel_reservation_id=f"{pms_reservation_id}_{room_number}",
        room_number=room_number,
        arrival_date=arrival_date,
        departure_date=departure_date,
        room_status=attr("status"),
        room_type=attr("roomType"),
        num_adults=num_adults,
        rate_plan_code=attr("ratePlanCode"),
        connected_room_type=attr("connectedRoomType"),
        daily_sales=sales,
        total_revenue=revenue,
        daily_sales_count=sale_count,
    )
|
|
|
|
def _parse_reservation_element(
    self, reservation_elem: ET.Element
) -> ParsedReservationData | None:
    """Parse a ``<reservation>`` element into ``ParsedReservationData``.

    Returns ``None`` after logging in three cases:
      * the ``id`` attribute is missing or not an integer;
      * there is no ``<roomReservations>`` child;
      * that child contains no ``<roomReservation>`` entries.
    """
    try:
        pms_reservation_id = self._parse_required_int(
            reservation_elem.get("id"), "reservation id"
        )
    except ValueError as exc:
        _LOGGER.error(
            "Invalid reservation metadata in reservation element: %s", exc
        )
        return None

    rooms_container = reservation_elem.find("roomReservations")
    if rooms_container is None:
        _LOGGER.debug(
            "No roomReservations found for reservation %s", pms_reservation_id
        )
        return None

    rooms = [
        self._parse_room_reservation(room_elem, pms_reservation_id, position)
        for position, room_elem in enumerate(
            rooms_container.findall("roomReservation")
        )
    ]
    if not rooms:
        _LOGGER.debug(
            "Reservation %s has no roomReservation entries", pms_reservation_id
        )
        return None

    guest_elem = reservation_elem.find("guest")
    guest_id = (
        self._parse_optional_int(guest_elem.get("id"), "guest id")
        if guest_elem is not None
        else None
    )

    attr = reservation_elem.get
    reservation_date = self._parse_date(attr("date"), "reservation date")
    creation_time = self._parse_datetime(attr("creationTime"), "creation time")

    return ParsedReservationData(
        hotel_id=attr("hotelID"),
        pms_reservation_id=pms_reservation_id,
        guest_id=guest_id,
        reservation_number=attr("number"),
        reservation_date=reservation_date,
        creation_time=creation_time,
        reservation_type=attr("type"),
        booking_channel=attr("bookingChannel"),
        advertising_medium=attr("advertisingMedium"),
        advertising_partner=attr("advertisingPartner"),
        advertising_campagne=attr("advertisingCampagne"),
        room_reservations=rooms,
    )
|
|
|
|
async def _extract_unique_guests_from_xml(
    self, reservations: list
) -> dict[tuple[str, int], ConversionGuestData]:
    """Phase 0: collect one validated guest per ``(hotel_id, guest_id)``.

    Every ``<guest>`` element is validated through ``ConversionGuestData``
    before it is stored. Per the schema's validation, ``guest_id`` arrives as
    a string and is converted to ``int``.

    Args:
        reservations: ``<reservation>`` XML elements.

    Returns:
        Mapping of ``(hotel_id, guest_id)`` to validated guest data.
          * A guest that appears several times keeps its last occurrence.
          * Reservations without a ``<guest>`` element are skipped.
          * Guests that fail validation are skipped and logged with a
            traceback.
    """
    guest_data_by_key: dict[tuple[str, int], ConversionGuestData] = {}
    # One timestamp for the whole run so all guests share first/last seen.
    now = datetime.now(UTC)

    for reservation_elem in reservations:
        hotel_id = reservation_elem.get("hotelID")
        guest_elem = reservation_elem.find("guest")

        if guest_elem is None:
            _LOGGER.debug(
                "No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)",
                reservation_elem.get("id"),
            )
            continue

        # Reuse the shared date parser. Malformed input is logged as
        # "Invalid birth date format: <value>" and yields None.
        guest_birth_date = self._parse_date(
            guest_elem.get("dateOfBirth"), "birth date"
        )

        try:
            validated_guest = ConversionGuestData(
                hotel_id=hotel_id,
                guest_id=guest_elem.get("id"),  # validated and converted to int
                guest_first_name=guest_elem.get("firstName"),
                guest_last_name=guest_elem.get("lastName"),
                guest_email=guest_elem.get("email"),
                guest_country_code=guest_elem.get("countryCode"),
                guest_birth_date=guest_birth_date,
                first_seen=now,
                last_seen=now,
            )
        except ValueError:
            _LOGGER.exception(
                "Failed to validate guest data for reservation %s",
                reservation_elem.get("id"),
            )
            continue

        # Key on the validated (int) guest_id; later occurrences overwrite.
        guest_data_by_key[(hotel_id, validated_guest.guest_id)] = validated_guest

    return guest_data_by_key
|
|
|
|
async def _bulk_upsert_guests(
    self,
    session: AsyncSession,
    guest_data_by_key: dict[tuple[str, int], ConversionGuestData],
) -> None:
    """Phase 1: upsert all extracted guests with PostgreSQL ``ON CONFLICT``.

    On a conflict on ``(hotel_id, guest_id)``, the existing row has these
    fields overwritten:
      * name, email, country and birth date;
      * their hashed counterparts;
      * ``last_seen``.

    ``first_seen`` is not in the update set, so it keeps its original value.
    Rows are sent in batches of 1000 per statement. Nothing is committed
    here; the caller owns the transaction.

    Args:
        session: AsyncSession to execute the statements on.
        guest_data_by_key: Mapping of ``(hotel_id, guest_id)`` to validated
            ``ConversionGuestData``, as produced by Phase 0.
    """
    if not guest_data_by_key:
        return

    batch_size = 1000
    # The keys only served deduplication; each row carries its own
    # hotel_id/guest_id.
    guests = list(guest_data_by_key.values())
    total = len(guests)

    for batch_start in range(0, total, batch_size):
        batch = guests[batch_start : batch_start + batch_size]
        # Validation and hashing already happened in Phase 0.
        values_list = [guest.model_dump() for guest in batch]

        stmt = pg_insert(ConversionGuest).values(values_list)
        excluded = stmt.excluded
        stmt = stmt.on_conflict_do_update(
            index_elements=["hotel_id", "guest_id"],
            set_={
                "guest_first_name": excluded.guest_first_name,
                "guest_last_name": excluded.guest_last_name,
                "guest_email": excluded.guest_email,
                "guest_country_code": excluded.guest_country_code,
                "guest_birth_date": excluded.guest_birth_date,
                "hashed_first_name": excluded.hashed_first_name,
                "hashed_last_name": excluded.hashed_last_name,
                "hashed_email": excluded.hashed_email,
                "hashed_country_code": excluded.hashed_country_code,
                "hashed_birth_date": excluded.hashed_birth_date,
                "last_seen": excluded.last_seen,
            },
        )

        await session.execute(stmt)

        _LOGGER.debug(
            "Phase 1: Upserted batch %d-%d of %d guests",
            batch_start + 1,
            batch_start + len(batch),
            total,
        )
|
|
|
|
async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]:
    """Parse conversion XML and save daily sales data to the database.

    Processing order:
      * ``<Deletedreservation>`` entries delete the matching conversions.
      * Phase 0 extracts and deduplicates guests from ``<reservation>``s.
      * Phase 1 bulk-upserts those guests. A failure aborts the run.
      * Phase 2 creates/updates one conversion per reservation.
      * Phase 3 matches the stored conversions to reservations/customers.

    Args:
        xml_content: XML string containing reservation and daily sales data.

    Returns:
        Processing statistics with the keys ``total_reservations``,
        ``deleted_reservations``, ``total_daily_sales``,
        ``matched_to_reservation``, ``matched_to_customer``, ``unmatched``
        and ``errors``.

    Raises:
        ValueError: If ``xml_content`` is not well-formed XML.
    """
    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError as e:
        _LOGGER.error("Failed to parse conversion XML: %s", e)
        raise ValueError(f"Invalid XML content: {e}") from e

    # Initialize cache for this XML processing run
    await self._load_reservation_cache()

    stats = {
        "total_reservations": 0,
        "deleted_reservations": 0,
        "total_daily_sales": 0,
        "matched_to_reservation": 0,
        "matched_to_customer": 0,
        "unmatched": 0,
        "errors": 0,
    }

    # Deletions are applied before any reservation is (re)created below.
    await self._process_deleted_reservation_elements(
        root.findall("Deletedreservation"), stats
    )

    # Process active reservations in four phases:
    # Phase 0: Extract and deduplicate all guest data from XML
    # Phase 1: Bulk upsert all unique guests to DB (no race conditions)
    # Phase 2: Create/update all conversion records (ConversionGuests already exist)
    # Phase 3: Match them to existing reservations/customers
    reservations = root.findall("reservation")
    stats["total_reservations"] = len(reservations)

    if not reservations:
        return stats

    _LOGGER.info("Processing %d reservations in xml", len(reservations))

    # Phase 0: Extract and deduplicate all guest data from XML
    _LOGGER.debug("Phase 0: Extracting and deduplicating guest data from XML")
    guest_data_by_key = await self._extract_unique_guests_from_xml(reservations)
    _LOGGER.info(
        "Phase 0: Extracted %d unique guests from %d reservations",
        len(guest_data_by_key),
        len(reservations),
    )

    # Phase 1 runs in a single transaction. On failure everything is rolled
    # back, every guest counts as an error, and the run stops here.
    if guest_data_by_key:
        _LOGGER.debug(
            "Phase 1: Bulk upserting %d unique guests to database",
            len(guest_data_by_key),
        )
        if self.session_maker:
            session = await self.session_maker.create_session()
        else:
            session = self.session

        try:
            await self._bulk_upsert_guests(session, guest_data_by_key)
            await session.commit()
            _LOGGER.info(
                "Phase 1: Successfully upserted %d guests", len(guest_data_by_key)
            )
        except Exception as e:
            await session.rollback()
            _LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e)
            stats["errors"] += len(guest_data_by_key)
            return stats
        finally:
            if self.session_maker:
                await session.close()

    # Phase 2: create/update conversions (no matching). Returns the ids of
    # the reservations that were stored successfully.
    _LOGGER.debug("Phase 2: Creating/updating conversions")
    if self.supports_concurrent:
        pms_reservation_ids = await self._process_reservations_concurrent(
            reservations, stats
        )
    else:
        pms_reservation_ids = await self._process_reservations_sequential(
            reservations, stats
        )

    _LOGGER.debug(
        "Phase 2: Created/updated %d conversions out of %d total reservations",
        len(pms_reservation_ids),
        len(reservations),
    )

    # Phase 3: match only conversions that actually exist in the database,
    # working from stored rows (no XML re-parsing or re-hashing).
    if pms_reservation_ids:
        _LOGGER.debug("Phase 3: Matching conversions to reservations/customers")
        if self.supports_concurrent:
            await self._match_conversions_from_db_concurrent(
                pms_reservation_ids, stats
            )
        else:
            await self._match_conversions_from_db_sequential(
                pms_reservation_ids, stats
            )

    return stats


async def _process_deleted_reservation_elements(
    self, deleted_elems: list[ET.Element], stats: dict[str, int]
) -> None:
    """Delete conversions for each ``<Deletedreservation ID="...">`` element.

    Counters:
      * ``stats["deleted_reservations"]`` is incremented for every element.
      * ``stats["errors"]`` is incremented for a non-integer ``ID`` or a
        failed deletion.

    Elements without an ``ID`` are skipped with a warning. Each deletion is
    committed on its own, so one failure does not undo the others. A session
    opened from the SessionMaker is always closed, even on unexpected errors.
    """
    if not deleted_elems:
        return

    if self.session_maker:
        session = await self.session_maker.create_session()
    else:
        session = self.session

    try:
        for deleted_res in deleted_elems:
            stats["deleted_reservations"] += 1
            raw_id = deleted_res.get("ID")
            if not raw_id:
                _LOGGER.warning("Deleted reservation missing ID attribute, skipping")
                continue

            # Parse before touching the database so the error log always
            # refers to this element's own ID.
            try:
                pms_reservation_id = int(raw_id)
            except ValueError:
                _LOGGER.error(
                    "Deleted reservation has non-integer ID %r, skipping", raw_id
                )
                stats["errors"] += 1
                continue

            try:
                await self._handle_deleted_reservation(pms_reservation_id, session)
                await session.commit()
            except Exception as e:
                await session.rollback()
                _LOGGER.exception(
                    "Error deleting reservation %s: %s",
                    pms_reservation_id,
                    e,
                )
                stats["errors"] += 1
    finally:
        # Only close sessions opened here; the shared session is not ours.
        if self.session_maker:
            await session.close()
|
|
|
|
async def _load_reservation_cache(self) -> None:
    """Load reservations and their customers into an in-memory cache.

    Cache layout (``self._reservation_cache``):
      * reservations are filtered to ``self.hotel_id`` when it is set;
      * they are grouped by ``Reservation.hotel_id``;
      * each entry is a ``(reservation, customer)`` tuple, where ``customer``
        is loaded eagerly with ``selectinload`` and is ``None`` when the
        reservation has none.

    The cache is meant to avoid repeated database queries while matching
    many reservations from one large XML file. Once a load has succeeded or
    failed, later calls are no-ops. On failure the cache is left empty but
    marked initialized. Presumably matching then falls back to direct
    queries; confirm against the matching code.
    """
    if self._cache_initialized:
        _LOGGER.debug("Reservation cache already initialized, skipping reload")
        return

    # Concurrent mode opens a dedicated session; sequential mode borrows the
    # shared one and must not close it.
    if self.session_maker:
        session = await self.session_maker.create_session()
        close_session = True
    else:
        session = self.session
        close_session = False

    if not session:
        _LOGGER.warning("No session available for cache loading")
        return

    try:
        query = select(Reservation).options(selectinload(Reservation.customer))
        if self.hotel_id:
            query = query.filter(Reservation.hotel_id == self.hotel_id)

        result = await session.execute(query)
        reservations = result.scalars().all()

        _LOGGER.info("Loaded %d reservations into cache", len(reservations))

        # Group by hotel code for efficient lookup.
        for reservation in reservations:
            self._reservation_cache.setdefault(reservation.hotel_id, []).append(
                (reservation, reservation.customer or None)
            )

        self._cache_initialized = True
        _LOGGER.info(
            "Reservation cache initialized with %d hotel codes",
            len(self._reservation_cache),
        )
    except Exception as e:
        # Keep the traceback, and make sure no half-filled cache survives.
        _LOGGER.exception("Failed to load reservation cache: %s", e)
        self._reservation_cache.clear()
        self._cache_initialized = True
    finally:
        # Close session if we created it
        if close_session:
            await session.close()
|
|
|
|
async def _process_reservations_sequential(
    self, reservations: list, stats: dict[str, int]
) -> list[int]:
    """Phase 2 in sequential mode: process reservations one after another.

    Each reservation goes through ``_process_reservation_safe`` in XML order.
    That helper handles per-reservation errors itself. Any exception it
    lets escape stops the loop and propagates, so the remaining reservations
    are not processed (fail fast).

    Returns:
        PMS reservation ids that were successfully created/updated, in order.
    """
    # _process_reservation_safe requires a semaphore. A plain await loop
    # never has more than one holder, so a limit of 1 is just a formality.
    semaphore = asyncio.Semaphore(1)
    processed: list[int] = []

    for reservation in reservations:
        pms_id = await self._process_reservation_safe(reservation, semaphore, stats)
        if pms_id is not None:
            processed.append(pms_id)

    return processed
|
|
|
|
async def _process_reservations_concurrent(
    self, reservations: list, stats: dict[str, int]
) -> list[int]:
    """Phase 2 in concurrent mode: process reservations in parallel.

    At most ``MAX_CONCURRENT_RESERVATIONS`` reservations are in flight at
    once. Each task opens its own session inside ``_process_reservation_safe``.
    Falls back to sequential processing when no SessionMaker is configured.

    Returns:
        PMS reservation ids that were successfully created/updated, in XML order.

    Raises:
        Exception: The first exception that escaped a task. By then
            ``asyncio.TaskGroup`` has already cancelled and awaited all
            sibling tasks.
    """
    if not self.session_maker:
        _LOGGER.error(
            "Concurrent processing requested but SessionMaker not available. "
            "Falling back to sequential processing."
        )
        return await self._process_reservations_sequential(reservations, stats)

    semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)

    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(
                    self._process_reservation_safe(reservation, semaphore, stats)
                )
                for reservation in reservations
            ]
    except ExceptionGroup as eg:
        # TaskGroup cancels and awaits every sibling before raising, so there
        # is nothing left to cancel. An ExceptionGroup is never empty; surface
        # the first failure directly (fail fast).
        raise eg.exceptions[0] from eg

    results = [task.result() for task in tasks]
    return [pms_id for pms_id in results if pms_id is not None]
|
|
|
|
async def _process_reservation_safe(
    self,
    reservation_elem: Any,
    semaphore: asyncio.Semaphore,
    stats: dict[str, int],
) -> int | None:
    """Create/update the conversion for one reservation in its own transaction.

    This is the Phase 2 unit of work (no matching).
      * Concurrent mode opens a fresh session from the SessionMaker and
        closes it afterwards.
      * Sequential mode uses the shared session.
    The transaction is committed on success and rolled back on any error.

    Args:
        reservation_elem: ``<reservation>`` XML element.
        semaphore: Limits how many reservations are processed at once.
        stats: Shared counters; ``total_daily_sales`` and ``errors`` are
            updated. Sharing between tasks is safe because each increment
            completes without an intervening ``await`` on the single-threaded
            event loop.

    Returns:
        The PMS reservation id on success. ``None`` if the id is missing or
        invalid, or if processing failed; the error is logged and counted.
    """
    # Parse the id defensively: an exception escaping here would cancel every
    # sibling task in the TaskGroup and abort the whole batch.
    try:
        pms_reservation_id = self._parse_required_int(
            reservation_elem.get("id"), "reservation id"
        )
    except ValueError as exc:
        _LOGGER.error("Skipping reservation with invalid id: %s", exc)
        stats["errors"] += 1
        return None

    async with semaphore:
        # In concurrent mode, create a new session for this task
        if self.session_maker:
            session = await self.session_maker.create_session()
        else:
            session = self.session

        try:
            # Create/update the conversion record (no matching)
            reservation_stats = await self._create_or_update_conversion(
                reservation_elem, session
            )
            stats["total_daily_sales"] += reservation_stats["daily_sales_count"]

            # Commit this task's transaction
            await session.commit()
            _LOGGER.debug(
                "Successfully created/updated conversion for reservation %s",
                pms_reservation_id,
            )
            return pms_reservation_id

        except Exception as e:
            # Rollback this task's transaction
            await session.rollback()
            _LOGGER.exception(
                "Error processing reservation %s: %s",
                pms_reservation_id,
                e,
            )
            stats["errors"] += 1
            return None
        finally:
            # Close the session if it was created by SessionMaker
            if self.session_maker:
                await session.close()
|
|
|
|
async def _handle_deleted_reservation(
    self, pms_reservation_id: int, session: AsyncSession
):
    """Delete every conversion of ``self.hotel_id`` with the given PMS id.

    The deletions are issued on ``session`` but not committed here. When the
    service has no ``hotel_id`` it only logs an error and does nothing.

    Args:
        pms_reservation_id: PMS reservation id from a ``<Deletedreservation>``.
        session: AsyncSession on which the deletions are performed.
    """
    if not self.hotel_id:
        _LOGGER.error(
            "Cannot delete reservation: hotel_id not set in ConversionService"
        )
        return

    _LOGGER.info(
        "Processing deleted reservation: Hotel %s, PMS ID %s",
        self.hotel_id,
        pms_reservation_id,
    )

    lookup = select(Conversion).where(
        Conversion.hotel_id == self.hotel_id,
        Conversion.pms_reservation_id == pms_reservation_id,
    )
    matches = (await session.execute(lookup)).scalars().all()

    for doomed in matches:
        await session.delete(doomed)

    if not matches:
        return

    _LOGGER.info(
        "Deleted %d conversion records for hotel %s, PMS reservation %s",
        len(matches),
        self.hotel_id,
        pms_reservation_id,
    )
|
|
|
|
async def _create_or_update_conversion(
    self, reservation_elem: ET.Element, session: AsyncSession | None = None
) -> dict[str, int]:
    """Create or update the conversion and its rooms for one reservation.

    This is a Phase 2 step (no matching).
      * The conversion is looked up by ``(hotel_id, pms_reservation_id)``.
        An existing row gets its reservation and advertising metadata
        refreshed; otherwise a new row is added.
      * Its ``ConversionRoom`` rows are then synced with the XML: rooms
        present in both are updated, new ones are inserted, and rooms no
        longer in the XML are deleted.
    Nothing is committed here.

    Args:
        reservation_elem: ``<reservation>`` XML element to process.
        session: AsyncSession to use; defaults to ``self.session``. In
            concurrent mode each task passes its own session.

    Returns:
        ``{"daily_sales_count": n}``, where ``n`` is the number of
        ``<dailySale>`` elements across all rooms (0 if the reservation could
        not be parsed).
    """
    if session is None:
        session = self.session
    stats = {
        "daily_sales_count": 0,
    }
    parsed_reservation = self._parse_reservation_element(reservation_elem)
    if not parsed_reservation:
        return stats

    # NOTE(review): hotel_id comes from the uploaded XML, whereas deletions
    # use the authenticated self.hotel_id. Confirm that an upload cannot
    # create or overwrite conversions belonging to another hotel.
    hotel_id = parsed_reservation.hotel_id
    pms_reservation_id = parsed_reservation.pms_reservation_id

    # One timestamp for every row touched by this call. It stays naive like
    # the previous writes to these columns (NOTE(review): other code in this
    # service uses UTC-aware timestamps).
    now = datetime.now()

    # ConversionGuests were bulk-upserted in Phase 1, so the conversion can
    # safely reference its guest. Look for an existing row (upsert).
    existing_result = await session.execute(
        select(Conversion).where(
            Conversion.hotel_id == hotel_id,
            Conversion.pms_reservation_id == pms_reservation_id,
        )
    )
    existing_conversion = existing_result.scalar_one_or_none()

    if existing_conversion:
        # Refresh reservation metadata and advertising data only.
        #   * Guest details live in ConversionGuest.
        #   * Reservation/customer links are left for the matching phase.
        # NOTE(review): guest_id is not refreshed here, so a changed guest id
        # in a later upload is not picked up. Confirm this is intended.
        existing_conversion.reservation_number = (
            parsed_reservation.reservation_number
        )
        existing_conversion.reservation_date = parsed_reservation.reservation_date
        existing_conversion.creation_time = parsed_reservation.creation_time
        existing_conversion.reservation_type = parsed_reservation.reservation_type
        existing_conversion.booking_channel = parsed_reservation.booking_channel
        existing_conversion.advertising_medium = (
            parsed_reservation.advertising_medium
        )
        existing_conversion.advertising_partner = (
            parsed_reservation.advertising_partner
        )
        existing_conversion.advertising_campagne = (
            parsed_reservation.advertising_campagne
        )
        existing_conversion.updated_at = now
        conversion = existing_conversion
        _LOGGER.info(
            "Updated conversion %s (pms_id=%s)",
            conversion.id,
            pms_reservation_id,
        )
    else:
        # New conversion without matching. Reservation/customer links stay
        # empty until they are filled in after matching.
        conversion_data = ConversionData(
            hotel_id=hotel_id,
            guest_id=parsed_reservation.guest_id,  # links to ConversionGuest
            pms_reservation_id=pms_reservation_id,
            reservation_number=parsed_reservation.reservation_number,
            reservation_date=parsed_reservation.reservation_date,
            creation_time=parsed_reservation.creation_time,
            reservation_type=parsed_reservation.reservation_type,
            booking_channel=parsed_reservation.booking_channel,
            advertising_medium=parsed_reservation.advertising_medium,
            advertising_partner=parsed_reservation.advertising_partner,
            advertising_campagne=parsed_reservation.advertising_campagne,
        )
        conversion = Conversion(**conversion_data.model_dump())
        session.add(conversion)
        _LOGGER.debug(
            "Created conversion (pms_id=%s)",
            pms_reservation_id,
        )

    # Flush so a new conversion has an id before rooms reference it.
    await session.flush()

    # Load ALL stored rooms of this conversion, not only those in the XML, so
    # rooms that disappeared from the upload can be removed below.
    existing_rooms_result = await session.execute(
        select(ConversionRoom).where(ConversionRoom.conversion_id == conversion.id)
    )
    existing_rooms = {
        room.pms_hotel_reservation_id: room
        for room in existing_rooms_result.scalars().all()
    }

    # Room keys present in the current XML.
    current_pms_hotel_reservation_ids: set[str] = set()

    for room_reservation in parsed_reservation.room_reservations:
        current_pms_hotel_reservation_ids.add(
            room_reservation.pms_hotel_reservation_id
        )
        stats["daily_sales_count"] += room_reservation.daily_sales_count

        existing_room_reservation = existing_rooms.get(
            room_reservation.pms_hotel_reservation_id
        )

        if existing_room_reservation:
            # Update existing room reservation with all fields
            existing_room_reservation.arrival_date = room_reservation.arrival_date
            existing_room_reservation.departure_date = (
                room_reservation.departure_date
            )
            existing_room_reservation.room_status = room_reservation.room_status
            existing_room_reservation.room_type = room_reservation.room_type
            existing_room_reservation.num_adults = room_reservation.num_adults
            existing_room_reservation.rate_plan_code = (
                room_reservation.rate_plan_code
            )
            existing_room_reservation.connected_room_type = (
                room_reservation.connected_room_type
            )
            # An empty sales list is stored as NULL.
            existing_room_reservation.daily_sales = (
                room_reservation.daily_sales or None
            )
            existing_room_reservation.total_revenue = room_reservation.total_revenue
            existing_room_reservation.updated_at = now
            _LOGGER.debug(
                "Updated room reservation %s (pms_id=%s, room=%s)",
                existing_room_reservation.id,
                pms_reservation_id,
                room_reservation.room_number,
            )
        else:
            room_reservation_record = ConversionRoom(
                conversion_id=conversion.id,
                pms_hotel_reservation_id=room_reservation.pms_hotel_reservation_id,
                arrival_date=room_reservation.arrival_date,
                departure_date=room_reservation.departure_date,
                room_status=room_reservation.room_status,
                room_type=room_reservation.room_type,
                room_number=room_reservation.room_number,
                num_adults=room_reservation.num_adults,
                rate_plan_code=room_reservation.rate_plan_code,
                connected_room_type=room_reservation.connected_room_type,
                daily_sales=room_reservation.daily_sales or None,
                total_revenue=room_reservation.total_revenue,
                created_at=now,
                updated_at=now,
            )
            session.add(room_reservation_record)
            _LOGGER.debug(
                "Created room reservation (pms_id=%s, room=%s, adults=%s)",
                pms_reservation_id,
                room_reservation.room_number,
                room_reservation.num_adults,
            )

    # Delete rooms that are no longer in the current XML, for example when
    # an update changed the room numbers.
    rooms_to_delete = [
        room
        for key, room in existing_rooms.items()
        if key not in current_pms_hotel_reservation_ids
    ]
    for room in rooms_to_delete:
        await session.delete(room)
        _LOGGER.debug(
            "Deleted room reservation %s (pms_id=%s, room=%s) - no longer in current XML",
            room.id,
            room.pms_hotel_reservation_id,
            room.room_number,
        )

    return stats
|
|
|
|
async def _match_by_advertising(
    self,
    advertising_campagne: str,
    hotel_id: str | None,
    hashed_first_name: str | None,
    hashed_last_name: str | None,
    hashed_email: str | None,
    advertising_partner: str | None,
    session: AsyncSession | None = None,
    raw_first_name: str | None = None,
    raw_last_name: str | None = None,
    raw_email: str | None = None,
) -> Reservation | None:
    """Match reservation by advertising tracking data (fbclid/gclid/md5_unique_id).

    The PMS ``advertisingCampagne`` value is matched as a literal *prefix* of the
    stored fbclid/gclid (the PMS may hold a truncated click ID) or as an exact
    ``md5_unique_id``. The prefix is escaped, so ``%`` and ``_`` characters in
    it are not treated as SQL LIKE wildcards.

    A single hit on a 32-character value (md5-style lookup) is returned
    directly. Otherwise the candidates are narrowed with
    ``_filter_reservations_by_guest_details``. If that cannot decide, the first
    candidate returned by the query is used and a warning is logged.

    Args:
        advertising_campagne: Tracking ID from PMS (could be truncated click_id or md5_unique_id)
        hotel_id: Hotel ID for filtering (no hotel filter when falsy)
        hashed_first_name: Guest first name (hashed) for disambiguation
        hashed_last_name: Guest last name (hashed) for disambiguation
        hashed_email: Guest email (hashed) for disambiguation
        advertising_partner: Partner info (matches utm_medium)
        session: AsyncSession to use. If None, uses self.session.
        raw_first_name: Plain guest first name (optional fallback)
        raw_last_name: Plain guest last name (optional fallback)
        raw_email: Plain guest email (optional fallback)

    Returns:
        Matched Reservation, or None if nothing matches (always None for an
        empty ``advertising_campagne``).

    """
    if session is None:
        session = self.session

    # An empty prefix would match every reservation that has any click ID.
    if not advertising_campagne:
        return None

    # Find reservations where:
    # - fbclid/gclid starts with the advertising_campagne value, OR
    # - md5_unique_id matches exactly (for direct ID matching)
    # autoescape=True escapes % and _ so the click-ID prefix is matched literally.
    query = (
        select(Reservation)
        .where(
            or_(
                Reservation.fbclid.startswith(advertising_campagne, autoescape=True),
                Reservation.gclid.startswith(advertising_campagne, autoescape=True),
                Reservation.md5_unique_id == advertising_campagne,
            )
        )
        # Customer is needed by the guest-detail filtering below.
        .options(selectinload(Reservation.customer))
    )

    if hotel_id:
        query = query.where(Reservation.hotel_id == hotel_id)

    db_result = await session.execute(query)
    reservations = db_result.scalars().all()

    if not reservations:
        return None

    # A 32-character value is treated as an md5_unique_id lookup; anything else as a click ID.
    is_md5_lookup = len(advertising_campagne) == 32

    if len(reservations) == 1 and is_md5_lookup:
        # Confident single match via md5_unique_id
        return reservations[0]

    # Multiple matches or click-id (prefix) matches: narrow down using guest details.
    _LOGGER.info(
        (
            "Ambiguous advertising match for %s (hotel=%s, candidates=%d, md5_lookup=%s). "
            "Applying guest detail filtering."
        ),
        advertising_campagne,
        hotel_id,
        len(reservations),
        is_md5_lookup,
    )

    matched_reservation = self._filter_reservations_by_guest_details(
        reservations,
        raw_first_name,
        raw_last_name,
        raw_email,
        advertising_partner,
        hashed_first_name=hashed_first_name,
        hashed_last_name=hashed_last_name,
        hashed_email=hashed_email,
    )

    if matched_reservation is None:
        # If we still can't narrow it down, use the first match and log warning.
        # NOTE(review): this warning logs plaintext guest name/email.
        _LOGGER.warning(
            "Could not narrow down multiple reservations for advertisingCampagne %s "
            "(hotel=%s, guest=%s %s, email=%s). Using first match.",
            advertising_campagne,
            hotel_id,
            raw_first_name,
            raw_last_name,
            raw_email,
        )
        matched_reservation = reservations[0]

    return matched_reservation
|
|
|
|
async def _match_by_guest_details_hashed(
    self,
    guest_first_name: str | None,
    guest_last_name: str | None,
    guest_email: str | None,
    session: AsyncSession | None = None,
    *,
    conversion_guest: ConversionGuest | None = None,
) -> Customer | None:
    """Match guest by name and email directly to Customer (no Reservation needed).

    This method bypasses the Reservation table entirely and matches directly against
    hashed customer data. Used for guest-detail matching where we don't need to link
    to a specific reservation.

    A customer is a candidate when its hashed email equals ``guest_email`` OR
    its hashed given name AND surname equal ``guest_first_name`` /
    ``guest_last_name``. With several candidates, each is scored with
    ``_score_guest_customer_match`` and the unique best scorer wins. On a tie,
    the first of the top-scoring candidates (lowest customer id) is returned
    and a warning is logged.

    Args:
        guest_first_name: Guest first name (pre-hashed)
        guest_last_name: Guest last name (pre-hashed)
        guest_email: Guest email (pre-hashed)
        session: AsyncSession to use. If None, uses self.session.
        conversion_guest: Optional guest record passed to the scorer, which
            also compares its hashed country code and birth date.

    Returns:
        Matched Customer, or None if there is nothing to match on or no candidate.

    """
    if session is None:
        session = self.session

    # Build filter conditions
    conditions = []
    if guest_email:
        conditions.append(Customer.hashed_email == guest_email)
    if guest_first_name and guest_last_name:
        conditions.append(
            (Customer.hashed_given_name == guest_first_name)
            & (Customer.hashed_surname == guest_last_name)
        )

    if not conditions:
        return None

    # Match if email matches OR name matches. Ordering by id makes the
    # tie-break below deterministic instead of depending on DB row order.
    query = select(Customer).where(or_(*conditions)).order_by(Customer.id)

    db_result = await session.execute(query)
    matches = db_result.scalars().all()

    if not matches:
        return None

    if len(matches) == 1:
        return matches[0]

    best_customer: Customer | None = None
    best_score = -1
    tie = False

    for candidate in matches:
        candidate_score = self._score_guest_customer_match(
            conversion_guest,
            candidate,
            hashed_first_name=guest_first_name,
            hashed_last_name=guest_last_name,
            hashed_email=guest_email,
        )
        if candidate_score > best_score:
            # Strict '>' keeps the FIRST candidate that reached the best score.
            best_score = candidate_score
            best_customer = candidate
            tie = False
        elif candidate_score == best_score:
            tie = True

    if best_customer and best_score > 0 and not tie:
        _LOGGER.debug(
            "Multiple hashed customer matches; selected candidate %s via score %s",
            best_customer.id,
            best_score,
        )
        return best_customer

    _LOGGER.warning(
        "Multiple hashed customer matches found for guest details, using first match"
    )
    # Fall back to the first top-scoring candidate. matches[0] could score
    # strictly lower than the tied leaders.
    return best_customer if best_customer is not None else matches[0]
|
|
|
|
def _filter_reservations_by_guest_details(
    self,
    reservations: list[Reservation],
    guest_first_name: str | None,
    guest_last_name: str | None,
    guest_email: str | None,
    advertising_partner: str | None,
    *,
    hashed_first_name: str | None = None,
    hashed_last_name: str | None = None,
    hashed_email: str | None = None,
) -> Reservation | None:
    """Pick a single reservation out of ``reservations`` using guest details.

    The candidate pool is narrowed in four stages. Each stage only sees the
    survivors of the stages before it:

    1. Customer hashed email equal to ``hashed_email`` (only if given).
    2. Customer hashed given name AND surname equal to ``hashed_first_name`` /
       ``hashed_last_name`` (only if both are given).
       In stages 1 and 2, exactly one hit is returned at once. Several hits
       replace the pool. No hits leave the pool untouched.
    3. Case-insensitive plaintext comparison against whichever of
       ``guest_first_name`` / ``guest_last_name`` / ``guest_email`` are given.
       The first reservation whose customer agrees on all of them is returned.
    4. ``advertising_partner`` compared case-insensitively with the
       reservation's ``utm_medium``. The first agreeing reservation is returned.

    Reservations without a customer never pass stages 1-3.

    Args:
        reservations: List of candidate reservations
        guest_first_name: Guest first name (plaintext, optional)
        guest_last_name: Guest last name (plaintext, optional)
        guest_email: Guest email (plaintext, optional)
        advertising_partner: Partner info (e.g., "Facebook_Mobile_Feed")
        hashed_first_name: Hashed first name for cross-checking
        hashed_last_name: Hashed last name for cross-checking
        hashed_email: Hashed email for cross-checking

    Returns:
        The chosen Reservation, or None when no stage produced a decision.

    """

    def hashed_email_matches(customer) -> bool:
        return bool(customer.hashed_email) and customer.hashed_email == hashed_email

    def hashed_name_matches(customer) -> bool:
        return (
            customer.hashed_given_name == hashed_first_name
            and customer.hashed_surname == hashed_last_name
        )

    def agrees(stored: str | None, wanted: str | None) -> bool:
        # A missing wanted value imposes no constraint; a missing stored
        # value can never satisfy a given one.
        if not wanted:
            return True
        return bool(stored) and stored.lower() == wanted.lower()

    hashed_stages = []
    if hashed_email:
        hashed_stages.append(
            (hashed_email_matches, "Found unique match via hashed email")
        )
    if hashed_first_name and hashed_last_name:
        hashed_stages.append(
            (hashed_name_matches, "Found unique match via hashed names")
        )

    pool = reservations

    # Stages 1-2: exact comparisons on hashed customer data.
    for stage_matches, unique_message in hashed_stages:
        survivors = [
            candidate
            for candidate in pool
            if candidate.customer and stage_matches(candidate.customer)
        ]
        if len(survivors) == 1:
            _LOGGER.debug(unique_message)
            return survivors[0]
        if survivors:
            pool = survivors

    # Stage 3: plaintext fallback, only when some plaintext detail was supplied.
    if guest_first_name or guest_last_name or guest_email:
        for candidate in pool:
            owner = candidate.customer
            if not owner:
                continue
            same_person = (
                agrees(owner.given_name, guest_first_name)
                and agrees(owner.surname, guest_last_name)
                and agrees(owner.email_address, guest_email)
            )
            if same_person:
                _LOGGER.debug(
                    "Found exact plaintext match on guest details for %s %s",
                    guest_first_name,
                    guest_last_name,
                )
                return candidate

    # Stage 4: advertising partner vs utm_medium.
    if advertising_partner:
        wanted_medium = advertising_partner.lower()
        for candidate in pool:
            medium = candidate.utm_medium
            if medium and medium.lower() == wanted_medium:
                _LOGGER.debug(
                    "Found match on advertising_partner=%s matching utm_medium",
                    advertising_partner,
                )
                return candidate

    # No single clear match found
    return None
|
|
|
|
async def _extract_unmatched_guests(
    self, session: AsyncSession
) -> dict[str, Customer | None]:
    """Phase 3b: Extract unique guests from unmatched conversions and match them to customers.

    Loads every conversion that has a guest but no reservation yet. This covers both:
    1. Conversions with no customer match at all (customer_id IS NULL)
    2. Conversions matched to a customer but not a reservation (customer_id IS NOT NULL),
       e.g. matched in a previous run and now re-evaluated for reservation linking

    Each distinct guest_id is matched once via ``_match_by_guest_details_hashed``.

    NOTE(review): guests are keyed by guest_id alone here, while Phase 3d keys
    them by (hotel_id, guest_id). If guest IDs can repeat across hotels, only
    the first conversion's guest record is used for that ID. Confirm this is intended.

    Args:
        session: AsyncSession for database queries

    Returns:
        Mapping of guest_id to the matched Customer, or to None when no
        customer matched. Conversions whose guest relationship did not load
        are left out of the mapping.

    """
    result = await session.execute(
        select(Conversion)
        .where(
            Conversion.reservation_id.is_(None),
            Conversion.guest_id.isnot(None),
        )
        .options(selectinload(Conversion.guest))
    )
    unmatched_conversions = result.scalars().all()

    if not unmatched_conversions:
        _LOGGER.debug("Phase 3b: No unmatched conversions with guests")
        return {}

    # Keep the first conversion seen per guest_id; only its loaded guest is used.
    unique_guests: dict[str, Conversion] = {}
    for conversion in unmatched_conversions:
        unique_guests.setdefault(conversion.guest_id, conversion)

    _LOGGER.info(
        "Phase 3b: Found %d unique guests from %d unmatched conversions",
        len(unique_guests),
        len(unmatched_conversions),
    )

    guest_to_hashed_customer: dict[str, Customer | None] = {}
    for guest_id, conversion in unique_guests.items():
        conversion_guest = conversion.guest
        if not conversion_guest:
            continue

        matched_hashed_customer = await self._match_by_guest_details_hashed(
            conversion_guest.hashed_first_name,
            conversion_guest.hashed_last_name,
            conversion_guest.hashed_email,
            session,
            conversion_guest=conversion_guest,
        )
        guest_to_hashed_customer[guest_id] = matched_hashed_customer

        if matched_hashed_customer:
            _LOGGER.debug(
                "Phase 3b: Matched guest %s to hashed_customer %d",
                guest_id,
                matched_hashed_customer.id,
            )
        else:
            _LOGGER.debug("Phase 3b: No match found for guest %s", guest_id)

    return guest_to_hashed_customer
|
|
|
|
async def _link_matched_guests_to_reservations(
    self,
    guest_to_customer_dict: dict[str, Customer],
    session: AsyncSession,
    stats: dict[str, int],
) -> None:
    """Phase 3c: Link conversions from matched guests to reservations based on dates.

    For every guest that Phase 3b matched to a customer, all of that guest's
    conversions that still lack a reservation are loaded. Each one is passed to
    ``_check_if_attributable`` together with the customer id:

    * If it yields a reservation AND reports the conversion as attributable,
      the conversion is linked to both the reservation and the customer
      (directly_attributable=True, guest_matched=True).
      ``stats["matched_to_reservation"]`` is incremented only if the conversion
      had no customer before.
    * Otherwise, a conversion without a customer is linked to the customer only
      (directly_attributable=False, guest_matched=True) and
      ``stats["matched_to_customer"]`` is incremented.

    After a guest's conversions are processed, ``_check_if_guest_is_regular``
    is run for that guest if the first loaded conversion has a guest record.

    Args:
        guest_to_customer_dict: Mapping from guest_id to matched Customer.
            Entries whose customer is None or has no id are skipped.
        session: AsyncSession for database queries
        stats: Shared stats dictionary to update

    """
    for guest_id, customer in guest_to_customer_dict.items():
        if not customer or not customer.id:
            continue
        customer_id = customer.id

        # Every conversion of this guest without a reservation, whether or not
        # it already has a customer (new reservations may have arrived since).
        pending_query = (
            select(Conversion)
            .where(
                (Conversion.guest_id == guest_id)
                & (Conversion.reservation_id.is_(None))
            )
            .options(
                selectinload(Conversion.conversion_rooms),
                selectinload(Conversion.guest),
            )
        )
        pending = (await session.execute(pending_query)).scalars().all()
        if not pending:
            continue

        _LOGGER.debug(
            "Phase 3c: Processing %d conversions for guest %s (customer_id=%d)",
            len(pending),
            guest_id,
            customer_id,
        )

        for conversion in pending:
            reservation, attributable = await self._check_if_attributable(
                customer_id, conversion, session
            )

            if reservation and attributable:
                had_no_customer = conversion.customer_id is None

                conversion.reservation_id = reservation.id
                conversion.customer_id = customer_id
                conversion.directly_attributable = True
                conversion.guest_matched = True
                conversion.updated_at = datetime.now()

                if had_no_customer:
                    stats["matched_to_reservation"] += 1

                _LOGGER.info(
                    "Phase 3c: Linked conversion (pms_id=%s) to reservation %d via guest matching",
                    conversion.pms_reservation_id,
                    reservation.id,
                )
                continue

            if conversion.customer_id is not None:
                # Already tied to a customer; nothing new to record.
                continue

            conversion.customer_id = customer_id
            conversion.directly_attributable = False
            conversion.guest_matched = True
            conversion.updated_at = datetime.now()
            stats["matched_to_customer"] += 1

        # With the whole batch processed, re-evaluate whether the guest is a regular.
        if pending[0].guest:
            await self._check_if_guest_is_regular(guest_id, customer_id, session)
|
|
|
|
async def _check_regularity_for_all_matched_guests(
    self, session: AsyncSession
) -> None:
    """Phase 3d: Check regularity for ALL matched guests (both ID-matched and guest-detail-matched).

    This is called after all matching is complete to evaluate every guest that has been
    matched to a customer, regardless of match type. Running it on all matched
    guests, not just newly matched ones, means a change in the regularity logic
    is re-applied to everyone on the next run.

    Steps:
    1. Load every (hotel_id, guest_id) -> customer_id link recorded on conversions.
    2. Resolve guests linked to more than one customer.
    3. Re-load the links and resolve customers linked to more than one guest.
       The re-load matters because step 2 may already have severed links.
       Detecting customer-side conflicts from the pre-step-2 snapshot could
       sever a link that is no longer in conflict.
    4. Re-load once more and run the regularity check for every remaining guest.

    Args:
        session: AsyncSession for database queries

    """

    async def load_links() -> tuple[
        dict[tuple[str | None, int], set[int]],
        dict[int, set[tuple[str | None, int]]],
    ]:
        """Read the current guest<->customer links, grouped in both directions."""
        rows = await session.execute(
            select(
                Conversion.hotel_id, Conversion.guest_id, Conversion.customer_id
            ).where(Conversion.guest_id.isnot(None), Conversion.customer_id.isnot(None))
        )
        by_guest: dict[tuple[str | None, int], set[int]] = {}
        by_customer: dict[int, set[tuple[str | None, int]]] = {}
        for hotel_id, guest_id, customer_id in rows.all():
            if hotel_id is None or guest_id is None or customer_id is None:
                continue
            guest_key = (hotel_id, guest_id)
            by_guest.setdefault(guest_key, set()).add(customer_id)
            by_customer.setdefault(customer_id, set()).add(guest_key)
        return by_guest, by_customer

    guest_customer_sets, customer_guest_sets = await load_links()

    if not guest_customer_sets:
        _LOGGER.debug("Phase 3d: No matched guests to check for regularity")
        return

    guest_duplicates = {
        key: customer_ids
        for key, customer_ids in guest_customer_sets.items()
        if len(customer_ids) > 1
    }
    if guest_duplicates:
        await self._deduplicate_guest_customer_links(guest_duplicates, session)
        # Links may have been severed above; rebuild the customer-side view
        # so we don't act on conflicts that no longer exist.
        _, customer_guest_sets = await load_links()

    customer_duplicates = {
        customer_id: guest_keys
        for customer_id, guest_keys in customer_guest_sets.items()
        if len(guest_keys) > 1
    }
    if customer_duplicates:
        await self._deduplicate_customer_guest_links(customer_duplicates, session)

    guest_customer_sets, _ = await load_links()

    if not guest_customer_sets:
        _LOGGER.debug(
            "Phase 3d: No guests remained linked to customers after deduplication"
        )
        return

    _LOGGER.debug(
        "Phase 3d: Checking regularity for %d matched guests", len(guest_customer_sets)
    )

    for (_hotel_id, guest_id), customer_ids in guest_customer_sets.items():
        # After deduplication each guest should have exactly one customer;
        # min() keeps the choice deterministic if that ever does not hold.
        await self._check_if_guest_is_regular(guest_id, min(customer_ids), session)
|
|
|
|
async def _match_conversions_from_db_sequential(
    self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None:
    """Phase 3a: Match conversions sequentially using database data only.

    Performs ID-based matching for each conversion one at a time, in list
    order, then runs the guest-detail phases 3b-3d once via
    ``_run_guest_matching_phases``. Without a SessionMaker, all work uses the
    shared ``self.session``.
    """
    # _match_conversion_from_db_safe requires a semaphore; with a plain loop
    # it is never contended.
    semaphore = asyncio.Semaphore(1)
    for pms_id in pms_reservation_ids:
        await self._match_conversion_from_db_safe(pms_id, semaphore, stats)

    if self.session_maker:
        session = await self.session_maker.create_session()
        try:
            await self._run_guest_matching_phases(session, stats)
        finally:
            await session.close()
    else:
        await self._run_guest_matching_phases(self.session, stats)


async def _match_conversions_from_db_concurrent(
    self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None:
    """Phase 3a: Match conversions concurrently using database data only.

    Performs ID-based matching for all conversions concurrently (at most
    MAX_CONCURRENT_RESERVATIONS at a time). Each task gets its own database
    session from the SessionMaker. The guest-detail phases 3b-3d then run once,
    on a single fresh session, so the same guest is not processed twice.

    Falls back to sequential matching when no SessionMaker is configured.
    """
    if not self.session_maker:
        _LOGGER.error(
            "Concurrent matching requested but SessionMaker not available. "
            "Falling back to sequential matching."
        )
        await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
        return

    semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
    async with asyncio.TaskGroup() as tg:
        for pms_id in pms_reservation_ids:
            tg.create_task(
                self._match_conversion_from_db_safe(pms_id, semaphore, stats)
            )

    session = await self.session_maker.create_session()
    try:
        await self._run_guest_matching_phases(session, stats)
    finally:
        await session.close()


async def _run_guest_matching_phases(
    self, session: AsyncSession, stats: dict[str, int]
) -> None:
    """Run Phases 3b-3d on ``session`` and commit them as one transaction.

    Phase 3b extracts unique guests from unmatched conversions, 3c links those
    guests' conversions to reservations, and 3d re-checks regularity for all
    matched guests. Guest matching is best-effort: any error rolls the
    transaction back and is logged rather than raised. Closing the session is
    the caller's responsibility.
    """
    try:
        guest_to_hashed_customer = await self._extract_unmatched_guests(session)

        if guest_to_hashed_customer:
            await self._link_matched_guests_to_reservations(
                guest_to_hashed_customer, session, stats
            )

        await self._check_regularity_for_all_matched_guests(session)

        await session.commit()
    except Exception as e:
        await session.rollback()
        _LOGGER.exception("Error in Phase 3b/3c/3d guest matching: %s", e)
|
|
|
|
async def _match_conversion_from_db_safe(
    self,
    pms_reservation_id: int,
    semaphore: asyncio.Semaphore,
    stats: dict[str, int],
) -> None:
    """Phase 3a: Match one conversion in its own transaction without raising.

    With a SessionMaker (concurrent mode), a dedicated session is created and
    closed for this call. Otherwise the shared ``self.session`` is used and
    left open. Any ``Exception`` is logged, rolled back, and counted in
    ``stats["errors"]`` instead of propagating. This includes a failure to
    create the session, so one bad reservation cannot cancel sibling tasks in
    a TaskGroup.

    Args:
        pms_reservation_id: PMS reservation ID to match
        semaphore: Semaphore to limit concurrent operations
        stats: Shared stats dictionary. All tasks run on one event loop, and
            the ``+=`` updates contain no ``await``, so they cannot interleave.

    """
    async with semaphore:
        session: AsyncSession | None = None
        try:
            # Created inside the try so connection errors are handled per reservation.
            if self.session_maker:
                session = await self.session_maker.create_session()
            else:
                session = self.session

            await self._match_conversion_using_db_data(
                pms_reservation_id, session, stats
            )

            await session.commit()
            _LOGGER.debug(
                "Successfully matched conversion for reservation %s",
                pms_reservation_id,
            )

        except Exception as e:
            if session is not None:
                await session.rollback()
            _LOGGER.exception(
                "Error matching conversion for reservation %s: %s",
                pms_reservation_id,
                e,
            )
            stats["errors"] += 1
        finally:
            # Only sessions we created here are ours to close.
            if self.session_maker and session is not None:
                await session.close()
|
|
|
|
async def _match_conversion_using_db_data(
    self,
    pms_reservation_id: int,
    session: AsyncSession | None = None,
    stats: dict[str, int] | None = None,
) -> None:
    """Phase 3a: Link one stored conversion to a reservation by advertising ID.

    Loads the conversion for ``(self.hotel_id, pms_reservation_id)`` together
    with its guest and rooms. If the conversion carries an advertising
    campaign value, it is matched via ``_match_by_advertising`` using the
    guest's stored hashed and plaintext details for disambiguation. Matching
    by guest details alone is deferred to Phases 3b/3c so each guest is
    processed only once.

    On a match, the conversion gets the reservation id and that reservation's
    customer id (or None), directly_attributable=True and guest_matched=False.
    ``updated_at`` is refreshed for every conversion found, matched or not.

    Args:
        pms_reservation_id: PMS reservation ID to match
        session: AsyncSession to use (defaults to self.session)
        stats: Shared stats dictionary; when given, exactly one of
            "matched_to_reservation" / "unmatched" is incremented

    """
    if session is None:
        session = self.session

    if not self.hotel_id:
        _LOGGER.error(
            "Cannot match conversion: hotel_id not set in ConversionService"
        )
        return

    lookup = (
        select(Conversion)
        .where(
            Conversion.hotel_id == self.hotel_id,
            Conversion.pms_reservation_id == pms_reservation_id,
        )
        .options(
            selectinload(Conversion.guest),
            selectinload(Conversion.conversion_rooms),
        )
    )
    conversion = (await session.execute(lookup)).scalar_one_or_none()

    if not conversion:
        # Should be rare: process_conversion_xml already filtered the IDs.
        _LOGGER.debug(
            "Conversion not found for pms_reservation_id=%s (may have been deleted)",
            pms_reservation_id,
        )
        return

    guest = conversion.guest
    if guest:
        first_hash, last_hash, email_hash = (
            guest.hashed_first_name,
            guest.hashed_last_name,
            guest.hashed_email,
        )
        first_plain, last_plain, email_plain = (
            guest.guest_first_name,
            guest.guest_last_name,
            guest.guest_email,
        )
    else:
        first_hash = last_hash = email_hash = None
        first_plain = last_plain = email_plain = None

    reservation = None
    if conversion.advertising_campagne:
        reservation = await self._match_by_advertising(
            conversion.advertising_campagne,
            conversion.hotel_id,
            first_hash,
            last_hash,
            email_hash,
            conversion.advertising_partner,
            session,
            raw_first_name=first_plain,
            raw_last_name=last_plain,
            raw_email=email_plain,
        )

    if reservation:
        _LOGGER.info(
            "Phase 3a: Matched conversion by advertising ID (pms_id=%s, reservation_id=%d)",
            pms_reservation_id,
            reservation.id,
        )
        owner = reservation.customer
        conversion.reservation_id = reservation.id
        conversion.customer_id = owner.id if owner else None
        # ID-based matches are always directly attributable
        conversion.directly_attributable = True
        conversion.guest_matched = False

    conversion.updated_at = datetime.now()

    if stats is not None:
        outcome = "matched_to_reservation" if reservation else "unmatched"
        stats[outcome] += 1
|
|
|
|
async def _check_if_guest_is_regular(
    self,
    guest_id: str,
    customer_id: int,
    session: AsyncSession,
) -> None:
    """Set ``ConversionGuest.is_regular`` from payment vs. reservation history.

    A guest is regular when their earliest paying conversion (one joined to at
    least one room with total_revenue > 0) has a reservation_date strictly
    before the creation date of the first reservation we created for
    ``customer_id``. That indicates they were already a customer before we
    started tracking them.

    ``is_regular`` is set to False when there is no dated paying conversion, or
    when the customer has no reservation with a creation timestamp. If no
    ConversionGuest row exists for ``guest_id``, nothing is changed.

    This check is done AFTER all conversions for a guest have been processed
    and matched, so it sees the complete payment vs. reservation history.

    Args:
        guest_id: The guest ID to evaluate
        customer_id: The matched customer ID
        session: AsyncSession for database queries

    """
    # NOTE(review): looked up by guest_id only, while
    # _deduplicate_guest_customer_links keys guests by (hotel_id, guest_id).
    # If guest IDs can repeat across hotels, scalar_one_or_none() raises here.
    guest_result = await session.execute(
        select(ConversionGuest).where(ConversionGuest.guest_id == guest_id)
    )
    conversion_guest = guest_result.scalar_one_or_none()

    if not conversion_guest:
        return

    # Earliest paying conversion for this guest (across all hotels). Undated
    # conversions are excluded: they cannot be compared, and a None date
    # would make the comparison below raise TypeError.
    earliest_paying_conversion_result = await session.execute(
        select(Conversion)
        .join(ConversionRoom, Conversion.id == ConversionRoom.conversion_id)
        .where(
            Conversion.guest_id == guest_id,
            Conversion.reservation_date.isnot(None),
            ConversionRoom.total_revenue.isnot(None),
            ConversionRoom.total_revenue > Decimal(0),
        )
        .order_by(Conversion.reservation_date.asc())
        .limit(1)
    )
    earliest_paying_conversion = (
        earliest_paying_conversion_result.scalar_one_or_none()
    )

    if not earliest_paying_conversion:
        conversion_guest.is_regular = False
        return

    # First reservation WE created for this customer. It is ordered by
    # created_at, the value compared below, not by check-in (start_date).
    # A reservation with an earlier stay may have been created later.
    earliest_reservation_result = await session.execute(
        select(Reservation)
        .where(
            Reservation.customer_id == customer_id,
            Reservation.created_at.isnot(None),
        )
        .order_by(Reservation.created_at.asc())
        .limit(1)
    )
    earliest_reservation = earliest_reservation_result.scalar_one_or_none()

    if not earliest_reservation:
        # No reservations for this customer yet, can't determine regularity
        conversion_guest.is_regular = False
        return

    # Convert created_at to a date to compare with reservation_date.
    conversion_guest.is_regular = (
        earliest_paying_conversion.reservation_date
        < earliest_reservation.created_at.date()
    )
|
|
|
|
async def _deduplicate_guest_customer_links(
    self,
    duplicates: dict[tuple[str | None, int], set[int]],
    session: AsyncSession,
) -> None:
    """Resolve guests linked to several customers, keeping at most one link each.

    For every (hotel_id, guest_id) in ``duplicates``, the guest record is
    loaded and ``_choose_best_customer_for_guest`` picks the customer to keep.
    Every other link is removed with ``_sever_guest_customer_link``. If the
    guest record is missing, or no customer is chosen, all of that guest's
    links are removed. Each case logs a warning.
    """
    for guest_key, linked_customers in duplicates.items():
        hotel_id, guest_id = guest_key

        lookup = await session.execute(
            select(ConversionGuest).where(
                ConversionGuest.hotel_id == hotel_id,
                ConversionGuest.guest_id == guest_id,
            )
        )
        guest = lookup.scalar_one_or_none()

        keep: int | None = None
        if not guest:
            _LOGGER.warning(
                "Guest %s (hotel=%s) missing when resolving duplicates; removing links to customers %s",
                guest_id,
                hotel_id,
                sorted(linked_customers),
            )
        else:
            keep = await self._choose_best_customer_for_guest(
                guest, linked_customers, session
            )
            if keep:
                _LOGGER.warning(
                    "Guest %s (hotel=%s) linked to multiple customers %s; keeping %s based on hashed data",
                    guest_id,
                    hotel_id,
                    sorted(linked_customers),
                    keep,
                )
            else:
                _LOGGER.warning(
                    "Guest %s (hotel=%s) linked to multiple customers %s but none matched hashed data. Removing all links.",
                    guest_id,
                    hotel_id,
                    sorted(linked_customers),
                )

        # With keep=None (missing guest), every link is severed.
        for customer_id in linked_customers:
            if customer_id != keep:
                await self._sever_guest_customer_link(
                    hotel_id, guest_id, customer_id, session
                )
|
|
|
|
def _score_guest_customer_match(
    self,
    conversion_guest: ConversionGuest | None,
    customer: Customer | None,
    *,
    hashed_first_name: str | None = None,
    hashed_last_name: str | None = None,
    hashed_email: str | None = None,
) -> int:
    """Score how well a guest matches a customer using hashed data.

    Explicit ``hashed_*`` keyword arguments take precedence over the values stored on
    ``conversion_guest``. Points awarded:

    * +200 when the email hashes are equal;
    * +50 when both name hashes are known and both match; otherwise +10 when only one
      name hash is known and it matches;
    * +5 for an equal country-code hash and +2 for an equal birth-date hash (checked only
      when ``conversion_guest`` is given).

    Returns:
        The summed score, or -1 when ``customer`` is missing.
    """
    if not customer:
        return -1

    guest = conversion_guest
    email_hash = hashed_email or (guest.hashed_email if guest else None)
    first_hash = hashed_first_name or (guest.hashed_first_name if guest else None)
    last_hash = hashed_last_name or (guest.hashed_last_name if guest else None)

    points = 0
    if email_hash and customer.hashed_email == email_hash:
        points += 200

    if first_hash and last_hash:
        # With both names known, a half match earns nothing.
        full_name_match = (
            customer.hashed_given_name == first_hash
            and customer.hashed_surname == last_hash
        )
        if full_name_match:
            points += 50
    elif (first_hash and customer.hashed_given_name == first_hash) or (
        last_hash and customer.hashed_surname == last_hash
    ):
        points += 10

    if guest:
        if (
            guest.hashed_country_code
            and customer.hashed_country_code == guest.hashed_country_code
        ):
            points += 5
        if guest.hashed_birth_date and customer.hashed_birth_date == guest.hashed_birth_date:
            points += 2

    return points
|
|
|
|
async def _choose_best_customer_for_guest(
    self,
    conversion_guest: ConversionGuest,
    candidate_customer_ids: set[int],
    session: AsyncSession,
) -> int | None:
    """Pick the most likely customer based on hashed data.

    Each candidate Customer is scored with ``_score_guest_customer_match``.

    Returns:
        The id of the single highest-scoring candidate, or ``None`` when:
        - there are no candidates;
        - none of the candidates is found;
        - the top score is not positive;
        - several candidates share the top score.
    """
    if not candidate_customer_ids:
        return None

    rows = await session.execute(
        select(Customer).where(Customer.id.in_(candidate_customer_ids))
    )
    customers = rows.scalars().all()
    if not customers:
        return None

    scored = [
        (self._score_guest_customer_match(conversion_guest, customer), customer.id)
        for customer in customers
    ]
    top_score = max(score for score, _ in scored)
    if top_score <= 0:
        return None

    leaders = [customer_id for score, customer_id in scored if score == top_score]
    # A tie at the top means the hashed data cannot tell the candidates apart.
    return leaders[0] if len(leaders) == 1 else None
|
|
|
|
async def _deduplicate_customer_guest_links(
    self,
    duplicates: dict[int, set[tuple[str | None, int]]],
    session: AsyncSession,
) -> None:
    """Ensure each customer is linked to at most one guest.

    For every customer linked to several guests, each guest is scored against the customer
    with ``_score_guest_customer_match``. Only the link to the single best-scoring guest is
    kept. Every link for the customer is severed (via ``_sever_guest_customer_link``) when:
    - the Customer row is missing;
    - the best score is not positive;
    - the best score is tied.
    This method does not commit.

    Args:
        duplicates: Maps a customer id to the set of ``(hotel_id, guest_id)`` keys linked
            to it.
        session: Session used for lookups and handed to the severing helper.
    """
    for customer_id, guest_keys in duplicates.items():
        customer_result = await session.execute(
            select(Customer).where(Customer.id == customer_id)
        )
        customer = customer_result.scalar_one_or_none()

        if not customer:
            # Nothing to score against: sever every link without loading each guest row
            # (the previous version ran one pointless guest query per key here).
            _LOGGER.warning(
                "Customer %s missing while deduplicating guests; severing links %s",
                customer_id,
                guest_keys,
            )
            for hotel_id, guest_id in guest_keys:
                await self._sever_guest_customer_link(
                    hotel_id, guest_id, customer_id, session
                )
            continue

        # A guest row may be missing (None); it then scores only on what the customer has.
        guest_records: list[tuple[str | None, int, ConversionGuest | None]] = []
        for hotel_id, guest_id in guest_keys:
            guest_result = await session.execute(
                select(ConversionGuest).where(
                    ConversionGuest.hotel_id == hotel_id,
                    ConversionGuest.guest_id == guest_id,
                )
            )
            guest_records.append((hotel_id, guest_id, guest_result.scalar_one_or_none()))

        best_key: tuple[str | None, int] | None = None
        best_score = -1
        is_tied = False
        for hotel_id, guest_id, guest in guest_records:
            score = self._score_guest_customer_match(guest, customer)
            if score > best_score:
                best_score = score
                best_key = (hotel_id, guest_id)
                is_tied = False
            elif score == best_score:
                is_tied = True

        if not best_key or best_score <= 0 or is_tied:
            _LOGGER.warning(
                "Customer %s linked to guests %s but no clear match; removing all links",
                customer_id,
                guest_keys,
            )
            for hotel_id, guest_id, _ in guest_records:
                await self._sever_guest_customer_link(
                    hotel_id, guest_id, customer_id, session
                )
            continue

        _LOGGER.warning(
            "Customer %s linked to multiple guests %s; keeping guest %s (hotel=%s, score=%s)",
            customer_id,
            guest_keys,
            best_key[1],
            best_key[0],
            best_score,
        )
        for hotel_id, guest_id, _ in guest_records:
            if (hotel_id, guest_id) != best_key:
                await self._sever_guest_customer_link(
                    hotel_id, guest_id, customer_id, session
                )
|
|
|
|
async def _sever_guest_customer_link(
    self,
    hotel_id: str | None,
    guest_id: int,
    customer_id: int,
    session: AsyncSession,
) -> None:
    """Remove incorrect guest/customer links from conversions.

    Every Conversion carrying this exact ``(hotel_id, guest_id, customer_id)`` triple has
    its changes applied on the loaded objects only; nothing is committed here:
    - ``customer_id`` and ``reservation_id`` are cleared;
    - ``directly_attributable`` and ``guest_matched`` are set to False;
    - ``updated_at`` is restamped.
    A warning with the number of touched conversions is logged when any were found.
    """
    statement = (
        select(Conversion)
        .where(
            Conversion.hotel_id == hotel_id,
            Conversion.guest_id == guest_id,
            Conversion.customer_id == customer_id,
        )
        # NOTE(review): conversion_rooms is eager-loaded but not used in this method —
        # presumably to avoid a later lazy load in async code; confirm before removing.
        .options(selectinload(Conversion.conversion_rooms))
    )
    linked_conversions = (await session.execute(statement)).scalars().all()

    if not linked_conversions:
        return

    reset_values = {
        "customer_id": None,
        "reservation_id": None,
        "directly_attributable": False,
        "guest_matched": False,
    }
    for linked in linked_conversions:
        for attribute, value in reset_values.items():
            setattr(linked, attribute, value)
        # NOTE(review): naive local timestamp although the module imports UTC — confirm
        # which form the updated_at column expects.
        linked.updated_at = datetime.now()

    _LOGGER.warning(
        "Removed %d conversion links for guest %s (hotel=%s) customer %s",
        len(linked_conversions),
        guest_id,
        hotel_id,
        customer_id,
    )
|
|
|
|
|
|
|
|
async def _check_if_attributable(
    self,
    matched_customer_id: int,
    conversion: Conversion,
    session: AsyncSession,
    exclude_reservation_id: int | None = None,
    *,
    date_tolerance_days: int = 7,
) -> tuple[Reservation | None, bool]:
    """Search for and check if a customer has an attributable reservation for this conversion.

    Loads the customer's reservations and compares each reservation's start/end dates with
    the arrival/departure dates of every room in ``conversion.conversion_rooms``. A
    (reservation, room) pair matches when both the arrival and the departure differ by at
    most ``date_tolerance_days`` days. When several pairs match, the pair with the smallest
    combined deviation wins. Remaining ties go to the lowest reservation id, so the result
    no longer depends on database row order.

    Args:
        matched_customer_id: The customer ID to search reservations for.
        conversion: The Conversion record being evaluated; its ``conversion_rooms`` are
            read directly.
        session: AsyncSession for database queries.
        exclude_reservation_id: Reservation ID to exclude from the search (if already
            matched).
        date_tolerance_days: Maximum allowed difference in days for both arrival and
            departure. Defaults to 7, the previously hard-coded tolerance.

    Returns:
        Tuple ``(matched_reservation, is_attributable)``. It is the closest matching
        Reservation and ``True``, or ``(None, False)`` in any of these cases:
        - the conversion has no rooms;
        - the customer has no (non-excluded) reservations;
        - no pair is within tolerance.
    """
    rooms = conversion.conversion_rooms
    if not rooms:
        return None, False

    query = select(Reservation).where(Reservation.customer_id == matched_customer_id)
    if exclude_reservation_id is not None:
        query = query.where(Reservation.id != exclude_reservation_id)
    # Without an ORDER BY the row order, and therefore the tie-break below, is arbitrary.
    query = query.order_by(Reservation.id)

    reservations = (await session.execute(query)).scalars().all()
    if not reservations:
        return None, False

    # (total deviation in days, reservation, room) of the closest pair seen so far.
    best = None
    for reservation in reservations:
        if not (reservation.start_date and reservation.end_date):
            continue
        for room in rooms:
            if not (room.arrival_date and room.departure_date):
                continue
            arrival_gap = abs((room.arrival_date - reservation.start_date).days)
            departure_gap = abs((room.departure_date - reservation.end_date).days)
            if arrival_gap > date_tolerance_days or departure_gap > date_tolerance_days:
                continue
            deviation = arrival_gap + departure_gap
            # Strict "<" keeps the earliest (lowest-id) reservation on equal deviation.
            if best is None or deviation < best[0]:
                best = (deviation, reservation, room)

    if best is None:
        return None, False

    _, reservation, room = best
    _LOGGER.info(
        "Found attributable reservation for customer %d: "
        "room dates %s-%s match reservation dates %s-%s",
        matched_customer_id,
        room.arrival_date,
        room.departure_date,
        reservation.start_date,
        reservation.end_date,
    )
    return reservation, True
|