New pydantic model for ConversionGuest

This commit is contained in:
Jonas Linter
2025-12-02 13:18:43 +01:00
parent ee80c57bcb
commit 56d67984cf
4 changed files with 282 additions and 141 deletions

View File

@@ -32,6 +32,7 @@ from sqlalchemy import and_, select, update
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload
from alpine_bits_python.hotel_service import HotelService
from alpine_bits_python.schemas import WebhookRequestData
from .alpinebits_server import (
@@ -46,14 +47,15 @@ from .const import HttpStatusCode, WebhookStatus
from .conversion_service import ConversionService
from .csv_import import CSVImporter
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db import (
Hotel,
ResilientAsyncSession,
SessionMaker,
WebhookEndpoint,
WebhookRequest,
create_database_engine,
)
from .db import Reservation as DBReservation
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
@@ -890,8 +892,6 @@ async def handle_webhook_unified(
webhook_request.status = WebhookStatus.PROCESSING
webhook_request.processing_started_at = timestamp
else:
webhook_request_data = WebhookRequestData(
payload_hash=payload_hash,
webhook_endpoint_id=webhook_endpoint.id,
@@ -905,7 +905,7 @@ async def handle_webhook_unified(
)
# 5. Create new webhook_request
webhook_request = WebhookRequest(**webhook_request_data.model_dump())
db_session.add(webhook_request)
await db_session.flush()
@@ -1134,6 +1134,7 @@ async def _process_conversion_xml_background(
filename: str,
session_maker: SessionMaker,
log_filename: Path,
hotel: Hotel,
):
"""Background task to process conversion XML.
@@ -1162,7 +1163,7 @@ async def _process_conversion_xml_background(
# Now process the conversion XML
_LOGGER.info("Starting database processing of %s", filename)
conversion_service = ConversionService(session_maker)
conversion_service = ConversionService(session_maker, hotel.hotel_id)
processing_stats = await conversion_service.process_conversion_xml(xml_content)
_LOGGER.info(
@@ -1250,6 +1251,10 @@ async def handle_xml_upload(
extension = Path(filename).suffix or ".xml"
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
hotel_service = HotelService(db_session)
hotel = await hotel_service.get_hotel_by_username(username)
_LOGGER.info(
"XML file queued for processing: %s by user %s (original: %s)",
log_filename,
@@ -1266,6 +1271,7 @@ async def handle_xml_upload(
filename,
session_maker,
log_filename,
hotel,
)
response_headers = {

View File

@@ -1,13 +1,12 @@
"""Service for handling conversion data from hotel PMS XML files."""
import asyncio
import hashlib
import xml.etree.ElementTree as ET
from datetime import UTC, datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import insert, or_, select
from sqlalchemy import or_, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
@@ -22,6 +21,7 @@ from .db import (
SessionMaker,
)
from .logging_config import get_logger
from .schemas import ConversionGuestData
_LOGGER = get_logger(__name__)
@@ -37,7 +37,11 @@ class ConversionService:
2. Concurrent mode: SessionMaker passed in, creates independent sessions per task
"""
def __init__(self, session: AsyncSession | SessionMaker | None = None, hotel_id: str | None = None):
def __init__(
self,
session: AsyncSession | SessionMaker | None = None,
hotel_id: str | None = None,
):
"""Initialize the ConversionService.
Args:
@@ -81,17 +85,19 @@ class ConversionService:
async def _extract_unique_guests_from_xml(
self, reservations: list
) -> dict[tuple[str, str | None], dict]:
) -> dict[tuple[str, int], ConversionGuestData]:
"""Extract and deduplicate all guest data from XML reservations.
Phase 0: Single pass through XML to collect all unique guests.
Uses (hotel_id, guest_id) as the key for deduplication.
Validates each guest using Pydantic before storing.
Args:
reservations: List of XML reservation elements
Returns:
Dictionary mapping (hotel_id, guest_id) to guest data dict
Dictionary mapping (hotel_id, guest_id) to validated ConversionGuestData
"""
guest_data_by_key = {}
now = datetime.now(UTC)
@@ -101,7 +107,10 @@ class ConversionService:
guest_elem = reservation_elem.find("guest")
if guest_elem is None:
_LOGGER.debug("No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)", reservation_elem.get("id"))
_LOGGER.debug(
"No guest element found, skipping reservation %s (will be created with guest_id=None in Phase 2)",
reservation_elem.get("id"),
)
continue
guest_id = guest_elem.get("id")
@@ -111,33 +120,50 @@ class ConversionService:
guest_country_code = guest_elem.get("countryCode")
guest_birth_date_str = guest_elem.get("dateOfBirth")
guest_birth_date = None
if guest_birth_date_str:
try:
guest_birth_date = datetime.strptime(guest_birth_date_str, "%Y-%m-%d").date()
guest_birth_date = datetime.strptime(
guest_birth_date_str, "%Y-%m-%d"
).date()
except ValueError:
_LOGGER.warning("Invalid birth date format: %s", guest_birth_date_str)
_LOGGER.warning(
"Invalid birth date format: %s", guest_birth_date_str
)
key = (hotel_id, guest_id)
# Validate guest data with Pydantic during extraction
try:
validated_guest = ConversionGuestData(
hotel_id=hotel_id,
guest_id=guest_id, # Will be validated and converted to int
guest_first_name=guest_first_name,
guest_last_name=guest_last_name,
guest_email=guest_email,
guest_country_code=guest_country_code,
guest_birth_date=guest_birth_date,
first_seen=now,
last_seen=now,
)
# Store guest data by key (will keep the last occurrence from XML)
guest_data_by_key[key] = {
"hotel_id": hotel_id,
"guest_id": guest_id,
"guest_first_name": guest_first_name,
"guest_last_name": guest_last_name,
"guest_email": guest_email,
"guest_country_code": guest_country_code,
"guest_birth_date": guest_birth_date,
"now": now,
}
# Use validated guest_id (now an int) for the key
key = (hotel_id, validated_guest.guest_id)
# Store validated guest data (will keep the last occurrence from XML)
guest_data_by_key[key] = validated_guest
except ValueError:
_LOGGER.exception(
"Failed to validate guest data for reservation %s",
reservation_elem.get("id"),
)
continue
return guest_data_by_key
async def _bulk_upsert_guests(
self, session: AsyncSession, guest_data_by_key: dict[tuple[str, str | None], dict]
self,
session: AsyncSession,
guest_data_by_key: dict[tuple[str, int], ConversionGuestData],
) -> None:
"""Bulk upsert all unique guests to database using PostgreSQL ON CONFLICT.
@@ -147,7 +173,9 @@ class ConversionService:
Args:
session: AsyncSession to use
guest_data_by_key: Dictionary mapping (hotel_id, guest_id) to guest data
guest_data_by_key: Dictionary mapping (hotel_id, guest_id) to
validated ConversionGuestData
"""
if not guest_data_by_key:
return
@@ -160,28 +188,12 @@ class ConversionService:
batch_end = min(batch_start + batch_size, len(items))
batch_items = items[batch_start:batch_end]
# Prepare list of values for this batch
# Prepare list of values for this batch (already validated)
values_list = []
for (hotel_id, guest_id), guest_data in batch_items:
now = guest_data["now"]
values_list.append({
"hotel_id": guest_data["hotel_id"],
"guest_id": guest_data["guest_id"],
"guest_first_name": guest_data["guest_first_name"],
"guest_last_name": guest_data["guest_last_name"],
"guest_email": guest_data["guest_email"],
"guest_country_code": guest_data["guest_country_code"],
"guest_birth_date": guest_data["guest_birth_date"],
"hashed_first_name": ConversionGuest._normalize_and_hash(guest_data["guest_first_name"]),
"hashed_last_name": ConversionGuest._normalize_and_hash(guest_data["guest_last_name"]),
"hashed_email": ConversionGuest._normalize_and_hash(guest_data["guest_email"]),
"hashed_country_code": ConversionGuest._normalize_and_hash(guest_data["guest_country_code"]),
"hashed_birth_date": ConversionGuest._normalize_and_hash(
guest_data["guest_birth_date"].isoformat() if guest_data["guest_birth_date"] else None
),
"first_seen": now,
"last_seen": now,
})
for (hotel_id, guest_id), validated_guest in batch_items:
# Convert validated Pydantic model to dict for insertion
# (all validations and hash calculations are already done)
values_list.append(validated_guest.model_dump())
# Use PostgreSQL ON CONFLICT DO UPDATE for atomic upsert
stmt = pg_insert(ConversionGuest).values(values_list)
@@ -252,9 +264,13 @@ class ConversionService:
stats["deleted_reservations"] += 1
pms_reservation_id_str = deleted_res.get("ID")
try:
pms_reservation_id = int(pms_reservation_id_str) if pms_reservation_id_str else None
pms_reservation_id = (
int(pms_reservation_id_str) if pms_reservation_id_str else None
)
if pms_reservation_id is None:
_LOGGER.warning("Deleted reservation missing ID attribute, skipping")
_LOGGER.warning(
"Deleted reservation missing ID attribute, skipping"
)
continue
await self._handle_deleted_reservation(pms_reservation_id, session)
await session.commit()
@@ -295,7 +311,10 @@ class ConversionService:
# Phase 1: Bulk upsert all unique guests to database
if guest_data_by_key:
_LOGGER.debug("Phase 1: Bulk upserting %d unique guests to database", len(guest_data_by_key))
_LOGGER.debug(
"Phase 1: Bulk upserting %d unique guests to database",
len(guest_data_by_key),
)
if self.session_maker:
session = await self.session_maker.create_session()
else:
@@ -304,7 +323,9 @@ class ConversionService:
try:
await self._bulk_upsert_guests(session, guest_data_by_key)
await session.commit()
_LOGGER.info("Phase 1: Successfully upserted %d guests", len(guest_data_by_key))
_LOGGER.info(
"Phase 1: Successfully upserted %d guests", len(guest_data_by_key)
)
except Exception as e:
await session.rollback()
_LOGGER.exception("Phase 1: Error during bulk guest upsert: %s", e)
@@ -318,9 +339,13 @@ class ConversionService:
# Returns list of successfully created pms_reservation_ids
_LOGGER.debug("Phase 2: Creating/updating conversions")
if self.supports_concurrent:
pms_reservation_ids = await self._process_reservations_concurrent(reservations, stats)
pms_reservation_ids = await self._process_reservations_concurrent(
reservations, stats
)
else:
pms_reservation_ids = await self._process_reservations_sequential(reservations, stats)
pms_reservation_ids = await self._process_reservations_sequential(
reservations, stats
)
_LOGGER.debug(
"Phase 3: Found %d successfully created conversions out of %d total reservations",
@@ -335,9 +360,13 @@ class ConversionService:
if pms_reservation_ids:
_LOGGER.debug("Phase 3: Matching conversions to reservations/customers")
if self.supports_concurrent:
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
await self._match_conversions_from_db_concurrent(
pms_reservation_ids, stats
)
else:
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
await self._match_conversions_from_db_sequential(
pms_reservation_ids, stats
)
return stats
@@ -375,9 +404,12 @@ class ConversionService:
try:
# Load all reservations with their hashed customers in one query
from sqlalchemy.orm import selectinload
query = select(Reservation).options(
selectinload(Reservation.customer).selectinload(Customer.hashed_version),
selectinload(Reservation.hashed_customer)
selectinload(Reservation.customer).selectinload(
Customer.hashed_version
),
selectinload(Reservation.hashed_customer),
)
result = await session.execute(query)
reservations = result.scalars().all()
@@ -420,6 +452,7 @@ class ConversionService:
Returns:
List of pms_reservation_ids that were successfully created/updated
"""
semaphore = asyncio.Semaphore(1) # Process one at a time
results = []
@@ -460,6 +493,7 @@ class ConversionService:
Returns:
List of pms_reservation_ids that were successfully created/updated
"""
if not self.session_maker:
_LOGGER.error(
@@ -568,16 +602,22 @@ class ConversionService:
"""
if not self.hotel_id:
_LOGGER.error("Cannot delete reservation: hotel_id not set in ConversionService")
_LOGGER.error(
"Cannot delete reservation: hotel_id not set in ConversionService"
)
return
_LOGGER.info("Processing deleted reservation: Hotel %s, PMS ID %s", self.hotel_id, pms_reservation_id)
_LOGGER.info(
"Processing deleted reservation: Hotel %s, PMS ID %s",
self.hotel_id,
pms_reservation_id,
)
# Delete conversion records for this hotel + pms_reservation_id
result = await session.execute(
select(Conversion).where(
Conversion.hotel_id == self.hotel_id,
Conversion.pms_reservation_id == pms_reservation_id
Conversion.pms_reservation_id == pms_reservation_id,
)
)
conversions = result.scalars().all()
@@ -684,7 +724,7 @@ class ConversionService:
existing_result = await session.execute(
select(Conversion).where(
Conversion.hotel_id == hotel_id,
Conversion.pms_reservation_id == pms_reservation_id
Conversion.pms_reservation_id == pms_reservation_id,
)
)
existing_conversion = existing_result.scalar_one_or_none()
@@ -744,9 +784,7 @@ class ConversionService:
# Fetch ALL existing rooms for this conversion (not just the ones in current XML)
existing_rooms_result = await session.execute(
select(ConversionRoom).where(
ConversionRoom.conversion_id == conversion.id
)
select(ConversionRoom).where(ConversionRoom.conversion_id == conversion.id)
)
existing_rooms = {
room.pms_hotel_reservation_id: room
@@ -841,7 +879,6 @@ class ConversionService:
# Check if room reservation already exists using batch-loaded data
existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id)
if existing_room_reservation:
# Update existing room reservation with all fields
existing_room_reservation.arrival_date = arrival_date
@@ -910,7 +947,6 @@ class ConversionService:
return stats
async def _match_by_advertising(
self,
advertising_campagne: str,
@@ -1026,9 +1062,7 @@ class ConversionService:
session = self.session
# Query all hashed customers that match the guest details
query = select(HashedCustomer).options(
selectinload(HashedCustomer.customer)
)
query = select(HashedCustomer).options(selectinload(HashedCustomer.customer))
# Build filter conditions
conditions = []
@@ -1165,6 +1199,7 @@ class ConversionService:
Returns:
Dictionary mapping guest_id to matched HashedCustomer (or None if no match)
"""
# Find all conversions that either:
# - Have no match at all (reservation_id IS NULL AND customer_id IS NULL), OR
@@ -1246,6 +1281,7 @@ class ConversionService:
guest_to_hashed_customer: Mapping from guest_id to matched HashedCustomer
session: AsyncSession for database queries
stats: Shared stats dictionary to update
"""
for guest_id, matched_hashed_customer in guest_to_hashed_customer.items():
if not matched_hashed_customer or not matched_hashed_customer.customer_id:
@@ -1259,7 +1295,10 @@ class ConversionService:
(Conversion.guest_id == guest_id)
& (Conversion.reservation_id.is_(None))
)
.options(selectinload(Conversion.conversion_rooms), selectinload(Conversion.guest))
.options(
selectinload(Conversion.conversion_rooms),
selectinload(Conversion.guest),
)
)
conversions = result.scalars().all()
@@ -1275,7 +1314,10 @@ class ConversionService:
# Try to link each conversion to a reservation for this customer
for conversion in conversions:
matched_reservation, is_attributable = await self._check_if_attributable(
(
matched_reservation,
is_attributable,
) = await self._check_if_attributable(
matched_hashed_customer.customer_id, conversion, session
)
@@ -1329,6 +1371,7 @@ class ConversionService:
Args:
session: AsyncSession for database queries
"""
# Get all ConversionGuests that have ANY customer link
# This includes:
@@ -1345,7 +1388,9 @@ class ConversionService:
_LOGGER.debug("Phase 3d: No matched guests to check for regularity")
return
_LOGGER.debug("Phase 3d: Checking regularity for %d matched guests", len(matched_guests))
_LOGGER.debug(
"Phase 3d: Checking regularity for %d matched guests", len(matched_guests)
)
for conversion_guest in matched_guests:
if not conversion_guest.hashed_customer_id:
@@ -1528,12 +1573,15 @@ class ConversionService:
pms_reservation_id: PMS reservation ID to match
session: AsyncSession to use
stats: Shared stats dictionary to update (optional)
"""
if session is None:
session = self.session
if not self.hotel_id:
_LOGGER.error("Cannot match conversion: hotel_id not set in ConversionService")
_LOGGER.error(
"Cannot match conversion: hotel_id not set in ConversionService"
)
return
# Get the conversion from the database with related data
@@ -1541,9 +1589,12 @@ class ConversionService:
select(Conversion)
.where(
Conversion.hotel_id == self.hotel_id,
Conversion.pms_reservation_id == pms_reservation_id
Conversion.pms_reservation_id == pms_reservation_id,
)
.options(
selectinload(Conversion.guest),
selectinload(Conversion.conversion_rooms),
)
.options(selectinload(Conversion.guest), selectinload(Conversion.conversion_rooms))
)
conversion = result.scalar_one_or_none()
@@ -1601,9 +1652,7 @@ class ConversionService:
conversion.reservation_id = (
matched_reservation.id if matched_reservation else None
)
conversion.customer_id = (
matched_customer.id if matched_customer else None
)
conversion.customer_id = matched_customer.id if matched_customer else None
conversion.hashed_customer_id = (
matched_hashed_customer.id if matched_hashed_customer else None
)
@@ -1647,6 +1696,7 @@ class ConversionService:
guest_id: The guest ID to evaluate
customer_id: The matched customer ID
session: AsyncSession for database queries
"""
# Get the ConversionGuest record
guest_result = await session.execute(
@@ -1670,7 +1720,9 @@ class ConversionService:
.order_by(Conversion.reservation_date.asc())
.limit(1)
)
earliest_paying_conversion = earliest_paying_conversion_result.scalar_one_or_none()
earliest_paying_conversion = (
earliest_paying_conversion_result.scalar_one_or_none()
)
if not earliest_paying_conversion:
# No paying conversions found for this guest
@@ -1695,7 +1747,10 @@ class ConversionService:
# (meaning they were already a customer before we sent them a reservation)
# Compare against the reservation's creation date (when WE created/sent it), not check-in date
# Convert created_at to date for comparison with reservation_date (both are dates)
is_regular = earliest_paying_conversion.reservation_date < earliest_reservation.created_at.date()
is_regular = (
earliest_paying_conversion.reservation_date
< earliest_reservation.created_at.date()
)
conversion_guest.is_regular = is_regular
if is_regular:
@@ -1735,6 +1790,7 @@ class ConversionService:
Tuple of (matched_reservation, is_attributable) where:
- matched_reservation: The Reservation that matches (if any)
- is_attributable: True if the reservation's dates match this conversion
"""
# Check if conversion_room dates exist (criterion for attributability)
if not conversion.conversion_rooms:
@@ -1763,12 +1819,12 @@ class ConversionService:
and reservation.end_date
):
# Check if dates match or mostly match (within 7 day tolerance)
arrival_match = abs(
(room.arrival_date - reservation.start_date).days
) <= 7
departure_match = abs(
(room.departure_date - reservation.end_date).days
) <= 7
arrival_match = (
abs((room.arrival_date - reservation.start_date).days) <= 7
)
departure_match = (
abs((room.departure_date - reservation.end_date).days) <= 7
)
if arrival_match and departure_match:
_LOGGER.info(

View File

@@ -455,6 +455,82 @@ class WebhookRequestData(BaseModel):
class ConversionGuestData(BaseModel):
    """Validated conversion guest data from PMS XML.

    Handles validation and hashing for guest records extracted from
    hotel PMS conversion XML files.

    Validation performed by this model:
      * ``guest_id`` is coerced from ``str`` to ``int`` (XML attributes
        arrive as strings) and must be strictly positive.
      * String fields are length-limited via ``Field(max_length=...)``.
      * After field validation, every ``hashed_*`` field that is still
        ``None`` is filled with the SHA-256 hex digest of the normalized
        plain-text counterpart.
    """

    # Allow construction from attribute-bearing objects (e.g. ORM rows).
    model_config = {"from_attributes": True}

    hotel_id: str = Field(..., min_length=1, max_length=50)
    guest_id: int = Field(..., gt=0)
    guest_first_name: str | None = Field(None, max_length=100)
    guest_last_name: str | None = Field(None, max_length=100)
    guest_email: str | None = Field(None, max_length=200)
    guest_country_code: str | None = Field(None, max_length=10)
    guest_birth_date: date | None = None

    # Auto-calculated hashed fields (64 chars == SHA-256 hex digest length).
    # A value supplied by the caller is kept as-is and not recomputed.
    hashed_first_name: str | None = Field(None, max_length=64)
    hashed_last_name: str | None = Field(None, max_length=64)
    hashed_email: str | None = Field(None, max_length=64)
    hashed_country_code: str | None = Field(None, max_length=64)
    hashed_birth_date: str | None = Field(None, max_length=64)

    # Timestamps (timezone-aware UTC when defaulted)
    first_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))
    last_seen: datetime = Field(default_factory=lambda: datetime.now(UTC))

    @staticmethod
    def _normalize_and_hash(value: str | None) -> str | None:
        """Normalize and hash a value for privacy-preserving matching.

        Intended to use the same logic as ConversionGuest._normalize_and_hash
        (NOTE(review): that method is defined elsewhere; keep the two in sync).

        Args:
            value: Plain-text value, or None.

        Returns:
            SHA-256 hex digest of the lowercased, whitespace-stripped value,
            or None when the value is None, empty, or whitespace-only.
        """
        if value is None:
            return None

        # Normalize: lowercase, strip whitespace
        normalized = value.lower().strip()
        if not normalized:
            # Covers both "" and whitespace-only input.
            return None

        # Hash with SHA256
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    @field_validator("guest_id", mode="before")
    @classmethod
    def convert_guest_id_to_int(cls, v: Any) -> int:
        """Convert guest_id to integer (handles string input from XML).

        Runs before Pydantic's own type validation so that a missing or
        malformed id yields a descriptive error message.

        Raises:
            ValueError: If the value is None, a non-numeric string, or
                neither an int nor a str.
        """
        if v is None:
            raise ValueError("guest_id cannot be None")
        if isinstance(v, int):
            return v
        if isinstance(v, str):
            try:
                return int(v)
            except ValueError as e:
                raise ValueError(f"guest_id must be a valid integer, got: {v}") from e
        raise ValueError(f"guest_id must be int or str, got: {type(v)}")

    @model_validator(mode="after")
    def calculate_hashes(self) -> "ConversionGuestData":
        """Auto-calculate hashed fields from plain text fields.

        Only fields that are still None are filled, so explicitly provided
        hashes are preserved.
        """
        if self.hashed_first_name is None:
            self.hashed_first_name = self._normalize_and_hash(self.guest_first_name)
        if self.hashed_last_name is None:
            self.hashed_last_name = self._normalize_and_hash(self.guest_last_name)
        if self.hashed_email is None:
            self.hashed_email = self._normalize_and_hash(self.guest_email)
        if self.hashed_country_code is None:
            self.hashed_country_code = self._normalize_and_hash(self.guest_country_code)
        # The birth date is hashed from its ISO-8601 string form (YYYY-MM-DD).
        if self.hashed_birth_date is None and self.guest_birth_date is not None:
            self.hashed_birth_date = self._normalize_and_hash(
                self.guest_birth_date.isoformat()
            )
        return self


# Example usage in a service layer
class ReservationService:
"""Example service showing how to use Pydantic models with SQLAlchemy."""