Done but not really complete

This commit is contained in:
Jonas Linter
2025-11-17 10:32:26 +01:00
parent 0c37254317
commit 24067847b4
3 changed files with 218 additions and 125 deletions

View File

@@ -1,13 +1,15 @@
"""Service for handling conversion data from hotel PMS XML files.""" """Service for handling conversion data from hotel PMS XML files."""
import json
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from datetime import datetime from datetime import datetime
from decimal import Decimal
from typing import Any from typing import Any
from sqlalchemy import or_, select from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from .db import Conversion, Customer, HashedCustomer, Reservation from .db import Conversion, RoomReservation, Customer, HashedCustomer, Reservation
from .logging_config import get_logger from .logging_config import get_logger
_LOGGER = get_logger(__name__) _LOGGER = get_logger(__name__)
@@ -199,6 +201,48 @@ class ConversionService:
) )
return stats return stats
# Create Conversion entry first (once per PMS reservation)
conversion = Conversion(
# Links to existing entities (nullable)
reservation_id=matched_reservation.id if matched_reservation else None,
customer_id=matched_customer.id if matched_customer else None,
hashed_customer_id=matched_hashed_customer.id
if matched_hashed_customer
else None,
# Reservation metadata
hotel_id=hotel_id,
pms_reservation_id=pms_reservation_id,
reservation_number=reservation_number,
reservation_date=reservation_date,
creation_time=creation_time,
reservation_type=reservation_type,
booking_channel=booking_channel,
# Guest information
guest_first_name=guest_first_name,
guest_last_name=guest_last_name,
guest_email=guest_email,
guest_country_code=guest_country_code,
# Advertising data
advertising_medium=advertising_medium,
advertising_partner=advertising_partner,
advertising_campagne=advertising_campagne,
# Metadata
created_at=datetime.now(),
updated_at=datetime.now(),
)
self.session.add(conversion)
# Update stats for the conversion record itself
if matched_reservation:
stats["matched_to_reservation"] += 1
if matched_customer:
stats["matched_to_customer"] += 1
if matched_hashed_customer:
stats["matched_to_hashed_customer"] += 1
if not any([matched_reservation, matched_customer, matched_hashed_customer]):
stats["unmatched"] += 1
# Process room reservations
for room_reservation in room_reservations.findall("roomReservation"): for room_reservation in room_reservations.findall("roomReservation"):
# Extract room reservation details # Extract room reservation details
arrival_str = room_reservation.get("arrival") arrival_str = room_reservation.get("arrival")
@@ -208,6 +252,7 @@ class ConversionService:
room_number = room_reservation.get("roomNumber") room_number = room_reservation.get("roomNumber")
adults_str = room_reservation.get("adults") adults_str = room_reservation.get("adults")
rate_plan_code = room_reservation.get("ratePlanCode") rate_plan_code = room_reservation.get("ratePlanCode")
connected_room_type = room_reservation.get("connectedRoomType")
arrival_date = None arrival_date = None
if arrival_str: if arrival_str:
@@ -232,53 +277,80 @@ class ConversionService:
except ValueError: except ValueError:
_LOGGER.warning("Invalid adults value: %s", adults_str) _LOGGER.warning("Invalid adults value: %s", adults_str)
# Process daily sales # Create composite ID for upsert: pms_reservation_id + room_number
daily_sales = room_reservation.find("dailySales") # This allows updating the same room reservation if it appears again
if daily_sales is None: pms_hotel_reservation_id = f"{pms_reservation_id}_{room_number}"
continue
for daily_sale in daily_sales.findall("dailySale"): # Process daily sales and extract total revenue
daily_sales_elem = room_reservation.find("dailySales")
daily_sales_list = []
total_revenue = Decimal("0")
if daily_sales_elem is not None:
for daily_sale in daily_sales_elem.findall("dailySale"):
stats["daily_sales_count"] += 1 stats["daily_sales_count"] += 1
# Extract daily sale data # Extract daily sale data
sale_date_str = daily_sale.get("date") sale_date_str = daily_sale.get("date")
sale_date = None daily_sale_obj = {}
if sale_date_str:
try:
sale_date = datetime.strptime(
sale_date_str, "%Y-%m-%d"
).date()
except ValueError:
_LOGGER.warning("Invalid sale date format: %s", sale_date_str)
# Create conversion record if sale_date_str:
conversion = Conversion( daily_sale_obj["date"] = sale_date_str
# Links to existing entities (nullable)
reservation_id=matched_reservation.id # Extract all revenue fields
if matched_reservation revenue_total_str = daily_sale.get("revenueTotal")
else None, if revenue_total_str:
customer_id=matched_customer.id if matched_customer else None, daily_sale_obj["revenueTotal"] = revenue_total_str
hashed_customer_id=matched_hashed_customer.id try:
if matched_hashed_customer total_revenue += Decimal(revenue_total_str)
else None, except (ValueError, TypeError):
# Reservation metadata _LOGGER.warning(
hotel_id=hotel_id, "Invalid revenueTotal value: %s", revenue_total_str
pms_reservation_id=pms_reservation_id, )
reservation_number=reservation_number,
reservation_date=reservation_date, # Add other revenue fields if present
creation_time=creation_time, if daily_sale.get("revenueLogis"):
reservation_type=reservation_type, daily_sale_obj["revenueLogis"] = daily_sale.get("revenueLogis")
booking_channel=booking_channel, if daily_sale.get("revenueBoard"):
# Guest information daily_sale_obj["revenueBoard"] = daily_sale.get("revenueBoard")
guest_first_name=guest_first_name, if daily_sale.get("revenueFB"):
guest_last_name=guest_last_name, daily_sale_obj["revenueFB"] = daily_sale.get("revenueFB")
guest_email=guest_email, if daily_sale.get("revenueSpa"):
guest_country_code=guest_country_code, daily_sale_obj["revenueSpa"] = daily_sale.get("revenueSpa")
# Advertising data if daily_sale.get("revenueOther"):
advertising_medium=advertising_medium, daily_sale_obj["revenueOther"] = daily_sale.get("revenueOther")
advertising_partner=advertising_partner,
advertising_campagne=advertising_campagne, if daily_sale_obj: # Only add if has data
# Room reservation details daily_sales_list.append(daily_sale_obj)
# Try to find existing room reservation for upsert
existing_result = await self.session.execute(
select(RoomReservation).where(
RoomReservation.pms_hotel_reservation_id == pms_hotel_reservation_id
)
)
existing_room_reservation = existing_result.scalar_one_or_none()
if existing_room_reservation:
# Update existing room reservation
existing_room_reservation.room_status = room_status
existing_room_reservation.num_adults = num_adults
existing_room_reservation.daily_sales = daily_sales_list if daily_sales_list else None
existing_room_reservation.total_revenue = (
str(total_revenue) if total_revenue > 0 else None
)
existing_room_reservation.updated_at = datetime.now()
_LOGGER.debug(
"Updated room reservation %s (pms_id=%s, room=%s)",
existing_room_reservation.id,
pms_reservation_id,
room_number,
)
else:
# Create new room reservation
room_reservation_record = RoomReservation(
conversion_id=conversion.id,
pms_hotel_reservation_id=pms_hotel_reservation_id,
arrival_date=arrival_date, arrival_date=arrival_date,
departure_date=departure_date, departure_date=departure_date,
room_status=room_status, room_status=room_status,
@@ -286,31 +358,19 @@ class ConversionService:
room_number=room_number, room_number=room_number,
num_adults=num_adults, num_adults=num_adults,
rate_plan_code=rate_plan_code, rate_plan_code=rate_plan_code,
# Daily sale data connected_room_type=connected_room_type,
sale_date=sale_date, daily_sales=daily_sales_list if daily_sales_list else None,
revenue_total=daily_sale.get("revenueTotal"), total_revenue=str(total_revenue) if total_revenue > 0 else None,
revenue_logis=daily_sale.get("revenueLogis"),
revenue_board=daily_sale.get("revenueBoard"),
revenue_fb=daily_sale.get("revenueFB"),
revenue_spa=daily_sale.get("revenueSpa"),
revenue_other=daily_sale.get("revenueOther"),
# Metadata
created_at=datetime.now(), created_at=datetime.now(),
updated_at=datetime.now(),
)
self.session.add(room_reservation_record)
_LOGGER.debug(
"Created room reservation (pms_id=%s, room=%s, adults=%s)",
pms_reservation_id,
room_number,
num_adults,
) )
self.session.add(conversion)
# Update stats
if matched_reservation:
stats["matched_to_reservation"] += 1
if matched_customer:
stats["matched_to_customer"] += 1
if matched_hashed_customer:
stats["matched_to_hashed_customer"] += 1
if not any(
[matched_reservation, matched_customer, matched_hashed_customer]
):
stats["unmatched"] += 1
return stats return stats

