Matching guests works nicely

This commit is contained in:
Jonas Linter
2025-11-17 14:25:53 +01:00
parent 24067847b4
commit f3978381df
7 changed files with 339331 additions and 146 deletions

6
.gitignore vendored
View File

@@ -31,6 +31,12 @@ config/postgres.yaml
# ignore db # ignore db
alpinebits.db alpinebits.db
# ignore sql
*.sql
*.csv
# test output files # test output files
test_output.txt test_output.txt
output.xml output.xml

338636
config/alpinebits.log Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -2,54 +2,55 @@
# Use annotatedyaml for secrets and environment-specific overrides # Use annotatedyaml for secrets and environment-specific overrides
database: database:
url: "sqlite+aiosqlite:///alpinebits.db" # For local dev, use SQLite. For prod, override with PostgreSQL URL. url: "postgresql+asyncpg://meta_user:meta_password@localhost:5555/meta_insights"
# url: "postgresql+asyncpg://user:password@host:port/dbname" # Example for Postgres schema: "alpinebits"
# schema: "alpinebits" # Optional: PostgreSQL schema name (default: public)
# AlpineBits Python config # AlpineBits Python config
# Use annotatedyaml for secrets and environment-specific overrides # Use annotatedyaml for secrets and environment-specific overrides
logger:
level: "INFO" # Set to DEBUG for more verbose output
file: "config/alpinebits.log" # Log file path, or null for console only
server: server:
codecontext: "ADVERTISING" codecontext: "ADVERTISING"
code: 70597314 code: 70597314
companyname: "99tales Gmbh" companyname: "99tales Gmbh"
res_id_source_context: "99tales" res_id_source_context: "99tales"
logger:
level: "INFO" # Set to DEBUG for more verbose output
file: "alpinebits.log" # Log file path, or null for console only
alpine_bits_auth: alpine_bits_auth:
- hotel_id: "39054_001" - hotel_id: "39054_001"
hotel_name: "Bemelmans Post" hotel_name: "Bemelmans Post"
username: "bemelman" username: "bemelman"
password: !secret BEMELMANS_PASSWORD password: !secret BEMELMANS_PASSWORD
meta_account: null # Optional: Meta advertising account ID meta_account: "238334370765317"
google_account: null # Optional: Google advertising account ID google_account: "7581209925" # Optional: Google advertising account ID
- hotel_id: "135" - hotel_id: "135"
hotel_name: "Testhotel" hotel_name: "Testhotel"
username: "sebastian" username: "sebastian"
password: !secret BOB_PASSWORD password: !secret BOB_PASSWORD
meta_account: null # Optional: Meta advertising account ID
google_account: null # Optional: Google advertising account ID
- hotel_id: "39052_001" - hotel_id: "39052_001"
hotel_name: "Jagthof Kaltern" hotel_name: "Jagthof Kaltern"
username: "jagthof" username: "jagthof"
password: !secret JAGTHOF_PASSWORD password: !secret JAGTHOF_PASSWORD
meta_account: null # Optional: Meta advertising account ID meta_account: "948363300784757"
google_account: null # Optional: Google advertising account ID google_account: "1951919786" # Optional: Google advertising account ID
- hotel_id: "39040_001" - hotel_id: "39040_001"
hotel_name: "Residence Erika" hotel_name: "Residence Erika"
username: "erika" username: "erika"
password: !secret ERIKA_PASSWORD password: !secret ERIKA_PASSWORD
meta_account: null # Optional: Meta advertising account ID google_account: "6604634947"
google_account: null # Optional: Google advertising account ID
api_tokens: api_tokens:
- tLTI8wXF1OVEvUX7kdZRhSW3Qr5feBCz0mHo-kbnEp0 - tLTI8wXF1OVEvUX7kdZRhSW3Qr5feBCz0mHo-kbnEp0
# Email configuration (SMTP service config - kept for when port is unblocked) # Email configuration (SMTP service config - kept for when port is unblocked)
email: email:
# SMTP server configuration # SMTP server configuration
@@ -81,18 +82,18 @@ notifications:
#- type: "email" #- type: "email"
# address: "jonas@vaius.ai" # address: "jonas@vaius.ai"
- type: "pushover" - type: "pushover"
priority: 1 # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency priority: 0 # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency
# Daily report configuration (applies to all recipients) # Daily report configuration (applies to all recipients)
daily_report: daily_report:
enabled: true # Set to true to enable daily reports enabled: false # Set to true to enable daily reports
send_time: "08:00" # Time to send daily report (24h format, local time) send_time: "08:00" # Time to send daily report (24h format, local time)
include_stats: true # Include reservation/customer stats include_stats: true # Include reservation/customer stats
include_errors: true # Include error summary include_errors: true # Include error summary
# Error alert configuration (applies to all recipients) # Error alert configuration (applies to all recipients)
error_alerts: error_alerts:
enabled: true # Set to true to enable error alerts enabled: false # Set to true to enable error alerts
# Alert is sent immediately if threshold is reached # Alert is sent immediately if threshold is reached
error_threshold: 5 # Send immediate alert after N errors error_threshold: 5 # Send immediate alert after N errors
# Otherwise, alert is sent after buffer time expires # Otherwise, alert is sent after buffer time expires
@@ -103,3 +104,5 @@ notifications:
log_levels: log_levels:
- "ERROR" - "ERROR"
- "CRITICAL" - "CRITICAL"

