New pydantic model for ConversionGuest
This commit is contained in:
@@ -1,13 +1,12 @@
|
||||
"""Service for handling conversion data from hotel PMS XML files."""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import xml.etree.ElementTree as ET
|
||||
from datetime import UTC, datetime
|
||||
from decimal import Decimal
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import insert, or_, select
|
||||
from sqlalchemy import or_, select
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
@@ -22,6 +21,7 @@ from .db import (
|
||||
SessionMaker,
|
||||
)
|
||||
from .logging_config import get_logger
|
||||
from .schemas import ConversionGuestData
|
||||
|
||||
_LOGGER = get_logger(__name__)
|
||||
|
||||
@@ -37,7 +37,11 @@ class ConversionService:
|
||||
2. Concurrent mode: SessionMaker passed in, creates independent sessions per task
|
||||
"""
|
||||
|
||||
def __init__(self, session: AsyncSession | SessionMaker | None = None, hotel_id: str | None = None):
|
||||
def __init__(
|
||||
self,
|
||||
session: AsyncSession | SessionMaker | None = None,
|
||||
hotel_id: str | None = None,
|
||||
):
|
||||
"""Initialize the ConversionService.
|
||||
|
||||
Args:
|
||||
@@ -81,17 +85,19 @@ class ConversionService:
|
||||
|
||||
async def _extract_unique_guests_from_xml(
|
||||
self, reservations: list
|
||||
) -> dict[tuple[str, str | None], dict]:
|
||||
) -> dict[tuple[str, int], ConversionGuestData]:
|
||||
"""Extract and deduplicate all guest data from XML reservations.
|
||||
|
||||
Phase 0: Single pass through XML to collect all unique guests.
|
||||
Uses (hotel_id, guest_id) as the key for deduplication.
|
||||
Validates each guest using Pydantic before storing.
|
||||
|
||||
Args:
|
||||
reservations: List of XML reservation elements
|
||||
|
||||
Returns:
|
||||
Dictionary mapping (hotel_id, guest_id) to guest data dict
|
||||
Dictionary mapping (hotel_id, guest_id) to validated ConversionGuestData
|
||||
|
||||
"""
|
||||
guest_data_by_key = {}
|
||||
now = datetime.now(UTC)
|
||||
@@ -101,7 +107,10 @@ class ConversionService:
|
||||
guest_elem = reservation_elem.find("guest")
|
||||
|
||||
if guest_elem is None:
|
||||
_LOGGER.debug("No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)", reservation_elem.get("id"))
|
||||
_LOGGER.debug(
|
||||
"No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)",
|
||||
reservation_elem.get("id"),
|
||||
)
|
||||
continue
|
||||
|
||||
guest_id = guest_elem.get("id")
|
||||
@@ -111,33 +120,50 @@ class ConversionService:
|
||||
guest_country_code = guest_elem.get("countryCode")
|
||||
guest_birth_date_str = guest_elem.get("dateOfBirth")
|
||||
|
||||
|
||||
|
||||
guest_birth_date = None
|
||||
if guest_birth_date_str:
|
||||
try:
|
||||
guest_birth_date = datetime.strptime(guest_birth_date_str, "%Y-%m-%d").date()
|
||||
guest_birth_date = datetime.strptime(
|
||||
guest_birth_date_str, "%Y-%m-%d"
|
||||
).date()
|
||||
except ValueError:
|
||||
_LOGGER.warning("Invalid birth date format: %s", guest_birth_date_str)
|
||||
_LOGGER.warning(
|
||||
"Invalid birth date format: %s", guest_birth_date_str
|
||||
)
|
||||
|
||||
key = (hotel_id, guest_id)
|
||||
# Validate guest data with Pydantic during extraction
|
||||
try:
|
||||
validated_guest = ConversionGuestData(
|
||||
hotel_id=hotel_id,
|
||||
guest_id=guest_id, # Will be validated and converted to int
|
||||
guest_first_name=guest_first_name,
|
||||
guest_last_name=guest_last_name,
|
||||
guest_email=guest_email,
|
||||
guest_country_code=guest_country_code,
|
||||
guest_birth_date=guest_birth_date,
|
||||
first_seen=now,
|
||||
last_seen=now,
|
||||
)
|
||||
|
||||
# Store guest data by key (will keep the last occurrence from XML)
|
||||
guest_data_by_key[key] = {
|
||||
"hotel_id": hotel_id,
|
||||
"guest_id": guest_id,
|
||||
"guest_first_name": guest_first_name,
|
||||
"guest_last_name": guest_last_name,
|
||||
"guest_email": guest_email,
|
||||
"guest_country_code": guest_country_code,
|
||||
"guest_birth_date": guest_birth_date,
|
||||
"now": now,
|
||||
}
|
||||
# Use validated guest_id (now an int) for the key
|
||||
key = (hotel_id, validated_guest.guest_id)
|
||||
|
||||
# Store validated guest data (will keep the last occurrence from XML)
|
||||
guest_data_by_key[key] = validated_guest
|
||||
|
||||
except ValueError:
|
||||
_LOGGER.exception(
|
||||
"Failed to validate guest data for reservation %s",
|
||||
reservation_elem.get("id"),
|
||||
)
|
||||
continue
|
||||
|
||||
return guest_data_by_key
|
||||
|
||||
async def _bulk_upsert_guests(
|
||||
self, session: AsyncSession, guest_data_by_key: dict[tuple[str, str | None], dict]
|
||||
self,
|
||||
session: AsyncSession,
|
||||
guest_data_by_key: dict[tuple[str, int], ConversionGuestData],
|
||||
) -> None:
|
||||
"""Bulk upsert all unique guests to database using PostgreSQL ON CONFLICT.
|
||||
|
||||
@@ -147,7 +173,9 @@ class ConversionService:
|
||||
|
||||
Args:
|
||||
session: AsyncSession to use
|
||||
guest_data_by_key: Dictionary mapping (hotel_id, guest_id) to guest data
|
||||
guest_data_by_key: Dictionary mapping (hotel_id, guest_id) to
|
||||
validated ConversionGuestData
|
||||
|
||||
"""
|
||||
if not guest_data_by_key:
|
||||
return
|
||||
@@ -160,28 +188,12 @@ class ConversionService:
|
||||
batch_end = min(batch_start + batch_size, len(items))
|
||||
batch_items = items[batch_start:batch_end]
|
||||
|
||||
# Prepare list of values for this batch
|
||||
# Prepare list of values for this batch (already validated)
|
||||
values_list = []
|
||||
for (hotel_id, guest_id), guest_data in batch_items:
|
||||
now = guest_data["now"]
|
||||
values_list.append({
|
||||
"hotel_id": guest_data["hotel_id"],
|
||||
"guest_id": guest_data["guest_id"],
|
||||
"guest_first_name": guest_data["guest_first_name"],
|
||||
"guest_last_name": guest_data["guest_last_name"],
|
||||
"guest_email": guest_data["guest_email"],
|
||||
"guest_country_code": guest_data["guest_country_code"],
|
||||
"guest_birth_date": guest_data["guest_birth_date"],
|
||||
"hashed_first_name": ConversionGuest._normalize_and_hash(guest_data["guest_first_name"]),
|
||||
"hashed_last_name": ConversionGuest._normalize_and_hash(guest_data["guest_last_name"]),
|
||||
"hashed_email": ConversionGuest._normalize_and_hash(guest_data["guest_email"]),
|
||||
"hashed_country_code": ConversionGuest._normalize_and_hash(guest_data["guest_country_code"]),
|
||||
"hashed_birth_date": ConversionGuest._normalize_and_hash(
|
||||
guest_data["guest_birth_date"].isoformat() if guest_data["guest_birth_date"] else None
|
||||
),
|
||||
"first_seen": now,
|
||||
"last_seen": now,
|
||||
})
|
||||
for (hotel_id, guest_id), validated_guest in batch_items:
|
||||
# Convert validated Pydantic model to dict for insertion
|
||||
# (all validations and hash calculations are already done)
|
||||
values_list.append(validated_guest.model_dump())
|
||||
|
||||
# Use PostgreSQL ON CONFLICT DO UPDATE for atomic upsert
|
||||
stmt = pg_insert(ConversionGuest).values(values_list)
|
||||
@@ -252,9 +264,13 @@ class ConversionService:
|
||||
stats["deleted_reservations"] += 1
|
||||
pms_reservation_id_str = deleted_res.get("ID")
|
||||
try:
|
||||
pms_reservation_id = int(pms_reservation_id_str) if pms_reservation_id_str else None
|
||||
pms_reservation_id = (
|
||||
int(pms_reservation_id_str) if pms_reservation_id_str else None
|
||||
)
|
||||
if pms_reservation_id is None:
|
||||
_LOGGER.warning("Deleted reservation missing ID attribute, skipping")
|
||||
_LOGGER.warning(
|
||||
"Deleted reservation missing ID attribute, skipping"
|
||||
)
|
||||
continue
|
||||
await self._handle_deleted_reservation(pms_reservation_id, session)
|
||||
await session.commit()
|
||||
@@ -295,7 +311,10 @@ class ConversionService:
|
||||
|
||||
# Phase 1: Bulk upsert all unique guests to database
|
||||
if guest_data_by_key:
|
||||
_LOGGER.debug("Phase 1: Bulk upserting %d unique guests to database", len(guest_data_by_key))
|
||||
_LOGGER.debug(
|
||||
"Phase 1: Bulk upserting %d unique guests to database",
|
||||
len(guest_data_by_key),
|
||||
)
|
||||
if self.session_maker:
|
||||
session = await self.session_maker.create_session()
|
||||
else:
|
||||
@@ -304,7 +323,9 @@ class ConversionService:
|
||||
try:
|
||||
await self._bulk_upsert_guests(session, guest_data_by_key)
|
||||
await session.commit()
|
||||
_LOGGER.info("Phase 1: Successfully upserted %d guests", len(guest_data_by_key))
|
||||
_LOGGER.info(
|
||||
"Phase 1: Successfully upserted %d guests", len(guest_data_by_key)
|
||||
)
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
_LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e)
|
||||
@@ -318,9 +339,13 @@ class ConversionService:
|
||||
# Returns list of successfully created pms_reservation_ids
|
||||
_LOGGER.debug("Phase 2: Creating/updating conversions")
|
||||
if self.supports_concurrent:
|
||||
pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats)
|
||||
pms_reservation_ids = await self._process_reservations_concurrent(
|
||||
reservations, stats
|
||||
)
|
||||
else:
|
||||
pms_reservation_ids = await self._process_reservations_sequential(reservations, stats)
|
||||
pms_reservation_ids = await self._process_reservations_sequential(
|
||||
reservations, stats
|
||||
)
|
||||
|
||||
_LOGGER.debug(
|
||||
"Phase 3: Found %d successfully created conversions out of %d total reservations",
|
||||
@@ -335,9 +360,13 @@ class ConversionService:
|
||||
if pms_reservation_ids:
|
||||
_LOGGER.debug("Phase 3: Matching conversions to reservations/customers")
|
||||
if self.supports_concurrent:
|
||||
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
|
||||
await self._match_conversions_from_db_concurrent(
|
||||
pms_reservation_ids, stats
|
||||
)
|
||||
else:
|
||||
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
|
||||
await self._match_conversions_from_db_sequential(
|
||||
pms_reservation_ids, stats
|
||||
)
|
||||
|
||||
return stats
|
||||
|
||||
@@ -375,9 +404,12 @@ class ConversionService:
|
||||
try:
|
||||
# Load all reservations with their hashed customers in one query
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
query = select(Reservation).options(
|
||||
selectinload(Reservation.customer).selectinload(Customer.hashed_version),
|
||||
selectinload(Reservation.hashed_customer)
|
||||
selectinload(Reservation.customer).selectinload(
|
||||
Customer.hashed_version
|
||||
),
|
||||
selectinload(Reservation.hashed_customer),
|
||||
)
|
||||
result = await session.execute(query)
|
||||
reservations = result.scalars().all()
|
||||
@@ -420,6 +452,7 @@ class ConversionService:
|
||||
|
||||
Returns:
|
||||
List of pms_reservation_ids that were successfully created/updated
|
||||
|
||||
"""
|
||||
semaphore = asyncio.Semaphore(1) # Process one at a time
|
||||
results = []
|
||||
@@ -460,6 +493,7 @@ class ConversionService:
|
||||
|
||||
Returns:
|
||||
List of pms_reservation_ids that were successfully created/updated
|
||||
|
||||
"""
|
||||
if not self.session_maker:
|
||||
_LOGGER.error(
|
||||
@@ -568,16 +602,22 @@ class ConversionService:
|
||||
|
||||
"""
|
||||
if not self.hotel_id:
|
||||
_LOGGER.error("Cannot delete reservation: hotel_id not set in ConversionService")
|
||||
_LOGGER.error(
|
||||
"Cannot delete reservation: hotel_id not set in ConversionService"
|
||||
)
|
||||
return
|
||||
|
||||
_LOGGER.info("Processing deleted reservation: Hotel %s, PMS ID %s", self.hotel_id, pms_reservation_id)
|
||||
_LOGGER.info(
|
||||
"Processing deleted reservation: Hotel %s, PMS ID %s",
|
||||
self.hotel_id,
|
||||
pms_reservation_id,
|
||||
)
|
||||
|
||||
# Delete conversion records for this hotel + pms_reservation_id
|
||||
result = await session.execute(
|
||||
select(Conversion).where(
|
||||
Conversion.hotel_id == self.hotel_id,
|
||||
Conversion.pms_reservation_id == pms_reservation_id
|
||||
Conversion.pms_reservation_id == pms_reservation_id,
|
||||
)
|
||||
)
|
||||
conversions = result.scalars().all()
|
||||
@@ -684,7 +724,7 @@ class ConversionService:
|
||||
existing_result = await session.execute(
|
||||
select(Conversion).where(
|
||||
Conversion.hotel_id == hotel_id,
|
||||
Conversion.pms_reservation_id == pms_reservation_id
|
||||
Conversion.pms_reservation_id == pms_reservation_id,
|
||||
)
|
||||
)
|
||||
existing_conversion = existing_result.scalar_one_or_none()
|
||||
@@ -744,9 +784,7 @@ class ConversionService:
|
||||
|
||||
# Fetch ALL existing rooms for this conversion (not just the ones in current XML)
|
||||
existing_rooms_result = await session.execute(
|
||||
select(ConversionRoom).where(
|
||||
ConversionRoom.conversion_id == conversion.id
|
||||
)
|
||||
select(ConversionRoom).where(ConversionRoom.conversion_id == conversion.id)
|
||||
)
|
||||
existing_rooms = {
|
||||
room.pms_hotel_reservation_id: room
|
||||
@@ -841,7 +879,6 @@ class ConversionService:
|
||||
# Check if room reservation already exists using batch-loaded data
|
||||
existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id)
|
||||
|
||||
|
||||
if existing_room_reservation:
|
||||
# Update existing room reservation with all fields
|
||||
existing_room_reservation.arrival_date = arrival_date
|
||||
@@ -910,7 +947,6 @@ class ConversionService:
|
||||
|
||||
return stats
|
||||
|
||||
|
||||
async def _match_by_advertising(
|
||||
self,
|
||||
advertising_campagne: str,
|
||||
@@ -1026,9 +1062,7 @@ class ConversionService:
|
||||
session = self.session
|
||||
|
||||
# Query all hashed customers that match the guest details
|
||||
query = select(HashedCustomer).options(
|
||||
selectinload(HashedCustomer.customer)
|
||||
)
|
||||
query = select(HashedCustomer).options(selectinload(HashedCustomer.customer))
|
||||
|
||||
# Build filter conditions
|
||||
conditions = []
|
||||
@@ -1165,6 +1199,7 @@ class ConversionService:
|
||||
|
||||
Returns:
|
||||
Dictionary mapping guest_id to matched HashedCustomer (or None if no match)
|
||||
|
||||
"""
|
||||
# Find all conversions that either:
|
||||
# - Have no match at all (reservation_id IS NULL AND customer_id IS NULL), OR
|
||||
@@ -1246,6 +1281,7 @@ class ConversionService:
|
||||
guest_to_hashed_customer: Mapping from guest_id to matched HashedCustomer
|
||||
session: AsyncSession for database queries
|
||||
stats: Shared stats dictionary to update
|
||||
|
||||
"""
|
||||
for guest_id, matched_hashed_customer in guest_to_hashed_customer.items():
|
||||
if not matched_hashed_customer or not matched_hashed_customer.customer_id:
|
||||
@@ -1259,7 +1295,10 @@ class ConversionService:
|
||||
(Conversion.guest_id == guest_id)
|
||||
& (Conversion.reservation_id.is_(None))
|
||||
)
|
||||
.options(selectinload(Conversion.conversion_rooms), selectinload(Conversion.guest))
|
||||
.options(
|
||||
selectinload(Conversion.conversion_rooms),
|
||||
selectinload(Conversion.guest),
|
||||
)
|
||||
)
|
||||
conversions = result.scalars().all()
|
||||
|
||||
@@ -1275,7 +1314,10 @@ class ConversionService:
|
||||
|
||||
# Try to link each conversion to a reservation for this customer
|
||||
for conversion in conversions:
|
||||
matched_reservation, is_attributable = await self._check_if_attributable(
|
||||
(
|
||||
matched_reservation,
|
||||
is_attributable,
|
||||
) = await self._check_if_attributable(
|
||||
matched_hashed_customer.customer_id, conversion, session
|
||||
)
|
||||
|
||||
@@ -1329,6 +1371,7 @@ class ConversionService:
|
||||
|
||||
Args:
|
||||
session: AsyncSession for database queries
|
||||
|
||||
"""
|
||||
# Get all ConversionGuests that have ANY customer link
|
||||
# This includes:
|
||||
@@ -1345,7 +1388,9 @@ class ConversionService:
|
||||
_LOGGER.debug("Phase 3d: No matched guests to check for regularity")
|
||||
return
|
||||
|
||||
_LOGGER.debug("Phase 3d: Checking regularity for %d matched guests", len(matched_guests))
|
||||
_LOGGER.debug(
|
||||
"Phase 3d: Checking regularity for %d matched guests", len(matched_guests)
|
||||
)
|
||||
|
||||
for conversion_guest in matched_guests:
|
||||
if not conversion_guest.hashed_customer_id:
|
||||
@@ -1528,12 +1573,15 @@ class ConversionService:
|
||||
pms_reservation_id: PMS reservation ID to match
|
||||
session: AsyncSession to use
|
||||
stats: Shared stats dictionary to update (optional)
|
||||
|
||||
"""
|
||||
if session is None:
|
||||
session = self.session
|
||||
|
||||
if not self.hotel_id:
|
||||
_LOGGER.error("Cannot match conversion: hotel_id not set in ConversionService")
|
||||
_LOGGER.error(
|
||||
"Cannot match conversion: hotel_id not set in ConversionService"
|
||||
)
|
||||
return
|
||||
|
||||
# Get the conversion from the database with related data
|
||||
@@ -1541,9 +1589,12 @@ class ConversionService:
|
||||
select(Conversion)
|
||||
.where(
|
||||
Conversion.hotel_id == self.hotel_id,
|
||||
Conversion.pms_reservation_id == pms_reservation_id
|
||||
Conversion.pms_reservation_id == pms_reservation_id,
|
||||
)
|
||||
.options(
|
||||
selectinload(Conversion.guest),
|
||||
selectinload(Conversion.conversion_rooms),
|
||||
)
|
||||
.options(selectinload(Conversion.guest), selectinload(Conversion.conversion_rooms))
|
||||
)
|
||||
conversion = result.scalar_one_or_none()
|
||||
|
||||
@@ -1601,9 +1652,7 @@ class ConversionService:
|
||||
conversion.reservation_id = (
|
||||
matched_reservation.id if matched_reservation else None
|
||||
)
|
||||
conversion.customer_id = (
|
||||
matched_customer.id if matched_customer else None
|
||||
)
|
||||
conversion.customer_id = matched_customer.id if matched_customer else None
|
||||
conversion.hashed_customer_id = (
|
||||
matched_hashed_customer.id if matched_hashed_customer else None
|
||||
)
|
||||
@@ -1647,6 +1696,7 @@ class ConversionService:
|
||||
guest_id: The guest ID to evaluate
|
||||
customer_id: The matched customer ID
|
||||
session: AsyncSession for database queries
|
||||
|
||||
"""
|
||||
# Get the ConversionGuest record
|
||||
guest_result = await session.execute(
|
||||
@@ -1670,7 +1720,9 @@ class ConversionService:
|
||||
.order_by(Conversion.reservation_date.asc())
|
||||
.limit(1)
|
||||
)
|
||||
earliest_paying_conversion = earliest_paying_conversion_result.scalar_one_or_none()
|
||||
earliest_paying_conversion = (
|
||||
earliest_paying_conversion_result.scalar_one_or_none()
|
||||
)
|
||||
|
||||
if not earliest_paying_conversion:
|
||||
# No paying conversions found for this guest
|
||||
@@ -1695,7 +1747,10 @@ class ConversionService:
|
||||
# (meaning they were already a customer before we sent them a reservation)
|
||||
# Compare against the reservation's creation date (when WE created/sent it), not check-in date
|
||||
# Convert created_at to date for comparison with reservation_date (both are dates)
|
||||
is_regular = earliest_paying_conversion.reservation_date < earliest_reservation.created_at.date()
|
||||
is_regular = (
|
||||
earliest_paying_conversion.reservation_date
|
||||
< earliest_reservation.created_at.date()
|
||||
)
|
||||
conversion_guest.is_regular = is_regular
|
||||
|
||||
if is_regular:
|
||||
@@ -1735,6 +1790,7 @@ class ConversionService:
|
||||
Tuple of (matched_reservation, is_attributable) where:
|
||||
- matched_reservation: The Reservation that matches (if any)
|
||||
- is_attributable: True if the reservation's dates match this conversion
|
||||
|
||||
"""
|
||||
# Check if conversion_room dates exist (criterion for attributability)
|
||||
if not conversion.conversion_rooms:
|
||||
@@ -1763,12 +1819,12 @@ class ConversionService:
|
||||
and reservation.end_date
|
||||
):
|
||||
# Check if dates match or mostly match (within 7 day tolerance)
|
||||
arrival_match = abs(
|
||||
(room.arrival_date - reservation.start_date).days
|
||||
) <= 7
|
||||
departure_match = abs(
|
||||
(room.departure_date - reservation.end_date).days
|
||||
) <= 7
|
||||
arrival_match = (
|
||||
abs((room.arrival_date - reservation.start_date).days) <= 7
|
||||
)
|
||||
departure_match = (
|
||||
abs((room.departure_date - reservation.end_date).days) <= 7
|
||||
)
|
||||
|
||||
if arrival_match and departure_match:
|
||||
_LOGGER.info(
|
||||
|
||||
Reference in New Issue
Block a user