Started setting up conversion_imports. Not entirely sure how it ultimately works. Need to grab some real data for a while first.

This commit is contained in:
Jonas Linter
2025-10-22 15:19:17 +02:00
parent 76ab37f097
commit 81074d839a
5 changed files with 520 additions and 1 deletions

View File

@@ -0,0 +1,383 @@
"""Service for handling conversion data from hotel PMS XML files."""
import xml.etree.ElementTree as ET
from datetime import date, datetime
from typing import Any

from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession

from .db import Conversion, Customer, HashedCustomer, Reservation
from .logging_config import get_logger
# Module-level logger; all log output of this file goes through it.
_LOGGER = get_logger(__name__)
class ConversionService:
    """Service for processing and storing conversion/daily sales data.

    Parses a PMS XML export, creates one ``Conversion`` row per ``dailySale``
    element and links each row to an existing ``Reservation``, ``Customer``
    and ``HashedCustomer`` when the advertising data of the reservation
    matches a stored tracking value.
    """

    # Per-reservation counters that are summed into the overall statistics.
    _MATCH_STAT_KEYS = (
        "matched_to_reservation",
        "matched_to_customer",
        "matched_to_hashed_customer",
        "unmatched",
    )

    def __init__(self, session: AsyncSession):
        """Keep the async session that is used for every query and write."""
        self.session = session

    async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]:
        """Parse conversion XML and save daily sales data to database.

        Deleted reservations are handled first, then every ``reservation``
        element is processed. A failure in one reservation is logged and
        counted in ``errors``; the remaining reservations are still processed.
        The session is committed once at the end.

        Args:
            xml_content: XML string containing reservation and daily sales data

        Returns:
            Dictionary with processing statistics

        Raises:
            ValueError: If ``xml_content`` is not well-formed XML.
        """
        try:
            root = ET.fromstring(xml_content)
        except ET.ParseError as e:
            _LOGGER.error("Failed to parse conversion XML: %s", e)
            raise ValueError(f"Invalid XML content: {e}") from e

        stats = {
            "total_reservations": 0,
            "deleted_reservations": 0,
            "total_daily_sales": 0,
            "matched_to_reservation": 0,
            "matched_to_customer": 0,
            "matched_to_hashed_customer": 0,
            "unmatched": 0,
            "errors": 0,
        }

        # Process deleted reservations
        for deleted_res in root.findall("Deletedreservation"):
            stats["deleted_reservations"] += 1
            await self._handle_deleted_reservation(deleted_res.get("ID"))

        # Process active reservations
        for reservation in root.findall("reservation"):
            stats["total_reservations"] += 1
            try:
                reservation_stats = await self._process_reservation(reservation)
            except Exception as e:
                # NOTE(review): rows added to the session before the failure
                # stay pending and are committed below together with the rest.
                _LOGGER.exception(
                    "Error processing reservation %s: %s",
                    reservation.get("id"),
                    e,
                )
                stats["errors"] += 1
            else:
                stats["total_daily_sales"] += reservation_stats["daily_sales_count"]
                for key in self._MATCH_STAT_KEYS:
                    stats[key] += reservation_stats.get(key, 0)

        await self.session.commit()
        return stats

    async def _handle_deleted_reservation(
        self, pms_reservation_id: str | None
    ) -> None:
        """Delete all stored conversions of a reservation deleted in the PMS.

        Args:
            pms_reservation_id: Value of the ``ID`` attribute of the
                ``Deletedreservation`` element; ``None`` when it is missing.
        """
        if not pms_reservation_id:
            # Comparing the column with None compiles to ``IS NULL`` and would
            # delete every conversion that has no PMS reservation ID at all.
            _LOGGER.warning("Skipping deleted reservation without a PMS ID")
            return

        _LOGGER.info(
            "Processing deleted reservation: PMS ID %s", pms_reservation_id
        )

        result = await self.session.execute(
            select(Conversion).where(
                Conversion.pms_reservation_id == pms_reservation_id
            )
        )
        conversions = result.scalars().all()
        for conversion in conversions:
            await self.session.delete(conversion)

        if conversions:
            _LOGGER.info(
                "Deleted %d conversion records for PMS reservation %s",
                len(conversions),
                pms_reservation_id,
            )

    @staticmethod
    def _parse_date(value: str | None, label: str) -> date | None:
        """Parse a ``YYYY-MM-DD`` string; warn and return None when invalid."""
        if not value:
            return None
        try:
            return datetime.strptime(value, "%Y-%m-%d").date()
        except ValueError:
            _LOGGER.warning("Invalid %s date format: %s", label, value)
            return None

    @staticmethod
    def _parse_creation_time(value: str | None) -> datetime | None:
        """Parse an ISO-8601 timestamp; warn and return None when invalid."""
        if not value:
            return None
        try:
            # A trailing "Z" is rewritten to an explicit UTC offset before
            # parsing.
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            _LOGGER.warning("Invalid creation time format: %s", value)
            return None

    async def _process_reservation(
        self, reservation_elem: ET.Element
    ) -> dict[str, int]:
        """Process a single reservation element and its daily sales.

        One ``Conversion`` is added to the session for every ``dailySale``
        found below ``roomReservations/roomReservation/dailySales``. Nothing
        is committed here.

        Args:
            reservation_elem: The ``reservation`` XML element.

        Returns:
            Statistics about how many daily sales were seen and what they
            were matched to.
        """
        stats = {
            "daily_sales_count": 0,
            "matched_to_reservation": 0,
            "matched_to_customer": 0,
            "matched_to_hashed_customer": 0,
            "unmatched": 0,
        }

        # Extract reservation metadata
        hotel_id = reservation_elem.get("hotelID")
        pms_reservation_id = reservation_elem.get("id")
        reservation_number = reservation_elem.get("number")
        reservation_type = reservation_elem.get("type")
        booking_channel = reservation_elem.get("bookingChannel")

        # Advertising/tracking data
        advertising_medium = reservation_elem.get("advertisingMedium")
        advertising_partner = reservation_elem.get("advertisingPartner")
        advertising_campagne = reservation_elem.get("advertisingCampagne")

        reservation_date = self._parse_date(
            reservation_elem.get("date"), "reservation"
        )
        creation_time = self._parse_creation_time(
            reservation_elem.get("creationTime")
        )

        # Find matching reservation, customer, and hashed_customer using
        # advertising data
        matched_reservation = None
        matched_customer = None
        matched_hashed_customer = None
        if advertising_campagne:
            match_result = await self._find_matching_entities(
                advertising_campagne, hotel_id, reservation_date
            )
            matched_reservation = match_result["reservation"]
            matched_customer = match_result["customer"]
            matched_hashed_customer = match_result["hashed_customer"]

        room_reservations = reservation_elem.find("roomReservations")
        if room_reservations is None:
            _LOGGER.warning(
                "No roomReservations found for reservation %s", pms_reservation_id
            )
            return stats

        # The match is the same for every daily sale of this reservation, so
        # the linked IDs and the counters to bump are worked out only once.
        reservation_id = (
            matched_reservation.id if matched_reservation is not None else None
        )
        customer_id = matched_customer.id if matched_customer is not None else None
        hashed_customer_id = (
            matched_hashed_customer.id
            if matched_hashed_customer is not None
            else None
        )
        counters_to_bump = [
            key
            for key, entity in (
                ("matched_to_reservation", matched_reservation),
                ("matched_to_customer", matched_customer),
                ("matched_to_hashed_customer", matched_hashed_customer),
            )
            if entity is not None
        ] or ["unmatched"]

        for room_reservation in room_reservations.findall("roomReservation"):
            # Extract room reservation details
            arrival_date = self._parse_date(
                room_reservation.get("arrival"), "arrival"
            )
            departure_date = self._parse_date(
                room_reservation.get("departure"), "departure"
            )
            room_status = room_reservation.get("status")
            room_type = room_reservation.get("roomType")
            room_number = room_reservation.get("roomNumber")
            rate_plan_code = room_reservation.get("ratePlanCode")

            num_adults = None
            adults_str = room_reservation.get("adults")
            if adults_str:
                try:
                    num_adults = int(adults_str)
                except ValueError:
                    _LOGGER.warning("Invalid adults value: %s", adults_str)

            daily_sales = room_reservation.find("dailySales")
            if daily_sales is None:
                continue

            for daily_sale in daily_sales.findall("dailySale"):
                stats["daily_sales_count"] += 1

                conversion = Conversion(
                    # Links to existing entities (nullable)
                    reservation_id=reservation_id,
                    customer_id=customer_id,
                    hashed_customer_id=hashed_customer_id,
                    # Reservation metadata
                    hotel_id=hotel_id,
                    pms_reservation_id=pms_reservation_id,
                    reservation_number=reservation_number,
                    reservation_date=reservation_date,
                    creation_time=creation_time,
                    reservation_type=reservation_type,
                    booking_channel=booking_channel,
                    # Advertising data
                    advertising_medium=advertising_medium,
                    advertising_partner=advertising_partner,
                    advertising_campagne=advertising_campagne,
                    # Room reservation details
                    arrival_date=arrival_date,
                    departure_date=departure_date,
                    room_status=room_status,
                    room_type=room_type,
                    room_number=room_number,
                    num_adults=num_adults,
                    rate_plan_code=rate_plan_code,
                    # Daily sale data; revenue values are passed through as
                    # the raw attribute strings from the XML.
                    sale_date=self._parse_date(daily_sale.get("date"), "sale"),
                    revenue_total=daily_sale.get("revenueTotal"),
                    revenue_logis=daily_sale.get("revenueLogis"),
                    revenue_board=daily_sale.get("revenueBoard"),
                    revenue_fb=daily_sale.get("revenueFB"),
                    revenue_spa=daily_sale.get("revenueSpa"),
                    revenue_other=daily_sale.get("revenueOther"),
                    # Metadata
                    created_at=datetime.now(),
                )
                self.session.add(conversion)

                for key in counters_to_bump:
                    stats[key] += 1

        return stats

    async def _find_matching_entities(
        self,
        advertising_campagne: str,
        hotel_id: str | None,
        reservation_date: date | None,
    ) -> dict[str, Any]:
        """Find matching Reservation, Customer, and HashedCustomer.

        The advertisingCampagne field contains a truncated (64 char) version
        of fbclid/gclid, so prefix matching is used. The value is compared
        against ``fbclid``, ``gclid`` and ``utm_campaign`` of ``Reservation``.

        Args:
            advertising_campagne: Truncated tracking ID from conversion XML
            hotel_id: Hotel ID; when set, only reservations of that hotel match
            reservation_date: Reservation date. Accepted for future filtering
                but currently not used by the query.

        Returns:
            Dictionary with 'reservation', 'customer', and 'hashed_customer'
            keys; each value is None when nothing was found.
        """
        result = {
            "reservation": None,
            "customer": None,
            "hashed_customer": None,
        }
        if not advertising_campagne:
            return result

        # Prefix match. autoescape makes "%" and "_" inside the tracking value
        # match literally instead of acting as LIKE wildcards ("_" is common
        # in click IDs and would otherwise match any character).
        query = select(Reservation).where(
            or_(
                Reservation.fbclid.startswith(
                    advertising_campagne, autoescape=True
                ),
                Reservation.gclid.startswith(
                    advertising_campagne, autoescape=True
                ),
                Reservation.utm_campaign.startswith(
                    advertising_campagne, autoescape=True
                ),
            )
        )
        if hotel_id:
            query = query.where(Reservation.hotel_code == hotel_id)

        db_result = await self.session.execute(query)
        reservations = db_result.scalars().all()

        if not reservations:
            _LOGGER.debug(
                "No matching reservation found for advertisingCampagne: %s",
                advertising_campagne,
            )
            return result

        if len(reservations) > 1:
            # NOTE(review): the query has no ORDER BY, so which row comes
            # first is up to the database.
            _LOGGER.warning(
                "Multiple reservations match advertisingCampagne %s (hotel=%s): "
                "found %d matches. Using first match.",
                advertising_campagne,
                hotel_id,
                len(reservations),
            )

        matched_reservation = reservations[0]
        result["reservation"] = matched_reservation

        # Get associated customer and, through it, the hashed customer
        if matched_reservation.customer_id:
            customer_result = await self.session.execute(
                select(Customer).where(
                    Customer.id == matched_reservation.customer_id
                )
            )
            customer = customer_result.scalar_one_or_none()
            result["customer"] = customer

            if customer:
                hashed_result = await self.session.execute(
                    select(HashedCustomer).where(
                        HashedCustomer.customer_id == customer.id
                    )
                )
                result["hashed_customer"] = hashed_result.scalar_one_or_none()

        _LOGGER.info(
            "Matched conversion to reservation_id=%s, customer_id=%s, "
            "hashed_customer_id=%s (advertisingCampagne=%s)",
            matched_reservation.id,
            result["customer"].id if result["customer"] else None,
            result["hashed_customer"].id if result["hashed_customer"] else None,
            advertising_campagne,
        )
        return result