View File

@@ -3,7 +3,7 @@ import hashlib
import os import os
from typing import Any, AsyncGenerator, Callable, TypeVar from typing import Any, AsyncGenerator, Callable, TypeVar
from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String, JSON
from sqlalchemy.exc import DBAPIError from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import declarative_base, relationship from sqlalchemy.orm import declarative_base, relationship
@@ -337,11 +337,14 @@ class AckedRequest(Base):
class Conversion(Base): class Conversion(Base):
"""Conversion/daily sales data from hotel PMS. """Conversion data from hotel PMS.
Tracks actual sales revenue for reservations. Each row represents one day Represents a single reservation event from the PMS XML with all its metadata.
of a reservation stay. Linked to reservations via advertising tracking data Each row links to one reservation from the PMS system. A reservation can have
(fbclid, gclid, etc) stored in advertisingCampagne field. multiple room reservations (stored in RoomReservation table).
Linked to reservations via advertising tracking data (fbclid, gclid, etc)
stored in advertisingCampagne field.
""" """
__tablename__ = "conversions" __tablename__ = "conversions"
@@ -382,30 +385,66 @@ class Conversion(Base):
String, index=True String, index=True
) # advertisingCampagne (contains fbclid/gclid) ) # advertisingCampagne (contains fbclid/gclid)
# Room reservation details
arrival_date = Column(Date)
departure_date = Column(Date)
room_status = Column(String) # status attribute (e.g., "reserved", "checked-in")
room_type = Column(String) # roomType attribute
room_number = Column(String) # roomNumber attribute
num_adults = Column(Integer) # adults attribute
rate_plan_code = Column(String) # ratePlanCode attribute
# Daily sales data (one row per day)
sale_date = Column(Date, index=True) # date attribute from dailySale
revenue_total = Column(
String
) # revenueTotal - keeping as string to preserve decimals
revenue_logis = Column(String) # revenueLogis (accommodation)
revenue_board = Column(String) # revenueBoard (meal plan)
revenue_fb = Column(String) # revenueFB (food & beverage)
revenue_spa = Column(String) # revenueSpa
revenue_other = Column(String) # revenueOther
# Metadata # Metadata
created_at = Column(DateTime(timezone=True)) # When this record was imported created_at = Column(DateTime(timezone=True)) # When this record was imported
updated_at = Column(DateTime(timezone=True)) # When this record was last updated
# Relationships # Relationships
reservation = relationship("Reservation", backref="conversions") reservation = relationship("Reservation", backref="conversions")
customer = relationship("Customer", backref="conversions") customer = relationship("Customer", backref="conversions")
hashed_customer = relationship("HashedCustomer", backref="conversions") hashed_customer = relationship("HashedCustomer", backref="conversions")
room_reservations = relationship(
"RoomReservation", back_populates="conversion", cascade="all, delete-orphan"
)
class RoomReservation(Base):
"""Room reservation data from hotel PMS.
Represents a single room reservation within a conversion/PMS reservation.
One conversion can have multiple room reservations (e.g., customer books 3 rooms).
Daily sales are stored as a JSON blob with an extracted total_revenue field
for efficient querying.
"""
__tablename__ = "room_reservations"
id = Column(Integer, primary_key=True)
# Link to the parent conversion/PMS reservation
conversion_id = Column(
Integer, ForeignKey("conversions.id"), nullable=False, index=True
)
# Unique identifier for this room reservation (for upserts)
# Composite: pms_reservation_id + room_number
pms_hotel_reservation_id = Column(String, unique=True, index=True)
# Room reservation details
arrival_date = Column(Date, index=True) # arrival attribute
departure_date = Column(Date, index=True) # departure attribute
room_status = Column(String) # status attribute (e.g., "reserved", "departed")
room_type = Column(String) # roomType attribute (e.g., "VDS", "EZR")
room_number = Column(String, index=True) # roomNumber attribute
num_adults = Column(Integer) # adults attribute
rate_plan_code = Column(String) # ratePlanCode attribute
connected_room_type = Column(String) # connectedRoomType attribute
# Daily sales data stored as JSON
# Format: [
# {"date": "2021-10-09", "revenueTotal": "13.6", "revenueOther": "13.6"},
# {"date": "2021-10-10", "revenueTotal": "306.1", "revenueLogis": "254", ...},
# ...
# ]
daily_sales = Column(JSON, nullable=True) # JSON array of daily sales
# Extracted total revenue for efficient querying (sum of all revenue_total in daily_sales)
# Kept as string to preserve decimal precision
total_revenue = Column(String, nullable=True)
# Metadata
created_at = Column(DateTime(timezone=True)) # When this record was imported
updated_at = Column(DateTime(timezone=True)) # When this record was last updated
# Relationships
conversion = relationship("Conversion", back_populates="room_reservations")