View File

@@ -43,7 +43,7 @@ from .config_loader import load_config
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode
from .conversion_service import ConversionService from .conversion_service import ConversionService
from .customer_service import CustomerService from .customer_service import CustomerService
from .db import Base, ResilientAsyncSession, create_database_engine from .db import Base, ResilientAsyncSession, SessionMaker, create_database_engine
from .db import Customer as DBCustomer from .db import Customer as DBCustomer
from .db import Reservation as DBReservation from .db import Reservation as DBReservation
from .email_monitoring import ReservationStatsCollector from .email_monitoring import ReservationStatsCollector
@@ -295,9 +295,13 @@ async def lifespan(app: FastAPI):
# Create resilient session wrapper for automatic connection recovery # Create resilient session wrapper for automatic connection recovery
resilient_session = ResilientAsyncSession(AsyncSessionLocal, engine) resilient_session = ResilientAsyncSession(AsyncSessionLocal, engine)
# Create SessionMaker for concurrent processing
session_maker = SessionMaker(AsyncSessionLocal)
app.state.engine = engine app.state.engine = engine
app.state.async_sessionmaker = AsyncSessionLocal app.state.async_sessionmaker = AsyncSessionLocal
app.state.resilient_session = resilient_session app.state.resilient_session = resilient_session
app.state.session_maker = session_maker
app.state.config = config app.state.config = config
app.state.alpine_bits_server = AlpineBitsServer(config) app.state.alpine_bits_server = AlpineBitsServer(config)
app.state.event_dispatcher = event_dispatcher app.state.event_dispatcher = event_dispatcher
@@ -409,6 +413,17 @@ async def get_async_session(request: Request):
yield session yield session
def get_session_maker(request: Request) -> SessionMaker:
"""Get the SessionMaker for creating independent database sessions.
This dependency provides a SessionMaker that can be used to create
multiple independent sessions for concurrent processing tasks.
Useful for processing large datasets concurrently where each task
needs its own database transaction context.
"""
return request.app.state.session_maker
def get_resilient_session(request: Request) -> ResilientAsyncSession: def get_resilient_session(request: Request) -> ResilientAsyncSession:
"""Get the resilient session manager from app state. """Get the resilient session manager from app state.
@@ -1256,6 +1271,7 @@ async def handle_xml_upload(
filename: str, filename: str,
credentials_tupel: tuple = Depends(validate_basic_auth), credentials_tupel: tuple = Depends(validate_basic_auth),
db_session=Depends(get_async_session), db_session=Depends(get_async_session),
session_maker: SessionMaker = Depends(get_session_maker),
): ):
"""Endpoint for receiving XML files for conversion processing via PUT. """Endpoint for receiving XML files for conversion processing via PUT.
@@ -1344,7 +1360,9 @@ async def handle_xml_upload(
) )
# Process the conversion XML and save to database # Process the conversion XML and save to database
conversion_service = ConversionService(db_session) # Use SessionMaker for concurrent processing of large XML files
# This allows multiple reservations to be processed in parallel with independent sessions
conversion_service = ConversionService(session_maker)
processing_stats = await conversion_service.process_conversion_xml(xml_content) processing_stats = await conversion_service.process_conversion_xml(xml_content)
_LOGGER.info( _LOGGER.info(

View File

@@ -1,5 +1,6 @@
"""Service for handling conversion data from hotel PMS XML files.""" """Service for handling conversion data from hotel PMS XML files."""
import asyncio
import json import json
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from datetime import datetime from datetime import datetime
@@ -8,18 +9,57 @@ from typing import Any
from sqlalchemy import or_, select from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from .db import Conversion, RoomReservation, Customer, HashedCustomer, Reservation from .db import Conversion, RoomReservation, Customer, HashedCustomer, Reservation, SessionMaker
from .logging_config import get_logger from .logging_config import get_logger
_LOGGER = get_logger(__name__) _LOGGER = get_logger(__name__)
# Limit concurrent reservation processing to avoid overwhelming the database
MAX_CONCURRENT_RESERVATIONS = 10
class ConversionService: class ConversionService:
"""Service for processing and storing conversion/daily sales data.""" """Service for processing and storing conversion/daily sales data.
def __init__(self, session: AsyncSession): Supports two modes of operation:
1. Sequential mode: Single AsyncSession passed in, uses sequential processing
2. Concurrent mode: SessionMaker passed in, creates independent sessions per task
"""
def __init__(self, session: AsyncSession | SessionMaker | None = None):
"""Initialize the ConversionService.
Args:
session: Can be either:
- AsyncSession: Single session for sequential processing
- SessionMaker: Factory for creating sessions in concurrent mode
- None: Not recommended, but allowed for subclassing
"""
self.session = None
self.session_maker = None
self.supports_concurrent = False
# Cache for reservation and customer data within a single XML processing run
# Maps hotel_code -> list of (reservation, customer) tuples
# This significantly speeds up matching when processing large XML files
self._reservation_cache: dict[str | None, list[tuple[Reservation, Customer | None]]] = {}
self._cache_initialized = False
if isinstance(session, SessionMaker):
self.session_maker = session
self.supports_concurrent = True
_LOGGER.info("ConversionService initialized in concurrent mode with SessionMaker")
elif isinstance(session, AsyncSession):
self.session = session self.session = session
self.supports_concurrent = False
_LOGGER.info("ConversionService initialized in sequential mode with single session")
elif session is not None:
raise TypeError(
f"session must be AsyncSession or SessionMaker, got {type(session)}"
)
async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]: async def process_conversion_xml(self, xml_content: str) -> dict[str, Any]:
"""Parse conversion XML and save daily sales data to database. """Parse conversion XML and save daily sales data to database.
@@ -37,6 +77,9 @@ class ConversionService:
_LOGGER.error("Failed to parse conversion XML: %s", e) _LOGGER.error("Failed to parse conversion XML: %s", e)
raise ValueError(f"Invalid XML content: {e}") from e raise ValueError(f"Invalid XML content: {e}") from e
# Initialize cache for this XML processing run
await self._load_reservation_cache()
stats = { stats = {
"total_reservations": 0, "total_reservations": 0,
"deleted_reservations": 0, "deleted_reservations": 0,
@@ -48,18 +91,177 @@ class ConversionService:
"errors": 0, "errors": 0,
} }
# Get a session for deleted reservations processing
if self.session_maker:
session = await self.session_maker.create_session()
else:
session = self.session
# Process deleted reservations # Process deleted reservations
for deleted_res in root.findall("Deletedreservation"): for deleted_res in root.findall("Deletedreservation"):
stats["deleted_reservations"] += 1 stats["deleted_reservations"] += 1
pms_reservation_id = deleted_res.get("ID") pms_reservation_id = deleted_res.get("ID")
await self._handle_deleted_reservation(pms_reservation_id) try:
await self._handle_deleted_reservation(pms_reservation_id, session)
await session.commit()
except Exception as e:
await session.rollback()
_LOGGER.exception(
"Error deleting reservation %s: %s",
pms_reservation_id,
e,
)
stats["errors"] += 1
# Close session if created by SessionMaker
if self.session_maker:
await session.close()
# Process active reservations # Process active reservations
for reservation in root.findall("reservation"): reservations = root.findall("reservation")
stats["total_reservations"] += 1 stats["total_reservations"] = len(reservations)
if not reservations:
return stats
# Use concurrent processing if supported, otherwise sequential
if self.supports_concurrent:
await self._process_reservations_concurrent(reservations, stats)
else:
await self._process_reservations_sequential(reservations, stats)
return stats
async def _load_reservation_cache(self) -> None:
"""Load all reservations and customers into cache for fast matching.
This method is called once at the start of processing a large XML file.
It loads all reservations with their associated customers into an in-memory
cache organized by hotel_code. This avoids repeated database queries during
matching operations.
The cache structure:
- Key: hotel_code (str or None)
- Value: List of (reservation, customer) tuples
This is especially beneficial for large XML files with many reservations
where matching criteria is the same across multiple reservations.
"""
if self._cache_initialized:
_LOGGER.debug("Reservation cache already initialized, skipping reload")
return
# Get a session for loading the cache
if self.session_maker:
session = await self.session_maker.create_session()
close_session = True
else:
session = self.session
close_session = False
if not session:
_LOGGER.warning("No session available for cache loading")
return
try: try:
reservation_stats = await self._process_reservation(reservation) # Load all reservations with their customers in one query
query = select(Reservation).options(selectinload(Reservation.customer))
result = await session.execute(query)
reservations = result.scalars().all()
_LOGGER.info("Loaded %d reservations into cache", len(reservations))
# Organize by hotel_code for efficient lookup
for reservation in reservations:
hotel_code = reservation.hotel_code
if hotel_code not in self._reservation_cache:
self._reservation_cache[hotel_code] = []
self._reservation_cache[hotel_code].append(
(reservation, reservation.customer)
)
self._cache_initialized = True
_LOGGER.info(
"Reservation cache initialized with %d hotel codes",
len(self._reservation_cache),
)
except Exception as e:
_LOGGER.error("Failed to load reservation cache: %s", e)
# Cache remains empty, fall back to direct queries
self._cache_initialized = True
finally:
# Close session if we created it
if close_session:
await session.close()
async def _process_reservations_sequential(
self, reservations: list, stats: dict[str, int]
) -> None:
"""Process reservations one at a time (original behavior)."""
semaphore = asyncio.Semaphore(1) # Process one at a time
async with asyncio.TaskGroup() as tg:
for reservation in reservations:
tg.create_task(
self._process_reservation_safe(
reservation, semaphore, stats
)
)
async def _process_reservations_concurrent(
self, reservations: list, stats: dict[str, int]
) -> None:
"""Process reservations concurrently with semaphore limiting.
Each concurrent task gets its own independent database session
from the SessionMaker.
"""
if not self.session_maker:
_LOGGER.error(
"Concurrent processing requested but SessionMaker not available. "
"Falling back to sequential processing."
)
await self._process_reservations_sequential(reservations, stats)
return
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
async with asyncio.TaskGroup() as tg:
for reservation in reservations:
tg.create_task(
self._process_reservation_safe(
reservation, semaphore, stats
)
)
async def _process_reservation_safe(
self,
reservation_elem: Any,
semaphore: asyncio.Semaphore,
stats: dict[str, int],
) -> None:
"""Safely process a single reservation with semaphore and transaction management.
In concurrent mode, creates its own session from SessionMaker.
In sequential mode, uses the shared session.
Args:
reservation_elem: XML element for the reservation
semaphore: Semaphore to limit concurrent operations
stats: Shared stats dictionary (thread-safe due to GIL)
"""
pms_reservation_id = reservation_elem.get("id")
async with semaphore:
# In concurrent mode, create a new session for this task
if self.session_maker:
session = await self.session_maker.create_session()
else:
session = self.session
try:
# Process reservation with this task's session
reservation_stats = await self._process_reservation(
reservation_elem, session
)
stats["total_daily_sales"] += reservation_stats["daily_sales_count"] stats["total_daily_sales"] += reservation_stats["daily_sales_count"]
stats["matched_to_reservation"] += reservation_stats.get( stats["matched_to_reservation"] += reservation_stats.get(
"matched_to_reservation", 0 "matched_to_reservation", 0
@@ -72,19 +274,34 @@ class ConversionService:
) )
stats["unmatched"] += reservation_stats.get("unmatched", 0) stats["unmatched"] += reservation_stats.get("unmatched", 0)
# Commit this task's transaction
await session.commit()
_LOGGER.debug(
"Successfully processed and committed reservation %s",
pms_reservation_id,
)
except Exception as e: except Exception as e:
# Rollback this task's transaction
await session.rollback()
_LOGGER.exception( _LOGGER.exception(
"Error processing reservation %s: %s", "Error processing reservation %s: %s",
reservation.get("id"), pms_reservation_id,
e, e,
) )
stats["errors"] += 1 stats["errors"] += 1
finally:
# Close the session if it was created by SessionMaker
if self.session_maker:
await session.close()
await self.session.commit() async def _handle_deleted_reservation(self, pms_reservation_id: str, session: AsyncSession):
return stats """Handle deleted reservation by marking conversions as deleted or removing them.
async def _handle_deleted_reservation(self, pms_reservation_id: str): Args:
"""Handle deleted reservation by marking conversions as deleted or removing them.""" pms_reservation_id: PMS reservation ID to delete
session: AsyncSession to use for the operation
"""
# For now, we'll just log it. You could add a 'deleted' flag to the Conversion table # For now, we'll just log it. You could add a 'deleted' flag to the Conversion table
# or actually delete the conversion records # or actually delete the conversion records
_LOGGER.info( _LOGGER.info(
@@ -92,7 +309,7 @@ class ConversionService:
) )
# Option 1: Delete conversion records # Option 1: Delete conversion records
result = await self.session.execute( result = await session.execute(
select(Conversion).where( select(Conversion).where(
Conversion.pms_reservation_id == pms_reservation_id Conversion.pms_reservation_id == pms_reservation_id
) )
@@ -100,7 +317,7 @@ class ConversionService:
conversions = result.scalars().all() conversions = result.scalars().all()
for conversion in conversions: for conversion in conversions:
await self.session.delete(conversion) await session.delete(conversion)
if conversions: if conversions:
_LOGGER.info( _LOGGER.info(
@@ -110,12 +327,19 @@ class ConversionService:
) )
async def _process_reservation( async def _process_reservation(
self, reservation_elem: ET.Element self, reservation_elem: ET.Element, session: AsyncSession | None = None
) -> dict[str, int]: ) -> dict[str, int]:
"""Process a single reservation element and its daily sales. """Process a single reservation element and its daily sales.
Args:
reservation_elem: XML element to process
session: AsyncSession to use. If None, uses self.session.
In concurrent mode, each task passes its own session.
Returns statistics about what was matched. Returns statistics about what was matched.
""" """
if session is None:
session = self.session
stats = { stats = {
"daily_sales_count": 0, "daily_sales_count": 0,
"matched_to_reservation": 0, "matched_to_reservation": 0,
@@ -179,7 +403,7 @@ class ConversionService:
matched_customer = None matched_customer = None
matched_hashed_customer = None matched_hashed_customer = None
if advertising_campagne: if advertising_campagne or True:
match_result = await self._find_matching_entities( match_result = await self._find_matching_entities(
advertising_campagne, advertising_campagne,
hotel_id, hotel_id,
@@ -188,6 +412,7 @@ class ConversionService:
guest_last_name, guest_last_name,
guest_email, guest_email,
advertising_partner, advertising_partner,
session,
) )
matched_reservation = match_result["reservation"] matched_reservation = match_result["reservation"]
matched_customer = match_result["customer"] matched_customer = match_result["customer"]
@@ -196,12 +421,51 @@ class ConversionService:
# Process all room reservations # Process all room reservations
room_reservations = reservation_elem.find("roomReservations") room_reservations = reservation_elem.find("roomReservations")
if room_reservations is None: if room_reservations is None:
_LOGGER.warning( _LOGGER.debug(
"No roomReservations found for reservation %s", pms_reservation_id "No roomReservations found for reservation %s", pms_reservation_id
) )
return stats return stats
# Create Conversion entry first (once per PMS reservation) # Check if conversion already exists (upsert logic)
existing_result = await session.execute(
select(Conversion).where(
Conversion.pms_reservation_id == pms_reservation_id
)
)
existing_conversion = existing_result.scalar_one_or_none()
if existing_conversion:
# Update existing conversion
existing_conversion.reservation_id = (
matched_reservation.id if matched_reservation else None
)
existing_conversion.customer_id = (
matched_customer.id if matched_customer else None
)
existing_conversion.hashed_customer_id = (
matched_hashed_customer.id if matched_hashed_customer else None
)
existing_conversion.reservation_number = reservation_number
existing_conversion.reservation_date = reservation_date
existing_conversion.creation_time = creation_time
existing_conversion.reservation_type = reservation_type
existing_conversion.booking_channel = booking_channel
existing_conversion.guest_first_name = guest_first_name
existing_conversion.guest_last_name = guest_last_name
existing_conversion.guest_email = guest_email
existing_conversion.guest_country_code = guest_country_code
existing_conversion.advertising_medium = advertising_medium
existing_conversion.advertising_partner = advertising_partner
existing_conversion.advertising_campagne = advertising_campagne
existing_conversion.updated_at = datetime.now()
conversion = existing_conversion
_LOGGER.info(
"Updated conversion %s (pms_id=%s)",
conversion.id,
pms_reservation_id,
)
else:
# Create new conversion entry
conversion = Conversion( conversion = Conversion(
# Links to existing entities (nullable) # Links to existing entities (nullable)
reservation_id=matched_reservation.id if matched_reservation else None, reservation_id=matched_reservation.id if matched_reservation else None,
@@ -230,7 +494,14 @@ class ConversionService:
created_at=datetime.now(), created_at=datetime.now(),
updated_at=datetime.now(), updated_at=datetime.now(),
) )
self.session.add(conversion) session.add(conversion)
_LOGGER.debug(
"Created conversion (pms_id=%s)",
pms_reservation_id,
)
# Flush to ensure conversion has an ID before creating room reservations
await session.flush()
# Update stats for the conversion record itself # Update stats for the conversion record itself
if matched_reservation: if matched_reservation:
@@ -242,6 +513,27 @@ class ConversionService:
if not any([matched_reservation, matched_customer, matched_hashed_customer]): if not any([matched_reservation, matched_customer, matched_hashed_customer]):
stats["unmatched"] += 1 stats["unmatched"] += 1
# Batch-load existing room reservations to avoid N+1 queries
room_numbers = [
rm.get("roomNumber")
for rm in room_reservations.findall("roomReservation")
]
pms_hotel_reservation_ids = [
f"{pms_reservation_id}_{room_num}" for room_num in room_numbers
]
existing_rooms_result = await session.execute(
select(RoomReservation).where(
RoomReservation.pms_hotel_reservation_id.in_(
pms_hotel_reservation_ids
)
)
)
existing_rooms = {
room.pms_hotel_reservation_id: room
for room in existing_rooms_result.scalars().all()
}
# Process room reservations # Process room reservations
for room_reservation in room_reservations.findall("roomReservation"): for room_reservation in room_reservations.findall("roomReservation"):
# Extract room reservation details # Extract room reservation details
@@ -323,19 +615,21 @@ class ConversionService:
if daily_sale_obj: # Only add if has data if daily_sale_obj: # Only add if has data
daily_sales_list.append(daily_sale_obj) daily_sales_list.append(daily_sale_obj)
# Try to find existing room reservation for upsert # Check if room reservation already exists using batch-loaded data
existing_result = await self.session.execute( existing_room_reservation = existing_rooms.get(pms_hotel_reservation_id)
select(RoomReservation).where(
RoomReservation.pms_hotel_reservation_id == pms_hotel_reservation_id
)
)
existing_room_reservation = existing_result.scalar_one_or_none()
if existing_room_reservation: if existing_room_reservation:
# Update existing room reservation # Update existing room reservation with all fields
existing_room_reservation.arrival_date = arrival_date
existing_room_reservation.departure_date = departure_date
existing_room_reservation.room_status = room_status existing_room_reservation.room_status = room_status
existing_room_reservation.room_type = room_type
existing_room_reservation.num_adults = num_adults existing_room_reservation.num_adults = num_adults
existing_room_reservation.daily_sales = daily_sales_list if daily_sales_list else None existing_room_reservation.rate_plan_code = rate_plan_code
existing_room_reservation.connected_room_type = connected_room_type
existing_room_reservation.daily_sales = (
daily_sales_list if daily_sales_list else None
)
existing_room_reservation.total_revenue = ( existing_room_reservation.total_revenue = (
str(total_revenue) if total_revenue > 0 else None str(total_revenue) if total_revenue > 0 else None
) )
@@ -364,7 +658,7 @@ class ConversionService:
created_at=datetime.now(), created_at=datetime.now(),
updated_at=datetime.now(), updated_at=datetime.now(),
) )
self.session.add(room_reservation_record) session.add(room_reservation_record)
_LOGGER.debug( _LOGGER.debug(
"Created room reservation (pms_id=%s, room=%s, adults=%s)", "Created room reservation (pms_id=%s, room=%s, adults=%s)",
pms_reservation_id, pms_reservation_id,
@@ -383,38 +677,131 @@ class ConversionService:
guest_last_name: str | None = None, guest_last_name: str | None = None,
guest_email: str | None = None, guest_email: str | None = None,
advertising_partner: str | None = None, advertising_partner: str | None = None,
session: AsyncSession | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Find matching Reservation, Customer, and HashedCustomer using advertising data. """Find matching Reservation, Customer, and HashedCustomer.
The advertisingCampagne field contains a truncated (64 char) version of Uses two strategies:
fbclid/gclid, so we use prefix matching. When multiple matches exist, 1. Advertising data matching (fbclid/gclid/utm_campaign) with guest details fallback
uses guest details (first_name, last_name, email) and utm_medium 2. If no advertising data match, falls back to email/name-based matching
(matched against advertisingPartner) to narrow down to a single match.
Args: Args:
advertising_campagne: Truncated tracking ID from conversion XML advertising_campagne: Truncated tracking ID from conversion XML
hotel_id: Hotel ID for additional filtering hotel_id: Hotel ID for additional filtering
reservation_date: Reservation date for additional filtering reservation_date: Reservation date for additional filtering
guest_first_name: Guest first name for disambiguation guest_first_name: Guest first name for matching
guest_last_name: Guest last name for disambiguation guest_last_name: Guest last name for matching
guest_email: Guest email for disambiguation guest_email: Guest email for matching
advertising_partner: Partner info (matches utm_medium for additional filtering) advertising_partner: Partner info (matches utm_medium for additional filtering)
session: AsyncSession to use. If None, uses self.session.
Returns: Returns:
Dictionary with 'reservation', 'customer', and 'hashed_customer' keys Dictionary with 'reservation', 'customer', and 'hashed_customer' keys
""" """
if session is None:
session = self.session
result = { result = {
"reservation": None, "reservation": None,
"customer": None, "customer": None,
"hashed_customer": None, "hashed_customer": None,
} }
if not advertising_campagne: # Strategy 1: Try to match by advertising data (fbclid/gclid/utm_campaign)
if advertising_campagne:
matched_reservation = await self._match_by_advertising(
advertising_campagne,
hotel_id,
guest_first_name,
guest_last_name,
guest_email,
advertising_partner,
session,
)
if matched_reservation:
result["reservation"] = matched_reservation
_LOGGER.info(
"Matched conversion by advertising data (advertisingCampagne=%s, hotel=%s)",
advertising_campagne,
hotel_id,
)
else:
_LOGGER.debug(
"No match found by advertising data (advertisingCampagne=%s), "
"falling back to email/name matching",
advertising_campagne,
)
# Strategy 2: If no advertising match, try email/name-based matching
if not result["reservation"] and (guest_email or guest_first_name or guest_last_name):
matched_reservation = await self._match_by_guest_details(
hotel_id, guest_first_name, guest_last_name, guest_email, session
)
if matched_reservation:
result["reservation"] = matched_reservation
_LOGGER.info(
"Matched conversion by guest details (name=%s %s, email=%s, hotel=%s)",
guest_first_name,
guest_last_name,
guest_email,
hotel_id,
)
else:
_LOGGER.debug(
"No match found by guest details (name=%s %s, email=%s)",
guest_first_name,
guest_last_name,
guest_email,
)
# If we found a reservation, get its customer and hashed_customer
if result["reservation"]:
if result["reservation"].customer_id:
customer_query = select(Customer).where(
Customer.id == result["reservation"].customer_id
)
customer_result = await session.execute(customer_query)
result["customer"] = customer_result.scalar_one_or_none()
# Get hashed customer
if result["customer"]:
hashed_query = select(HashedCustomer).where(
HashedCustomer.customer_id == result["customer"].id
)
hashed_result = await session.execute(hashed_query)
result["hashed_customer"] = hashed_result.scalar_one_or_none()
return result return result
async def _match_by_advertising(
self,
advertising_campagne: str,
hotel_id: str | None,
guest_first_name: str | None,
guest_last_name: str | None,
guest_email: str | None,
advertising_partner: str | None,
session: AsyncSession | None = None,
) -> Reservation | None:
"""Match reservation by advertising tracking data (fbclid/gclid/utm_campaign).
Args:
advertising_campagne: Truncated tracking ID
hotel_id: Hotel ID for filtering
guest_first_name: Guest first name for disambiguation
guest_last_name: Guest last name for disambiguation
guest_email: Guest email for disambiguation
advertising_partner: Partner info (matches utm_medium)
session: AsyncSession to use. If None, uses self.session.
Returns:
Matched Reservation or None
"""
if session is None:
session = self.session
# Find reservations where fbclid or gclid starts with the truncated value # Find reservations where fbclid or gclid starts with the truncated value
# Use LIKE for prefix matching since the XML contains truncated values
query = select(Reservation).where( query = select(Reservation).where(
or_( or_(
Reservation.fbclid.like(f"{advertising_campagne}%"), Reservation.fbclid.like(f"{advertising_campagne}%"),
@@ -423,23 +810,25 @@ class ConversionService:
) )
) )
# Eagerly load the customer relationship
query = query.options(selectinload(Reservation.customer))
# Add hotel filter if available # Add hotel filter if available
if hotel_id: if hotel_id:
query = query.where(Reservation.hotel_code == hotel_id) query = query.where(Reservation.hotel_code == hotel_id)
# Execute query # Execute query
db_result = await self.session.execute(query) db_result = await session.execute(query)
reservations = db_result.scalars().all() reservations = db_result.scalars().all()
if not reservations: if not reservations:
_LOGGER.debug( return None
"No matching reservation found for advertisingCampagne: %s",
advertising_campagne,
)
return result
# If multiple matches, try to narrow down using guest details and advertising_partner # If single match, return it
if len(reservations) > 1: if len(reservations) == 1:
return reservations[0]
# If multiple matches, try to narrow down using guest details
_LOGGER.debug( _LOGGER.debug(
"Multiple reservations match advertisingCampagne %s (hotel=%s): found %d matches. " "Multiple reservations match advertisingCampagne %s (hotel=%s): found %d matches. "
"Attempting to narrow down using guest details.", "Attempting to narrow down using guest details.",
@@ -468,40 +857,145 @@ class ConversionService:
guest_email, guest_email,
) )
matched_reservation = reservations[0] matched_reservation = reservations[0]
else:
matched_reservation = reservations[0]
result["reservation"] = matched_reservation return matched_reservation
# Get associated customer and hashed_customer async def _match_by_guest_details(
if matched_reservation.customer_id: self,
customer_query = select(Customer).where( hotel_id: str | None,
Customer.id == matched_reservation.customer_id guest_first_name: str | None,
guest_last_name: str | None,
guest_email: str | None,
session: AsyncSession | None = None,
) -> Reservation | None:
"""Match reservation by guest name and email using cached data.
This method uses the reservation cache populated at the start of XML processing.
If cache is not available, falls back to database queries.
Args:
hotel_id: Hotel ID for filtering
guest_first_name: Guest first name
guest_last_name: Guest last name
guest_email: Guest email
session: AsyncSession to use. If None, uses self.session.
Returns:
Matched Reservation or None
"""
if session is None:
session = self.session
# Try to use cache first
if self._cache_initialized and self._reservation_cache:
all_reservations = []
# Get reservations from cache for this hotel
if hotel_id and hotel_id in self._reservation_cache:
all_reservations = [
res for res, _ in self._reservation_cache[hotel_id]
]
elif not hotel_id:
# If no hotel_id specified, use all cached reservations
for reservations_list in self._reservation_cache.values():
all_reservations.extend([res for res, _ in reservations_list])
if all_reservations:
_LOGGER.debug(
"Using cached reservations for matching (hotel=%s, count=%d)",
hotel_id,
len(all_reservations),
) )
customer_result = await self.session.execute(customer_query) return self._match_reservations_by_guest_details(
result["customer"] = customer_result.scalar_one_or_none() all_reservations,
# Get hashed customer
if result["customer"]:
hashed_query = select(HashedCustomer).where(
HashedCustomer.customer_id == result["customer"].id
)
hashed_result = await self.session.execute(hashed_query)
result["hashed_customer"] = hashed_result.scalar_one_or_none()
_LOGGER.info(
"Matched conversion to reservation_id=%s, customer_id=%s, hashed_customer_id=%s "
"(advertisingCampagne=%s, guest=%s %s, email=%s)",
result["reservation"].id if result["reservation"] else None,
result["customer"].id if result["customer"] else None,
result["hashed_customer"].id if result["hashed_customer"] else None,
advertising_campagne,
guest_first_name, guest_first_name,
guest_last_name, guest_last_name,
guest_email, guest_email,
) )
return result # Fallback: Query database if cache is not available or empty
_LOGGER.debug(
"Cache unavailable or empty, falling back to database query (hotel=%s)",
hotel_id,
)
query = select(Reservation).options(selectinload(Reservation.customer))
if hotel_id:
query = query.where(Reservation.hotel_code == hotel_id)
db_result = await session.execute(query)
all_reservations = db_result.scalars().all()
return self._match_reservations_by_guest_details(
all_reservations, guest_first_name, guest_last_name, guest_email
)
def _match_reservations_by_guest_details(
self,
reservations: list[Reservation],
guest_first_name: str | None,
guest_last_name: str | None,
guest_email: str | None,
) -> Reservation | None:
"""Match a reservation from a list by guest name and email (non-async).
Args:
reservations: List of reservations to search through
guest_first_name: Guest first name
guest_last_name: Guest last name
guest_email: Guest email
Returns:
Matched Reservation or None
"""
# Filter by guest details
candidates = []
for reservation in reservations:
customer = reservation.customer
if not customer:
continue
# Match by email (highest priority)
if guest_email:
if (
customer.email_address
and customer.email_address.lower() == guest_email.lower()
):
_LOGGER.info(
"Found exact email match for %s (reservation_id=%s)",
guest_email,
reservation.id,
)
candidates.append((reservation, 3)) # Highest score
continue
# Match by name (first + last)
if guest_first_name and guest_last_name:
first_match = (
customer.given_name
and customer.given_name.lower() == guest_first_name.lower()
)
last_match = (
customer.surname
and customer.surname.lower() == guest_last_name.lower()
)
if first_match and last_match:
_LOGGER.info(
"Found exact name match for %s %s (reservation_id=%s)",
guest_first_name,
guest_last_name,
reservation.id,
)
candidates.append((reservation, 2)) # Medium-high score
continue
# Return highest-scoring match
if candidates:
candidates.sort(key=lambda x: x[1], reverse=True)
return candidates[0][0]
return None
def _filter_reservations_by_guest_details( def _filter_reservations_by_guest_details(
self, self,

View File

@@ -187,6 +187,33 @@ class ResilientAsyncSession:
raise last_error raise last_error
class SessionMaker:
"""Factory for creating independent AsyncSession instances.
This class enables concurrent processing by allowing each task to create
and manage its own database session. Useful for processing large datasets
where concurrent execution is desired but each concurrent task needs its own
database transaction context.
"""
def __init__(self, async_sessionmaker_: async_sessionmaker[AsyncSession]):
"""Initialize the SessionMaker.
Args:
async_sessionmaker_: SQLAlchemy async_sessionmaker factory
"""
self.async_sessionmaker = async_sessionmaker_
async def create_session(self) -> AsyncSession:
"""Create a new independent AsyncSession.
Returns:
A new AsyncSession instance ready for use. Caller is responsible
for managing the session lifecycle (closing when done).
"""
return self.async_sessionmaker()
async def get_resilient_session( async def get_resilient_session(
resilient_session: "ResilientAsyncSession", resilient_session: "ResilientAsyncSession",
) -> AsyncGenerator[AsyncSession, None]: ) -> AsyncGenerator[AsyncSession, None]:
@@ -416,9 +443,10 @@ class RoomReservation(Base):
Integer, ForeignKey("conversions.id"), nullable=False, index=True Integer, ForeignKey("conversions.id"), nullable=False, index=True
) )
# Unique identifier for this room reservation (for upserts) # Identifier for this room reservation (for upserts)
# Composite: pms_reservation_id + room_number # Composite: pms_reservation_id + room_number
pms_hotel_reservation_id = Column(String, unique=True, index=True) # Note: Not globally unique - same room number can exist across different hotels
pms_hotel_reservation_id = Column(String, index=True)
# Room reservation details # Room reservation details
arrival_date = Column(Date, index=True) # arrival attribute arrival_date = Column(Date, index=True) # arrival attribute

View File

@@ -6,9 +6,9 @@ import os
import uvicorn import uvicorn
if __name__ == "__main__": if __name__ == "__main__":
db_path = "alpinebits.db" # Adjust path if needed # db_path = "alpinebits.db" # Adjust path if needed
if os.path.exists(db_path): # if os.path.exists(db_path):
os.remove(db_path) # os.remove(db_path)
uvicorn.run( uvicorn.run(
"alpine_bits_python.api:app", "alpine_bits_python.api:app",