View File

@@ -308,41 +308,35 @@ async def _backfill_acked_requests_username(engine: AsyncEngine, config: dict[st
_LOGGER.info("Backfill complete: %d acknowledgements updated with username", total_updated) _LOGGER.info("Backfill complete: %d acknowledgements updated with username", total_updated)
async def migrate_add_guest_fields_to_conversions(engine: AsyncEngine) -> None: async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
"""Migration: Add guest information fields to conversions table. """Migration: Normalize conversions and room reservations structure.
This migration adds guest details from the PMS XML for improved matching: This migration redesigns the conversion data structure:
- guest_first_name: First name of the guest - conversions: One row per PMS reservation (with guest/advertising metadata)
- guest_last_name: Last name of the guest - room_reservations: One row per room reservation (linked to conversion)
- guest_email: Email address of the guest - daily_sales: JSON array of daily sales within each room reservation
- guest_country_code: Country code of the guest - total_revenue: Extracted sum of all daily sales for efficiency
These fields are indexed to support efficient matching when the same Old structure: One row per daily sale (denormalized, lots of duplication)
fbclid/gclid matches multiple reservations. New structure: One row per room reservation, daily sales as JSON with extracted total
Safe to run multiple times - will skip if columns already exist. This allows:
- Upserts on room reservations (same room doesn't get duplicated)
- Better tracking of room data separate from daily sales data
- Efficient querying via extracted total_revenue field
- All daily sales details preserved in JSON for analysis
The tables are created via Base.metadata.create_all() at startup.
Safe to run multiple times - idempotent.
""" """
_LOGGER.info("Running migration: add_guest_fields_to_conversions") _LOGGER.info("Running migration: normalize_conversions")
_LOGGER.info(
added_count = 0 "Conversion data structure redesigned: "
"conversions (1 per PMS reservation) + "
# Add each column if it doesn't exist "room_reservations (1 per room, daily_sales as JSON). "
if await add_column_if_not_exists(engine, "conversions", "guest_first_name", "VARCHAR"): "Tables created/updated via Base.metadata.create_all()"
added_count += 1 )
if await add_column_if_not_exists(engine, "conversions", "guest_last_name", "VARCHAR"):
added_count += 1
if await add_column_if_not_exists(engine, "conversions", "guest_email", "VARCHAR"):
added_count += 1
if await add_column_if_not_exists(engine, "conversions", "guest_country_code", "VARCHAR"):
added_count += 1
if added_count > 0:
_LOGGER.info("Migration add_guest_fields_to_conversions: Added %d columns", added_count)
else:
_LOGGER.info("Migration add_guest_fields_to_conversions: No changes needed (already applied)")
async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None: async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None:
@@ -362,7 +356,7 @@ async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None
await migrate_add_room_types(engine) await migrate_add_room_types(engine)
await migrate_add_advertising_account_ids(engine, config) await migrate_add_advertising_account_ids(engine, config)
await migrate_add_username_to_acked_requests(engine, config) await migrate_add_username_to_acked_requests(engine, config)
await migrate_add_guest_fields_to_conversions(engine) await migrate_normalize_conversions(engine)
_LOGGER.info("Database migrations completed successfully") _LOGGER.info("Database migrations completed successfully")