Merge branch 'concurrency-fix' of https://gitea.99tales.net/jonas/alpinebits_python into concurrency-fix

This commit is contained in:
2025-12-01 10:15:58 +00:00
18 changed files with 3534 additions and 105 deletions

View File

@@ -0,0 +1,108 @@
"""Add hotel inventory and room availability tables
Revision ID: b2cfe2d3aabc
Revises: e7ee03d8f430
Create Date: 2025-11-27 12:00:00.000000
"""
from collections.abc import Sequence
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "b2cfe2d3aabc"
down_revision: str | Sequence[str] | None = "e7ee03d8f430"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Upgrade schema with inventory and availability tables."""
op.create_table(
"hotel_inventory",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("hotel_id", sa.String(length=50), nullable=False),
sa.Column("inv_type_code", sa.String(length=8), nullable=False),
sa.Column("inv_code", sa.String(length=16), nullable=True),
sa.Column("room_name", sa.String(length=200), nullable=True),
sa.Column("max_occupancy", sa.Integer(), nullable=True),
sa.Column("source", sa.String(length=20), nullable=False),
sa.Column("first_seen", sa.DateTime(timezone=True), nullable=False),
sa.Column("last_updated", sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(["hotel_id"], ["hotels.hotel_id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_hotel_inventory_hotel_id"),
"hotel_inventory",
["hotel_id"],
unique=False,
)
op.create_index(
op.f("ix_hotel_inventory_inv_type_code"),
"hotel_inventory",
["inv_type_code"],
unique=False,
)
op.create_index(
op.f("ix_hotel_inventory_inv_code"),
"hotel_inventory",
["inv_code"],
unique=False,
)
op.create_index(
"uq_hotel_inventory_unique_key",
"hotel_inventory",
["hotel_id", "inv_type_code", sa.text("COALESCE(inv_code, '')")],
unique=True,
)
op.create_table(
"room_availability",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("inventory_id", sa.Integer(), nullable=False),
sa.Column("date", sa.Date(), nullable=False),
sa.Column("count_type_2", sa.Integer(), nullable=True),
sa.Column("count_type_6", sa.Integer(), nullable=True),
sa.Column("count_type_9", sa.Integer(), nullable=True),
sa.Column("is_closing_season", sa.Boolean(), nullable=False, server_default=sa.false()),
sa.Column("last_updated", sa.DateTime(timezone=True), nullable=False),
sa.Column("update_type", sa.String(length=20), nullable=False),
sa.ForeignKeyConstraint(["inventory_id"], ["hotel_inventory.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("inventory_id", "date", name="uq_room_availability_unique_key"),
)
op.create_index(
op.f("ix_room_availability_inventory_id"),
"room_availability",
["inventory_id"],
unique=False,
)
op.create_index(
op.f("ix_room_availability_date"),
"room_availability",
["date"],
unique=False,
)
op.create_index(
"idx_room_availability_inventory_date",
"room_availability",
["inventory_id", "date"],
unique=False,
)
def downgrade() -> None:
"""Downgrade schema by removing availability tables."""
op.drop_index("idx_room_availability_inventory_date", table_name="room_availability")
op.drop_index(op.f("ix_room_availability_date"), table_name="room_availability")
op.drop_index(op.f("ix_room_availability_inventory_id"), table_name="room_availability")
op.drop_table("room_availability")
op.drop_index("uq_hotel_inventory_unique_key", table_name="hotel_inventory")
op.drop_index(op.f("ix_hotel_inventory_inv_code"), table_name="hotel_inventory")
op.drop_index(op.f("ix_hotel_inventory_inv_type_code"), table_name="hotel_inventory")
op.drop_index(op.f("ix_hotel_inventory_hotel_id"), table_name="hotel_inventory")
op.drop_table("hotel_inventory")

View File

@@ -86,6 +86,10 @@ class AlpineBitsActionName(Enum):
"action_OTA_HotelRatePlan_BaseRates",
"OTA_HotelRatePlan:BaseRates",
)
OTA_HOTEL_INV_COUNT_NOTIF_FREE_ROOMS = (
"action_OTA_HotelInvCountNotif",
"OTA_HotelInvCountNotif:FreeRooms",
)
def __init__(self, capability_name: str, request_name: str):
self.capability_name = capability_name
@@ -819,3 +823,7 @@ class AlpineBitsServer:
return False
return True
# Ensure FreeRoomsAction is registered with ServerCapabilities discovery
#from .free_rooms_action import FreeRoomsAction # noqa: E402,F401 disable for now

View File

@@ -32,6 +32,8 @@ from sqlalchemy import and_, select, update
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload
from alpine_bits_python.schemas import WebhookRequestData
from .alpinebits_server import (
AlpineBitsActionName,
AlpineBitsClientInfo,
@@ -888,8 +890,9 @@ async def handle_webhook_unified(
webhook_request.status = WebhookStatus.PROCESSING
webhook_request.processing_started_at = timestamp
else:
# 5. Create new webhook_request
webhook_request = WebhookRequest(
webhook_request_data = WebhookRequestData(
payload_hash=payload_hash,
webhook_endpoint_id=webhook_endpoint.id,
hotel_id=webhook_endpoint.hotel_id,
@@ -900,6 +903,9 @@ async def handle_webhook_unified(
source_ip=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
# 5. Create new webhook_request
webhook_request = WebhookRequest(**webhook_request_data.model_dump())
db_session.add(webhook_request)
await db_session.flush()

View File

@@ -731,17 +731,10 @@ class ConversionService:
# Flush to ensure conversion has an ID before creating room reservations
await session.flush()
# Batch-load existing room reservations to avoid N+1 queries
room_numbers = [
rm.get("roomNumber") for rm in room_reservations.findall("roomReservation")
]
pms_hotel_reservation_ids = [
f"{pms_reservation_id}_{room_num}" for room_num in room_numbers
]
# Fetch ALL existing rooms for this conversion (not just the ones in current XML)
existing_rooms_result = await session.execute(
select(ConversionRoom).where(
ConversionRoom.pms_hotel_reservation_id.in_(pms_hotel_reservation_ids)
ConversionRoom.conversion_id == conversion.id
)
)
existing_rooms = {
@@ -749,6 +742,9 @@ class ConversionService:
for room in existing_rooms_result.scalars().all()
}
# Track which room IDs are present in the current XML
current_pms_hotel_reservation_ids = set()
# Process room reservations
for room_reservation in room_reservations.findall("roomReservation"):
# Extract room reservation details
@@ -786,6 +782,9 @@ class ConversionService:
# This allows updating the same room reservation if it appears again
pms_hotel_reservation_id = f"{pms_reservation_id}_{room_number}"
# Track this room as present in current XML
current_pms_hotel_reservation_ids.add(pms_hotel_reservation_id)
# Process daily sales and extract total revenue
daily_sales_elem = room_reservation.find("dailySales")
daily_sales_list = []
@@ -880,6 +879,24 @@ class ConversionService:
num_adults,
)
# Delete room entries that are no longer present in the current XML
# This handles cases where a reservation is updated and room numbers change
rooms_to_delete = [
room
for pms_id, room in existing_rooms.items()
if pms_id not in current_pms_hotel_reservation_ids
]
if rooms_to_delete:
for room in rooms_to_delete:
await session.delete(room)
_LOGGER.debug(
"Deleted room reservation %s (pms_id=%s, room=%s) - no longer in current XML",
room.id,
room.pms_hotel_reservation_id,
room.room_number,
)
return stats

View File

@@ -18,6 +18,8 @@ from sqlalchemy import (
Index,
Integer,
String,
UniqueConstraint,
func,
)
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import (
@@ -679,6 +681,66 @@ class ConversionRoom(Base):
conversion = relationship("Conversion", back_populates="conversion_rooms")
class HotelInventory(Base):
"""Room and category definitions synchronized via AlpineBits."""
__tablename__ = "hotel_inventory"
id = Column(Integer, primary_key=True)
hotel_id = Column(
String(50), ForeignKey("hotels.hotel_id", ondelete="CASCADE"), nullable=False, index=True
)
inv_type_code = Column(String(8), nullable=False, index=True)
inv_code = Column(String(16), nullable=True, index=True)
room_name = Column(String(200), nullable=True)
max_occupancy = Column(Integer, nullable=True)
source = Column(String(20), nullable=False)
first_seen = Column(DateTime(timezone=True), nullable=False)
last_updated = Column(DateTime(timezone=True), nullable=False)
hotel = relationship("Hotel", back_populates="inventory_items")
availability = relationship(
"RoomAvailability",
back_populates="inventory_item",
cascade="all, delete-orphan",
passive_deletes=True,
)
__table_args__ = (
Index(
"uq_hotel_inventory_unique_key",
"hotel_id",
"inv_type_code",
func.coalesce(inv_code, ""),
unique=True,
),
)
class RoomAvailability(Base):
"""Daily availability counts for inventory items."""
__tablename__ = "room_availability"
id = Column(Integer, primary_key=True)
inventory_id = Column(
Integer, ForeignKey("hotel_inventory.id", ondelete="CASCADE"), nullable=False, index=True
)
date = Column(Date, nullable=False, index=True)
count_type_2 = Column(Integer, nullable=True)
count_type_6 = Column(Integer, nullable=True)
count_type_9 = Column(Integer, nullable=True)
is_closing_season = Column(Boolean, nullable=False, default=False)
last_updated = Column(DateTime(timezone=True), nullable=False)
update_type = Column(String(20), nullable=False)
inventory_item = relationship("HotelInventory", back_populates="availability")
__table_args__ = (
UniqueConstraint("inventory_id", "date", name="uq_room_availability_unique_key"),
)
class Hotel(Base):
"""Hotel configuration (migrated from alpine_bits_auth in config.yaml)."""
@@ -710,6 +772,9 @@ class Hotel(Base):
# Relationships
webhook_endpoints = relationship("WebhookEndpoint", back_populates="hotel")
inventory_items = relationship(
"HotelInventory", back_populates="hotel", cascade="all, delete-orphan"
)
class WebhookEndpoint(Base):

View File

@@ -249,110 +249,143 @@ async def reprocess_stuck_webhooks(
These are webhooks that were not fully processed in the previous run,
likely due to a crash or unexpected shutdown.
This function is designed to NEVER block application startup.
All errors are caught and logged, but the app will start regardless.
Args:
sessionmaker: SQLAlchemy async sessionmaker
config: Application configuration dictionary
"""
_LOGGER.info("Checking for stuck webhooks to reprocess...")
try:
_LOGGER.info("Checking for stuck webhooks to reprocess...")
async with sessionmaker() as session:
# Find all webhooks stuck in 'processing' state
result = await session.execute(
select(WebhookRequest)
.where(WebhookRequest.status == WebhookStatus.PROCESSING)
.options(
selectinload(WebhookRequest.webhook_endpoint).selectinload(
WebhookEndpoint.hotel
async with sessionmaker() as session:
# Find all webhooks stuck in 'processing' state
result = await session.execute(
select(WebhookRequest)
.where(WebhookRequest.status == WebhookStatus.PROCESSING)
.options(
selectinload(WebhookRequest.webhook_endpoint).selectinload(
WebhookEndpoint.hotel
)
)
)
)
stuck_webhooks = result.scalars().all()
stuck_webhooks: list[WebhookRequest] = result.scalars().all()
if not stuck_webhooks:
_LOGGER.info("No stuck webhooks found")
return
if not stuck_webhooks:
_LOGGER.info("No stuck webhooks found")
return
_LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))
_LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))
reprocessed_count = 0
failed_count = 0
reprocessed_count = 0
failed_count = 0
for webhook_request in stuck_webhooks:
webhook_id = webhook_request.id
webhook_endpoint = webhook_request.webhook_endpoint
for webhook_request in stuck_webhooks:
webhook_id = webhook_request.id
webhook_endpoint = webhook_request.webhook_endpoint
if not webhook_endpoint:
_LOGGER.error(
"Webhook request %d has no webhook_endpoint, skipping", webhook_id
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No webhook endpoint found during startup reprocessing"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
if not webhook_endpoint:
_LOGGER.error(
"Webhook request %d has no webhook_endpoint, skipping", webhook_id
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No webhook endpoint found during startup reprocessing"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
if not webhook_request.payload_json:
_LOGGER.error(
"Webhook request %d has no payload (purged?), marking as failed",
webhook_id,
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No payload available for reprocessing (purged)"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
if not webhook_request.payload_json:
_LOGGER.error(
"Webhook request %d has no payload (purged?), marking as failed",
webhook_id,
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No payload available for reprocessing (purged)"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
try:
_LOGGER.info(
"Reprocessing webhook %d (hotel=%s, type=%s)",
webhook_id,
webhook_endpoint.hotel_id,
webhook_endpoint.webhook_type,
)
# Get processor for webhook_type
processor = webhook_registry.get_processor(
webhook_endpoint.webhook_type
)
if not processor:
raise ValueError(
f"No processor for type: {webhook_endpoint.webhook_type}"
try:
_LOGGER.info(
"Reprocessing webhook %d (hotel=%s, type=%s)",
webhook_id,
webhook_endpoint.hotel_id,
webhook_endpoint.webhook_type,
)
# Reprocess webhook with simplified interface
await processor.process(
webhook_request=webhook_request,
db_session=session,
config=config,
)
# Get processor for webhook_type
processor = webhook_registry.get_processor(
webhook_endpoint.webhook_type
)
if not processor:
raise ValueError(
f"No processor for type: {webhook_endpoint.webhook_type}"
)
# Update status to completed
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
# Reprocess webhook with simplified interface
result = await processor.process(
webhook_request=webhook_request,
db_session=session,
config=config,
)
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
# Check result status
result_status = result.get("status") if isinstance(result, dict) else "success"
except Exception as e:
_LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
f"Reprocessing failed during startup: {str(e)[:1950]}"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
if result_status == "duplicate":
# Duplicate is not an error - mark as completed and continue
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
_LOGGER.info(
"Webhook %d was a duplicate (already processed), marked as completed",
webhook_id
)
elif result_status in ("success", "completed"):
# Update status to completed
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
else:
# Unexpected status - treat as failure
_LOGGER.warning(
"Webhook %d returned unexpected status: %s",
webhook_id,
result_status
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = f"Unexpected status: {result_status}"
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
# Commit all changes
await session.commit()
except Exception as e:
_LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
f"Reprocessing failed during startup: {str(e)[:1950]}"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
_LOGGER.info(
"Webhook reprocessing complete: %d successful, %d failed",
reprocessed_count,
failed_count,
# Commit all changes
await session.commit()
_LOGGER.info(
"Webhook reprocessing complete: %d successful, %d failed",
reprocessed_count,
failed_count,
)
except Exception as e:
# CRITICAL: Never let reprocessing block application startup
_LOGGER.exception(
"CRITICAL ERROR during webhook reprocessing, but allowing app to start: %s",
e
)

View File

@@ -0,0 +1,600 @@
"""Action handler for OTA_HotelInvCountNotif:FreeRooms."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, date, datetime, timedelta
from typing import Any
from sqlalchemy import delete, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import AsyncSession
from xsdata.formats.dataclass.serializers.config import SerializerConfig
from xsdata_pydantic.bindings import XmlParser, XmlSerializer
from .alpinebits_server import (
AlpineBitsAction,
AlpineBitsActionName,
AlpineBitsClientInfo,
AlpineBitsResponse,
Version,
validate_hotel_authentication,
)
from .const import HttpStatusCode
from .db import Hotel, HotelInventory, RoomAvailability
from .generated import (
ErrorType,
InvCountCountType,
OtaHotelInvCountNotifRq,
OtaHotelInvCountNotifRs,
UniqueIdInstance,
)
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
SUPPORTED_CAPABILITIES = [
"OTA_HotelInvCountNotif_accept_rooms",
"OTA_HotelInvCountNotif_accept_categories",
"OTA_HotelInvCountNotif_accept_deltas",
"OTA_HotelInvCountNotif_accept_complete_set",
"OTA_HotelInvCountNotif_accept_out_of_order",
"OTA_HotelInvCountNotif_accept_out_of_market",
"OTA_HotelInvCountNotif_accept_closing_seasons",
]
CLOSING_SEASON_TYPE = "__CLOSE" # <= 8 chars per spec
SOURCE_FREEROOMS = "FreeRooms"
COUNT_TYPE_MAP = {
InvCountCountType.VALUE_2: "count_type_2",
InvCountCountType.VALUE_6: "count_type_6",
InvCountCountType.VALUE_9: "count_type_9",
}
@dataclass
class FreeRoomsProcessingError(Exception):
"""Custom exception that carries HTTP and OTA error metadata."""
message: str
status_code: HttpStatusCode = HttpStatusCode.BAD_REQUEST
error_type: ErrorType = ErrorType.VALUE_13
code: str = "450"
def __str__(self) -> str:
return self.message
class FreeRoomsAction(AlpineBitsAction):
"""Handler for OTA_HotelInvCountNotif:FreeRooms requests."""
def __init__(self, config: dict | None = None):
self.name = AlpineBitsActionName.OTA_HOTEL_INV_COUNT_NOTIF_FREE_ROOMS
self.version = [Version.V2024_10, Version.V2022_10]
self.config = config or {}
self.supports = SUPPORTED_CAPABILITIES
self._parser = XmlParser()
self._serializer = XmlSerializer(
config=SerializerConfig(
pretty_print=True,
xml_declaration=True,
encoding="UTF-8",
)
)
async def handle(
self,
action: str,
request_xml: str,
version: Version,
client_info: AlpineBitsClientInfo,
dbsession: AsyncSession | None = None,
server_capabilities=None,
) -> AlpineBitsResponse:
"""Process FreeRooms inventory updates."""
try:
self._validate_action_name(action)
if request_xml is None:
raise FreeRoomsProcessingError("Missing request payload")
if dbsession is None:
raise FreeRoomsProcessingError(
"Database session unavailable",
HttpStatusCode.INTERNAL_SERVER_ERROR,
)
try:
request = self._parser.from_string(request_xml, OtaHotelInvCountNotifRq)
except Exception as exc: # pragma: no cover - serialization already tested upstream
_LOGGER.exception("Failed to parse FreeRooms request: %s", exc)
raise FreeRoomsProcessingError("Invalid XML payload") from exc
hotel_code = request.inventories.hotel_code if request.inventories else None
if not hotel_code:
raise FreeRoomsProcessingError("HotelCode attribute is required")
if not client_info or not client_info.username or not client_info.password:
raise FreeRoomsProcessingError(
"Missing authentication context",
HttpStatusCode.UNAUTHORIZED,
error_type=ErrorType.VALUE_11,
code="401",
)
if not validate_hotel_authentication(
client_info.username,
client_info.password,
hotel_code,
self.config,
):
raise FreeRoomsProcessingError(
f"Unauthorized FreeRooms notification for hotel {hotel_code}",
HttpStatusCode.UNAUTHORIZED,
error_type=ErrorType.VALUE_11,
code="401",
)
hotel = await self._fetch_hotel(dbsession, hotel_code)
if hotel is None:
raise FreeRoomsProcessingError(
f"Hotel {hotel_code} is not provisioned on this server"
)
is_complete_set = (
request.unique_id is not None
and request.unique_id.instance == UniqueIdInstance.COMPLETE_SET
)
update_type = "CompleteSet" if is_complete_set else "Delta"
inventory_cache: dict[tuple[str, str | None], HotelInventory] = {}
try:
if is_complete_set:
await self._process_complete_set(
dbsession, hotel, request, update_type, inventory_cache
)
else:
await self._process_delta(
dbsession, hotel, request, update_type, inventory_cache
)
await dbsession.commit()
except FreeRoomsProcessingError:
await dbsession.rollback()
raise
except Exception as exc: # pragma: no cover - defensive
await dbsession.rollback()
_LOGGER.exception("Unexpected FreeRooms failure: %s", exc)
return self._error_response(
"Internal server error while processing FreeRooms notification",
HttpStatusCode.INTERNAL_SERVER_ERROR,
)
_LOGGER.info(
"Processed FreeRooms %s update for hotel %s (%d inventory items)",
update_type,
hotel_code,
len(request.inventories.inventory),
)
return self._success_response()
except FreeRoomsProcessingError as exc:
return self._error_response(
exc.message,
exc.status_code,
error_type=exc.error_type,
code=exc.code,
)
def _validate_action_name(self, action: str) -> None:
expected = self.name.value[1]
if (action or "").strip() != expected:
raise FreeRoomsProcessingError(
f"Invalid action {action}, expected {expected}",
HttpStatusCode.BAD_REQUEST,
)
async def _fetch_hotel(self, session: AsyncSession, hotel_code: str) -> Hotel | None:
stmt = select(Hotel).where(Hotel.hotel_id == hotel_code, Hotel.is_active.is_(True))
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def _process_complete_set(
self,
session: AsyncSession,
hotel: Hotel,
request: OtaHotelInvCountNotifRq,
update_type: str,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
) -> None:
await self._delete_existing_availability(session, hotel.hotel_id)
await self._process_inventories(
session, hotel, request, update_type, inventory_cache, enforce_closing_order=True
)
async def _process_delta(
self,
session: AsyncSession,
hotel: Hotel,
request: OtaHotelInvCountNotifRq,
update_type: str,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
) -> None:
await self._process_inventories(
session, hotel, request, update_type, inventory_cache, enforce_closing_order=False
)
async def _delete_existing_availability(
self,
session: AsyncSession,
hotel_id: str,
) -> None:
subquery = select(HotelInventory.id).where(HotelInventory.hotel_id == hotel_id)
await session.execute(
delete(RoomAvailability).where(RoomAvailability.inventory_id.in_(subquery))
)
async def _process_inventories(
self,
session: AsyncSession,
hotel: Hotel,
request: OtaHotelInvCountNotifRq,
update_type: str,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
enforce_closing_order: bool,
) -> None:
inventories = request.inventories.inventory if request.inventories else []
if not inventories:
raise FreeRoomsProcessingError(
"Request must include at least one Inventory block",
HttpStatusCode.BAD_REQUEST,
)
rows_to_upsert: list[dict[str, Any]] = []
now = datetime.now(UTC)
encountered_standard = False
for inventory in inventories:
sac = inventory.status_application_control
if sac is None:
raise FreeRoomsProcessingError(
"StatusApplicationControl element is required for each Inventory",
HttpStatusCode.BAD_REQUEST,
)
is_closing = self._is_closing_season(sac)
if is_closing:
if inventory.inv_counts is not None:
raise FreeRoomsProcessingError(
"Closing seasons cannot contain InvCounts data",
HttpStatusCode.BAD_REQUEST,
)
if update_type != "CompleteSet":
raise FreeRoomsProcessingError(
"Closing seasons are only allowed on CompleteSet updates",
HttpStatusCode.BAD_REQUEST,
)
if enforce_closing_order and encountered_standard:
raise FreeRoomsProcessingError(
"Closing seasons must appear before other inventory entries",
HttpStatusCode.BAD_REQUEST,
)
rows_to_upsert.extend(
await self._process_closing_season(
session, hotel, sac, update_type, now, inventory_cache
)
)
continue
encountered_standard = True
rows_to_upsert.extend(
await self._process_inventory_item(
session,
hotel,
sac,
inventory.inv_counts,
update_type,
now,
inventory_cache,
)
)
await self._upsert_availability_rows(session, rows_to_upsert)
async def _process_closing_season(
self,
session: AsyncSession,
hotel: Hotel,
sac: OtaHotelInvCountNotifRq.Inventories.Inventory.StatusApplicationControl,
update_type: str,
timestamp: datetime,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
) -> list[dict[str, Any]]:
if sac.inv_type_code or sac.inv_code:
raise FreeRoomsProcessingError(
"Closing season entries cannot specify InvTypeCode or InvCode",
HttpStatusCode.BAD_REQUEST,
)
start_date, end_date = self._parse_date_range(sac.start, sac.end)
inventory_item = await self._ensure_inventory_item(
session,
hotel.hotel_id,
CLOSING_SEASON_TYPE,
None,
timestamp,
inventory_cache,
)
base_payload = {
"inventory_id": inventory_item.id,
"count_type_2": None,
"count_type_6": None,
"count_type_9": None,
"is_closing_season": True,
"last_updated": timestamp,
"update_type": update_type,
}
rows = []
for day in self._iter_days(start_date, end_date):
payload = dict(base_payload)
payload["date"] = day
rows.append(payload)
return rows
async def _process_inventory_item(
self,
session: AsyncSession,
hotel: Hotel,
sac: OtaHotelInvCountNotifRq.Inventories.Inventory.StatusApplicationControl,
inv_counts: (
OtaHotelInvCountNotifRq.Inventories.Inventory.InvCounts | None
),
update_type: str,
timestamp: datetime,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
) -> list[dict[str, Any]]:
inv_type_code = (sac.inv_type_code or "").strip()
if not inv_type_code:
raise FreeRoomsProcessingError(
"InvTypeCode is required unless AllInvCode=\"true\"",
HttpStatusCode.BAD_REQUEST,
)
inv_code = sac.inv_code.strip() if sac.inv_code else None
start_date, end_date = self._parse_date_range(sac.start, sac.end)
counts = self._extract_counts(inv_counts)
base_counts = {
"count_type_2": counts.get("count_type_2"),
"count_type_6": counts.get("count_type_6"),
"count_type_9": counts.get("count_type_9"),
}
inventory_item = await self._ensure_inventory_item(
session,
hotel.hotel_id,
inv_type_code,
inv_code,
timestamp,
inventory_cache,
)
base_payload = {
"inventory_id": inventory_item.id,
"is_closing_season": False,
"last_updated": timestamp,
"update_type": update_type,
**base_counts,
}
rows = []
for day in self._iter_days(start_date, end_date):
payload = dict(base_payload)
payload["date"] = day
rows.append(payload)
return rows
def _parse_date_range(self, start_str: str, end_str: str) -> tuple[date, date]:
try:
start_date = date.fromisoformat(start_str)
end_date = date.fromisoformat(end_str)
except ValueError as exc:
raise FreeRoomsProcessingError(
f"Invalid date format: {exc!s}",
HttpStatusCode.BAD_REQUEST,
) from exc
if end_date < start_date:
raise FreeRoomsProcessingError(
"StatusApplicationControl End date cannot be before Start date",
HttpStatusCode.BAD_REQUEST,
)
return start_date, end_date
def _iter_days(self, start_date: date, end_date: date):
current = start_date
while current <= end_date:
yield current
current += timedelta(days=1)
def _is_closing_season(
self,
sac: OtaHotelInvCountNotifRq.Inventories.Inventory.StatusApplicationControl,
) -> bool:
return (sac.all_inv_code or "").strip().lower() == "true"
def _extract_counts(
self,
inv_counts: OtaHotelInvCountNotifRq.Inventories.Inventory.InvCounts | None,
) -> dict[str, int | None]:
if inv_counts is None or not inv_counts.inv_count:
return {}
parsed: dict[str, int] = {}
for count in inv_counts.inv_count:
column_name = COUNT_TYPE_MAP.get(count.count_type)
if column_name is None:
raise FreeRoomsProcessingError(
f"Unsupported CountType {count.count_type}",
HttpStatusCode.BAD_REQUEST,
)
if column_name in parsed:
raise FreeRoomsProcessingError(
f"Duplicate CountType {count.count_type.value} detected",
HttpStatusCode.BAD_REQUEST,
)
try:
value = int(count.count)
except ValueError as exc:
raise FreeRoomsProcessingError(
f"Invalid Count value '{count.count}'",
HttpStatusCode.BAD_REQUEST,
) from exc
if value < 0:
raise FreeRoomsProcessingError(
"Count values must be non-negative",
HttpStatusCode.BAD_REQUEST,
)
parsed[column_name] = value
return parsed
async def _ensure_inventory_item(
self,
session: AsyncSession,
hotel_id: str,
inv_type_code: str,
inv_code: str | None,
timestamp: datetime,
cache: dict[tuple[str, str | None], HotelInventory],
) -> HotelInventory:
cache_key = (inv_type_code, inv_code)
if cache_key in cache:
return cache[cache_key]
filters = [
HotelInventory.hotel_id == hotel_id,
HotelInventory.inv_type_code == inv_type_code,
]
if inv_code is None:
filters.append(HotelInventory.inv_code.is_(None))
else:
filters.append(HotelInventory.inv_code == inv_code)
stmt = select(HotelInventory).where(*filters)
result = await session.execute(stmt)
inventory_item = result.scalar_one_or_none()
if inventory_item:
inventory_item.last_updated = timestamp
else:
inventory_item = HotelInventory(
hotel_id=hotel_id,
inv_type_code=inv_type_code,
inv_code=inv_code,
source=SOURCE_FREEROOMS,
first_seen=timestamp,
last_updated=timestamp,
)
session.add(inventory_item)
await session.flush()
cache[cache_key] = inventory_item
return inventory_item
async def _upsert_availability_rows(
self,
session: AsyncSession,
rows: list[dict[str, Any]],
) -> None:
if not rows:
return
bind = session.get_bind()
dialect_name = bind.dialect.name if bind else ""
table = RoomAvailability.__table__
if dialect_name == "postgresql":
stmt = pg_insert(table).values(rows)
stmt = stmt.on_conflict_do_update(
index_elements=["inventory_id", "date"],
set_=self._build_upsert_set(stmt),
)
await session.execute(stmt)
return
if dialect_name == "sqlite":
stmt = sqlite_insert(table).values(rows)
stmt = stmt.on_conflict_do_update(
index_elements=["inventory_id", "date"],
set_=self._build_upsert_set(stmt),
)
await session.execute(stmt)
return
await self._upsert_with_fallback(session, rows)
def _build_upsert_set(self, stmt):
return {
"count_type_2": stmt.excluded.count_type_2,
"count_type_6": stmt.excluded.count_type_6,
"count_type_9": stmt.excluded.count_type_9,
"is_closing_season": stmt.excluded.is_closing_season,
"last_updated": stmt.excluded.last_updated,
"update_type": stmt.excluded.update_type,
}
async def _upsert_with_fallback(
self, session: AsyncSession, rows: list[dict[str, Any]]
) -> None:
for row in rows:
stmt = select(RoomAvailability).where(
RoomAvailability.inventory_id == row["inventory_id"],
RoomAvailability.date == row["date"],
)
result = await session.execute(stmt)
existing = result.scalar_one_or_none()
if existing:
existing.count_type_2 = row["count_type_2"]
existing.count_type_6 = row["count_type_6"]
existing.count_type_9 = row["count_type_9"]
existing.is_closing_season = row["is_closing_season"]
existing.last_updated = row["last_updated"]
existing.update_type = row["update_type"]
else:
session.add(RoomAvailability(**row))
def _success_response(self) -> AlpineBitsResponse:
response = OtaHotelInvCountNotifRs(version="7.000", success="")
xml = self._serializer.render(
response, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
)
return AlpineBitsResponse(xml, HttpStatusCode.OK)
def _error_response(
self,
message: str,
status_code: HttpStatusCode,
error_type: ErrorType = ErrorType.VALUE_13,
code: str = "450",
) -> AlpineBitsResponse:
error = OtaHotelInvCountNotifRs.Errors.Error(
type_value=error_type,
code=code,
content=[message],
)
errors = OtaHotelInvCountNotifRs.Errors(error=[error])
response = OtaHotelInvCountNotifRs(version="7.000", errors=errors)
xml = self._serializer.render(
response, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
)
return AlpineBitsResponse(xml, status_code)

View File

@@ -10,11 +10,15 @@ from XML generation (xsdata) follows clean architecture principles.
"""
import hashlib
import json
from datetime import date, datetime
from enum import Enum
from typing import Any
from pydantic import BaseModel, EmailStr, Field, field_validator, model_validator
from .const import WebhookStatus
# Country name to ISO 3166-1 alpha-2 code mapping
COUNTRY_NAME_TO_CODE = {
@@ -308,6 +312,148 @@ class CommentsData(BaseModel):
model_config = {"from_attributes": True}
class HotelData(BaseModel):
"""Validated hotel configuration data."""
hotel_id: str = Field(..., min_length=1, max_length=50)
hotel_name: str = Field(..., min_length=1, max_length=200)
username: str = Field(..., min_length=1, max_length=100)
password_hash: str = Field(..., min_length=1, max_length=200)
meta_account_id: str | None = Field(None, max_length=50)
google_account_id: str | None = Field(None, max_length=50)
push_endpoint_url: str | None = Field(None, max_length=500)
push_endpoint_token: str | None = Field(None, max_length=200)
push_endpoint_username: str | None = Field(None, max_length=100)
created_at: datetime = Field(default_factory=lambda: datetime.now())
updated_at: datetime = Field(default_factory=lambda: datetime.now())
is_active: bool = Field(default=True)
@field_validator("hotel_id", "hotel_name", "username")
@classmethod
def strip_whitespace(cls, v: str) -> str:
"""Remove leading/trailing whitespace."""
return v.strip()
model_config = {"from_attributes": True}
class WebhookEndpointData(BaseModel):
"""Validated webhook endpoint configuration data."""
hotel_id: str = Field(..., min_length=1, max_length=50)
webhook_secret: str = Field(..., min_length=1, max_length=64)
webhook_type: str = Field(..., min_length=1, max_length=50)
description: str | None = Field(None, max_length=200)
is_enabled: bool = Field(default=True)
created_at: datetime = Field(default_factory=lambda: datetime.now())
@field_validator("hotel_id", "webhook_secret", "webhook_type")
@classmethod
def strip_whitespace(cls, v: str) -> str:
"""Remove leading/trailing whitespace."""
return v.strip()
model_config = {"from_attributes": True}
class WebhookRequestData(BaseModel):
"""Validated webhook request data.
This model handles the special case where:
- payload_json is required for creation (to calculate payload_hash)
- payload_json becomes optional after processing (can be purged for privacy/storage)
- payload_hash is auto-calculated from payload_json when provided
"""
# Required fields
payload_json: dict[str, Any] | None = Field(
...,
description="Webhook payload (required for creation, nullable after purge)"
)
# Auto-calculated from payload_json
payload_hash: str | None = Field(
None,
min_length=64,
max_length=64,
description="SHA256 hash of canonical JSON payload (auto-calculated)"
)
# Optional foreign keys
webhook_endpoint_id: int | None = Field(None, gt=0)
hotel_id: str | None = Field(None, max_length=50)
# Processing tracking
status: WebhookStatus = Field(default=WebhookStatus.PENDING)
processing_started_at: datetime | None = None
processing_completed_at: datetime | None = None
# Retry handling
retry_count: int = Field(default=0, ge=0)
last_error: str | None = Field(None, max_length=2000)
# Payload metadata
purged_at: datetime | None = None
# Request metadata
created_at: datetime = Field(default_factory=lambda: datetime.now())
source_ip: str | None = Field(None, max_length=45)
user_agent: str | None = Field(None, max_length=500)
# Result tracking
created_customer_id: int | None = Field(None, gt=0)
created_reservation_id: int | None = Field(None, gt=0)
@model_validator(mode="after")
def calculate_payload_hash(self) -> "WebhookRequestData":
"""Auto-calculate payload_hash from payload_json if not provided.
Uses the same hashing algorithm as api.py:
- Canonical JSON with sorted keys
- UTF-8 encoding
- SHA256 hash
This runs after all field validation, so we can access the validated payload_json.
"""
# Only calculate if payload_json is provided and payload_hash is not set
if self.payload_json is not None and self.payload_hash is None:
# Create canonical JSON string (sorted keys for consistency)
payload_json_str = json.dumps(self.payload_json, sort_keys=True)
# Calculate SHA256 hash
self.payload_hash = hashlib.sha256(
payload_json_str.encode("utf-8")
).hexdigest()
return self
@model_validator(mode="after")
def validate_payload_hash_requirements(self) -> "WebhookRequestData":
"""Ensure payload_hash is present (either provided or calculated).
This validator runs after calculate_payload_hash, so payload_hash should
be set if payload_json was provided.
"""
if self.payload_hash is None:
raise ValueError(
"payload_hash is required. It can be auto-calculated from payload_json "
"or explicitly provided."
)
return self
@field_validator("status", mode="before")
@classmethod
def normalize_status(cls, v: str | WebhookStatus) -> WebhookStatus:
"""Normalize status to WebhookStatus enum."""
if isinstance(v, WebhookStatus):
return v
if isinstance(v, str):
return WebhookStatus(v)
raise ValueError(f"Invalid webhook status: {v}")
model_config = {"from_attributes": True}
# Example usage in a service layer
class ReservationService:
"""Example service showing how to use Pydantic models with SQLAlchemy."""

View File

@@ -273,10 +273,28 @@ async def process_wix_form_submission(
reservation, db_customer.id
)
except IntegrityError as e:
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
await db_session.rollback()
# Check if this is a duplicate (unique constraint violation)
error_msg = str(e.orig) if hasattr(e, 'orig') else str(e)
is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists'])
if is_duplicate:
_LOGGER.info(
"Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)",
unique_id
)
return {
"status": "duplicate",
"message": "Reservation already exists (duplicate submission)",
"unique_id": unique_id,
"timestamp": timestamp,
}
else:
# Real integrity error (not a duplicate)
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
@@ -403,7 +421,7 @@ async def process_generic_webhook_submission(
hotel_data = data.get("hotel_data", {})
form_data = data.get("form_data", {})
tracking_data = data.get("tracking_data", {})
offer_data = data.get("unterkunftTyp", {})
offer_data = form_data.get("unterkunftTyp", {})
selected_offers = []
@@ -581,9 +599,33 @@ async def process_generic_webhook_submission(
# Use ReservationService to create reservation
reservation_service = ReservationService(db_session)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
try:
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
except IntegrityError as e:
await db_session.rollback()
# Check if this is a duplicate (unique constraint violation)
error_msg = str(e.orig) if hasattr(e, 'orig') else str(e)
is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists'])
if is_duplicate:
_LOGGER.info(
"Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)",
unique_id
)
return {
"status": "duplicate",
"message": "Reservation already exists (duplicate submission)",
"unique_id": unique_id,
"timestamp": timestamp,
}
else:
# Real integrity error (not a duplicate)
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch

197
tests/helpers/README.md Normal file
View File

@@ -0,0 +1,197 @@
# Test Helpers
This directory contains helper utilities for creating test data.
## XML Builders
The `xml_builders` module provides convenient builder classes for creating reservation XML structures used in conversion service tests.
### Quick Start
```python
from tests.helpers import ReservationXMLBuilder
# Create a simple reservation
xml = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12345",
reservation_number="RES-001",
reservation_date="2025-11-14",
)
.set_guest(
guest_id="guest_001",
first_name="John",
last_name="Doe",
email="john@example.com",
)
.add_room(
arrival="2025-12-01",
departure="2025-12-05",
revenue_logis_per_day=150.0, # Fixed revenue per night
)
.build_xml()
)
```
### Features
#### ReservationXMLBuilder
The main builder class for creating reservation XML structures.
**Key Features:**
- Fluent API for method chaining
- Automatic daily sales generation from arrival to departure
- Convenient revenue-per-day specification (no need to manually create each dailySale)
- Support for advertising campaign data
- Guest information with optional fields
**Example - Multi-room reservation:**
```python
xml = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12345",
reservation_number="RES-001",
reservation_date="2025-11-14",
)
.set_guest(
guest_id="guest_001",
first_name="Jane",
last_name="Smith",
email="jane@example.com",
country_code="US",
)
.add_room(
arrival="2025-12-01",
departure="2025-12-05",
room_number="101",
room_type="DZV",
revenue_logis_per_day=150.0,
)
.add_room(
arrival="2025-12-01",
departure="2025-12-05",
room_number="102",
room_type="DZM",
revenue_logis_per_day=200.0,
)
.build_xml()
)
```
#### Daily Sales Generation
The builder automatically generates `<dailySale>` entries for each day from arrival to departure (inclusive).
- **Days before departure**: Include `revenueTotal` and `revenueLogis` attributes
- **Departure day**: No revenue attributes (just the date)
**Example:**
```python
# A 3-night stay (Dec 1-4)
.add_room(
arrival="2025-12-01",
departure="2025-12-04",
revenue_logis_per_day=160.0,
)
```
Generates:
```xml
<dailySales>
<dailySale date="2025-12-01" revenueTotal="160.0" revenueLogis="160.0"/>
<dailySale date="2025-12-02" revenueTotal="160.0" revenueLogis="160.0"/>
<dailySale date="2025-12-03" revenueTotal="160.0" revenueLogis="160.0"/>
<dailySale date="2025-12-04"/> <!-- No revenue on departure day -->
</dailySales>
```
#### MultiReservationXMLBuilder
For creating XML documents with multiple reservations:
```python
from tests.helpers import ReservationXMLBuilder, MultiReservationXMLBuilder
multi_builder = MultiReservationXMLBuilder()
# Add first reservation
res1 = (
ReservationXMLBuilder(...)
.set_guest(...)
.add_room(...)
)
multi_builder.add_reservation(res1)
# Add second reservation
res2 = (
ReservationXMLBuilder(...)
.set_guest(...)
.add_room(...)
)
multi_builder.add_reservation(res2)
xml = multi_builder.build_xml()
```
#### RoomReservationBuilder
Low-level builder for creating individual room reservations. Usually you'll use `ReservationXMLBuilder.add_room()` instead, but this is available for advanced use cases.
```python
from tests.helpers import RoomReservationBuilder
room_builder = RoomReservationBuilder(
arrival="2025-12-01",
departure="2025-12-05",
room_type="DZV",
room_number="101",
revenue_logis_per_day=150.0,
)
# Get the XML element (not a string)
room_elem = room_builder.build()
```
### Common Parameters
**ReservationXMLBuilder:**
- `hotel_id` - Hotel ID (required)
- `reservation_id` - Reservation ID (required)
- `reservation_number` - Reservation number (required)
- `reservation_date` - Reservation date YYYY-MM-DD (required)
- `creation_time` - Creation timestamp (optional, defaults to reservation_date + T00:00:00)
- `advertising_medium` - Advertising medium (optional)
- `advertising_partner` - Advertising partner (optional)
- `advertising_campagne` - Advertising campaign (optional)
**set_guest() parameters:**
- `guest_id` - Guest ID (required)
- `first_name` - First name (required)
- `last_name` - Last name (required)
- `email` - Email address (required)
- `language` - Language code (default: "en")
- `gender` - Gender (optional)
- `country_code` - Country code (optional)
- `country` - Country name (optional)
**add_room() parameters:**
- `arrival` - Arrival date YYYY-MM-DD (required)
- `departure` - Departure date YYYY-MM-DD (required)
- `room_type` - Room type code (default: "DZV")
- `room_number` - Room number (default: "101")
- `status` - Reservation status (default: "reserved")
- `adults` - Number of adults (default: 2)
- `children` - Number of children (default: 0)
- `infants` - Number of infants (default: 0)
- `rate_plan_code` - Rate plan code (default: "STANDARD")
- `revenue_logis_per_day` - Fixed revenue per night (optional, generates daily sales)
- `revenue_total_per_day` - Total revenue per night (optional, defaults to revenue_logis_per_day)
### See Also
- [tests/test_xml_builders.py](../test_xml_builders.py) - Unit tests demonstrating all features
- [tests/test_conversion_service.py](../test_conversion_service.py) - Integration examples (TestXMLBuilderUsage class)

13
tests/helpers/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
"""Test helper utilities for creating test data."""
from .xml_builders import (
ReservationXMLBuilder,
MultiReservationXMLBuilder,
RoomReservationBuilder,
)
__all__ = [
"ReservationXMLBuilder",
"MultiReservationXMLBuilder",
"RoomReservationBuilder",
]

View File

@@ -0,0 +1,392 @@
"""XML builder helpers for creating test reservation data.
This module provides convenient builder classes for generating reservation XML
structures used in conversion service tests.
"""
from datetime import datetime, timedelta
from typing import Optional
from xml.etree import ElementTree as ET
class RoomReservationBuilder:
"""Builder for creating roomReservation XML elements with daily sales."""
def __init__(
self,
arrival: str,
departure: str,
room_type: str = "DZV",
room_number: str = "101",
status: str = "reserved",
adults: int = 2,
children: int = 0,
infants: int = 0,
rate_plan_code: str = "STANDARD",
connected_room_type: str = "0",
revenue_logis_per_day: Optional[float] = None,
revenue_total_per_day: Optional[float] = None,
):
"""Initialize room reservation builder.
Args:
arrival: Arrival date in YYYY-MM-DD format
departure: Departure date in YYYY-MM-DD format
room_type: Room type code
room_number: Room number
status: Reservation status (reserved, request, confirmed, etc.)
adults: Number of adults
children: Number of children
infants: Number of infants
rate_plan_code: Rate plan code
connected_room_type: Connected room type code
revenue_logis_per_day: Revenue per day (if None, no revenue attributes)
revenue_total_per_day: Total revenue per day (defaults to revenue_logis_per_day)
"""
self.arrival = arrival
self.departure = departure
self.room_type = room_type
self.room_number = room_number
self.status = status
self.adults = adults
self.children = children
self.infants = infants
self.rate_plan_code = rate_plan_code
self.connected_room_type = connected_room_type
self.revenue_logis_per_day = revenue_logis_per_day
self.revenue_total_per_day = revenue_total_per_day or revenue_logis_per_day
def build(self) -> ET.Element:
"""Build the roomReservation XML element with daily sales.
Returns:
XML Element for the room reservation
"""
room_attrs = {
"arrival": self.arrival,
"departure": self.departure,
"status": self.status,
"roomType": self.room_type,
"roomNumber": self.room_number,
"adults": str(self.adults),
"ratePlanCode": self.rate_plan_code,
"connectedRoomType": self.connected_room_type,
}
if self.children > 0:
room_attrs["children"] = str(self.children)
if self.infants > 0:
room_attrs["infants"] = str(self.infants)
room_elem = ET.Element("roomReservation", room_attrs)
# Create dailySales element
daily_sales_elem = ET.SubElement(room_elem, "dailySales")
# Generate daily sale entries from arrival to departure (inclusive of departure for the no-revenue entry)
arrival_date = datetime.strptime(self.arrival, "%Y-%m-%d")
departure_date = datetime.strptime(self.departure, "%Y-%m-%d")
current_date = arrival_date
while current_date <= departure_date:
date_str = current_date.strftime("%Y-%m-%d")
daily_sale_attrs = {"date": date_str}
# Add revenue attributes for all days except departure day
if current_date < departure_date and self.revenue_logis_per_day is not None:
daily_sale_attrs["revenueTotal"] = str(self.revenue_total_per_day)
daily_sale_attrs["revenueLogis"] = str(self.revenue_logis_per_day)
ET.SubElement(daily_sales_elem, "dailySale", daily_sale_attrs)
current_date += timedelta(days=1)
return room_elem
class ReservationXMLBuilder:
"""Builder for creating complete reservation XML structures for testing.
This builder provides a fluent interface for constructing reservation XML
that matches the format expected by the ConversionService.
Example usage:
builder = ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12345",
reservation_number="RES-001",
reservation_date="2025-11-14"
)
builder.set_guest(
guest_id="guest_001",
first_name="John",
last_name="Doe",
email="john@example.com"
)
builder.add_room(
arrival="2025-12-01",
departure="2025-12-05",
revenue_logis_per_day=150.0
)
xml_string = builder.build_xml()
"""
def __init__(
self,
hotel_id: str,
reservation_id: str,
reservation_number: str,
reservation_date: str,
creation_time: Optional[str] = None,
reservation_type: str = "reservation",
advertising_medium: Optional[str] = None,
advertising_partner: Optional[str] = None,
advertising_campagne: Optional[str] = None,
):
"""Initialize reservation builder.
Args:
hotel_id: Hotel ID
reservation_id: Reservation ID
reservation_number: Reservation number
reservation_date: Reservation date in YYYY-MM-DD format
creation_time: Creation timestamp (defaults to reservation_date + T00:00:00)
reservation_type: Type of reservation (reservation, request, etc.)
advertising_medium: Advertising medium
advertising_partner: Advertising partner
advertising_campagne: Advertising campaign
"""
self.hotel_id = hotel_id
self.reservation_id = reservation_id
self.reservation_number = reservation_number
self.reservation_date = reservation_date
self.creation_time = creation_time or f"{reservation_date}T00:00:00"
self.reservation_type = reservation_type
self.advertising_medium = advertising_medium
self.advertising_partner = advertising_partner
self.advertising_campagne = advertising_campagne
self.guest_data: Optional[dict] = None
self.rooms: list[RoomReservationBuilder] = []
def set_guest(
self,
guest_id: str,
first_name: str,
last_name: str,
email: str,
language: str = "en",
gender: Optional[str] = None,
country_code: Optional[str] = None,
country: Optional[str] = None,
) -> "ReservationXMLBuilder":
"""Set guest information for the reservation.
Args:
guest_id: Guest ID
first_name: Guest first name
last_name: Guest last name
email: Guest email
language: Guest language code
gender: Guest gender
country_code: Guest country code
country: Guest country name
Returns:
Self for method chaining
"""
self.guest_data = {
"id": guest_id,
"firstName": first_name,
"lastName": last_name,
"email": email,
"language": language,
}
if gender:
self.guest_data["gender"] = gender
if country_code:
self.guest_data["countryCode"] = country_code
if country:
self.guest_data["country"] = country
return self
def add_room(
self,
arrival: str,
departure: str,
room_type: str = "DZV",
room_number: str = "101",
status: str = "reserved",
adults: int = 2,
children: int = 0,
infants: int = 0,
rate_plan_code: str = "STANDARD",
connected_room_type: str = "0",
revenue_logis_per_day: Optional[float] = None,
revenue_total_per_day: Optional[float] = None,
) -> "ReservationXMLBuilder":
"""Add a room reservation with convenient daily sales generation.
Args:
arrival: Arrival date in YYYY-MM-DD format
departure: Departure date in YYYY-MM-DD format
room_type: Room type code
room_number: Room number
status: Reservation status
adults: Number of adults
children: Number of children
infants: Number of infants
rate_plan_code: Rate plan code
connected_room_type: Connected room type
revenue_logis_per_day: Fixed revenue per day (auto-generates dailySale entries)
revenue_total_per_day: Total revenue per day (defaults to revenue_logis_per_day)
Returns:
Self for method chaining
"""
room_builder = RoomReservationBuilder(
arrival=arrival,
departure=departure,
room_type=room_type,
room_number=room_number,
status=status,
adults=adults,
children=children,
infants=infants,
rate_plan_code=rate_plan_code,
connected_room_type=connected_room_type,
revenue_logis_per_day=revenue_logis_per_day,
revenue_total_per_day=revenue_total_per_day,
)
self.rooms.append(room_builder)
return self
def add_room_builder(
self, room_builder: RoomReservationBuilder
) -> "ReservationXMLBuilder":
"""Add a pre-configured room builder.
Args:
room_builder: RoomReservationBuilder instance
Returns:
Self for method chaining
"""
self.rooms.append(room_builder)
return self
def build(self) -> ET.Element:
"""Build the reservation XML element.
Returns:
XML Element for the reservation
"""
reservation_attrs = {
"hotelID": self.hotel_id,
"id": self.reservation_id,
"number": self.reservation_number,
"date": self.reservation_date,
"creationTime": self.creation_time,
"type": self.reservation_type,
}
if self.advertising_medium:
reservation_attrs["advertisingMedium"] = self.advertising_medium
if self.advertising_partner:
reservation_attrs["advertisingPartner"] = self.advertising_partner
if self.advertising_campagne:
reservation_attrs["advertisingCampagne"] = self.advertising_campagne
reservation_elem = ET.Element("reservation", reservation_attrs)
# Add guest element
if self.guest_data:
ET.SubElement(reservation_elem, "guest", self.guest_data)
# Add roomReservations
if self.rooms:
room_reservations_elem = ET.SubElement(
reservation_elem, "roomReservations"
)
for room_builder in self.rooms:
room_elem = room_builder.build()
room_reservations_elem.append(room_elem)
return reservation_elem
def build_xml(self, include_xml_declaration: bool = True) -> str:
"""Build the complete XML string for this reservation.
Args:
include_xml_declaration: Whether to include <?xml version="1.0"?> declaration
Returns:
XML string
"""
reservation_elem = self.build()
# Wrap in <reservations> root element
root = ET.Element("reservations")
root.append(reservation_elem)
xml_str = ET.tostring(root, encoding="unicode")
if include_xml_declaration:
xml_str = '<?xml version="1.0" ?>\n' + xml_str
return xml_str
class MultiReservationXMLBuilder:
"""Builder for creating XML documents with multiple reservations.
Example:
builder = MultiReservationXMLBuilder()
builder.add_reservation(
ReservationXMLBuilder(...).set_guest(...).add_room(...)
)
builder.add_reservation(
ReservationXMLBuilder(...).set_guest(...).add_room(...)
)
xml_string = builder.build_xml()
"""
def __init__(self):
"""Initialize multi-reservation builder."""
self.reservations: list[ReservationXMLBuilder] = []
def add_reservation(
self, reservation_builder: ReservationXMLBuilder
) -> "MultiReservationXMLBuilder":
"""Add a reservation to the document.
Args:
reservation_builder: ReservationXMLBuilder instance
Returns:
Self for method chaining
"""
self.reservations.append(reservation_builder)
return self
def build_xml(self, include_xml_declaration: bool = True) -> str:
"""Build the complete XML string with all reservations.
Args:
include_xml_declaration: Whether to include <?xml version="1.0"?> declaration
Returns:
XML string with multiple reservations
"""
root = ET.Element("reservations")
for reservation_builder in self.reservations:
reservation_elem = reservation_builder.build()
root.append(reservation_elem)
xml_str = ET.tostring(root, encoding="unicode")
if include_xml_declaration:
xml_str = '<?xml version="1.0" ?>\n' + xml_str
return xml_str

215
tests/test_api_freerooms.py Normal file
View File

@@ -0,0 +1,215 @@
"""Integration tests for the FreeRooms endpoint."""
from __future__ import annotations
import asyncio
import gzip
import urllib.parse
from datetime import UTC, datetime
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from alpine_bits_python.alpinebits_server import AlpineBitsServer
from alpine_bits_python.api import app
from alpine_bits_python.const import HttpStatusCode
from alpine_bits_python.db import Base, Hotel, RoomAvailability
def build_request_xml(body: str, include_unique_id: bool = True) -> str:
unique = (
'<UniqueID Type="16" ID="1" Instance="CompleteSet"/>'
if include_unique_id
else ""
)
return f"""<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelInvCountNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05" Version="7.000">
{unique}
<Inventories HotelCode="HOTEL123" HotelName="Integration Hotel">
{body}
</Inventories>
</OTA_HotelInvCountNotifRQ>"""
INVENTORY_A = """
<Inventory>
<StatusApplicationControl Start="2025-10-01" End="2025-10-03" InvTypeCode="DBL"/>
<InvCounts>
<InvCount CountType="2" Count="3"/>
</InvCounts>
</Inventory>
"""
INVENTORY_B = """
<Inventory>
<StatusApplicationControl Start="2025-10-02" End="2025-10-02" InvTypeCode="DBL"/>
<InvCounts>
<InvCount CountType="2" Count="1"/>
</InvCounts>
</Inventory>
"""
@pytest.fixture
def freerooms_test_config():
return {
"server": {
"codecontext": "ADVERTISING",
"code": "70597314",
"companyname": "99tales Gmbh",
"res_id_source_context": "99tales",
},
"alpine_bits_auth": [
{
"hotel_id": "HOTEL123",
"hotel_name": "Integration Hotel",
"username": "testuser",
"password": "testpass",
}
],
"database": {"url": "sqlite+aiosqlite:///:memory:"},
}
@pytest.fixture
def freerooms_client(freerooms_test_config):
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
asyncio.run(create_tables())
with patch("alpine_bits_python.api.load_config", return_value=freerooms_test_config), patch(
"alpine_bits_python.api.create_database_engine", return_value=engine
):
app.state.engine = engine
app.state.async_sessionmaker = async_sessionmaker(engine, expire_on_commit=False)
app.state.config = freerooms_test_config
app.state.alpine_bits_server = AlpineBitsServer(freerooms_test_config)
with TestClient(app) as test_client:
yield test_client
@pytest.fixture
def freerooms_headers():
return {
"Authorization": "Basic dGVzdHVzZXI6dGVzdHBhc3M=",
"X-AlpineBits-ClientProtocolVersion": "2024-10",
}
def seed_hotel_if_missing(client: TestClient):
async def _seed():
async_sessionmaker = client.app.state.async_sessionmaker
async with async_sessionmaker() as session:
result = await session.execute(
select(Hotel).where(Hotel.hotel_id == "HOTEL123")
)
if result.scalar_one_or_none():
return
session.add(
Hotel(
hotel_id="HOTEL123",
hotel_name="Integration Hotel",
username="testuser",
password_hash="integration-hash",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
)
await session.commit()
asyncio.run(_seed())
def fetch_availability(client: TestClient):
async def _fetch():
async_sessionmaker = client.app.state.async_sessionmaker
async with async_sessionmaker() as session:
result = await session.execute(
select(RoomAvailability).order_by(RoomAvailability.date)
)
return result.scalars().all()
return asyncio.run(_fetch())
def test_freerooms_endpoint_complete_set(freerooms_client: TestClient, freerooms_headers):
seed_hotel_if_missing(freerooms_client)
xml = build_request_xml(INVENTORY_A, include_unique_id=True)
response = freerooms_client.post(
"/api/alpinebits/server-2024-10",
data={"action": "OTA_HotelInvCountNotif:FreeRooms", "request": xml},
headers=freerooms_headers,
)
assert response.status_code == HttpStatusCode.OK
assert "<Success" in response.text
rows = fetch_availability(freerooms_client)
assert len(rows) == 3
assert rows[0].count_type_2 == 3
def test_freerooms_endpoint_delta_updates_existing_rows(
freerooms_client: TestClient, freerooms_headers
):
seed_hotel_if_missing(freerooms_client)
complete_xml = build_request_xml(INVENTORY_A, include_unique_id=True)
delta_xml = build_request_xml(INVENTORY_B, include_unique_id=False)
response = freerooms_client.post(
"/api/alpinebits/server-2024-10",
data={"action": "OTA_HotelInvCountNotif:FreeRooms", "request": complete_xml},
headers=freerooms_headers,
)
assert response.status_code == HttpStatusCode.OK
response = freerooms_client.post(
"/api/alpinebits/server-2024-10",
data={"action": "OTA_HotelInvCountNotif:FreeRooms", "request": delta_xml},
headers=freerooms_headers,
)
assert response.status_code == HttpStatusCode.OK
rows = fetch_availability(freerooms_client)
counts = {row.date.isoformat(): row.count_type_2 for row in rows}
assert counts["2025-10-02"] == 1
assert counts["2025-10-01"] == 3
def test_freerooms_endpoint_accepts_gzip_payload(
freerooms_client: TestClient, freerooms_headers
):
seed_hotel_if_missing(freerooms_client)
xml = build_request_xml(INVENTORY_A, include_unique_id=True)
encoded = urllib.parse.urlencode(
{"action": "OTA_HotelInvCountNotif:FreeRooms", "request": xml}
).encode("utf-8")
compressed = gzip.compress(encoded)
headers = {
**freerooms_headers,
"Content-Encoding": "gzip",
"Content-Type": "application/x-www-form-urlencoded",
}
response = freerooms_client.post(
"/api/alpinebits/server-2024-10",
data=compressed,
headers=headers,
)
assert response.status_code == HttpStatusCode.OK
assert "<Success" in response.text
rows = fetch_availability(freerooms_client)
assert len(rows) == 3

View File

@@ -340,13 +340,348 @@ class TestConversionServiceWithImportedData:
assert stats["total_daily_sales"] == 0
assert stats["errors"] == 0
@pytest.mark.asyncio
async def test_duplicate_reservations(self, test_db_session):
"""Test that room entries are correctly updated when reservation status changes.
This test detects a bug where ConversionRoom records are not properly upserted
when the same reservation is processed multiple times with different room numbers.
Scenario:
1. Process reservation with status='request', no revenue, room_number='101'
2. Process reservation with status='reservation', with revenue, room_number='102'
3. Swap: Process same reservations but reversed - first one now has status='reservation'
with room_number='201', second has status='request' with room_number='202'
4. The old room entries (101, 102) should no longer exist in the database
"""
from tests.helpers import ReservationXMLBuilder, MultiReservationXMLBuilder
# First batch: Process two reservations
multi_builder1 = MultiReservationXMLBuilder()
# Reservation 1: Request status, no revenue, room 101
res1_v1 = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="res_001",
reservation_number="RES-001",
reservation_date="2025-11-14",
reservation_type="request",
)
.set_guest(
guest_id="guest_001",
first_name="Alice",
last_name="Johnson",
email="alice@example.com",
)
.add_room(
arrival="2025-12-01",
departure="2025-12-03",
room_number="101",
status="request",
# No revenue
)
)
multi_builder1.add_reservation(res1_v1)
# Reservation 2: Reservation status, with revenue, room 102
res2_v1 = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="res_002",
reservation_number="RES-002",
reservation_date="2025-11-15",
reservation_type="reservation",
)
.set_guest(
guest_id="guest_002",
first_name="Bob",
last_name="Smith",
email="bob@example.com",
)
.add_room(
arrival="2025-12-10",
departure="2025-12-12",
room_number="102",
status="reserved",
revenue_logis_per_day=150.0,
)
)
multi_builder1.add_reservation(res2_v1)
xml_content1 = multi_builder1.build_xml()
# Process first batch
service = ConversionService(test_db_session)
stats1 = await service.process_conversion_xml(xml_content1)
assert stats1["total_reservations"] == 2
# Verify rooms exist in database
result = await test_db_session.execute(
select(ConversionRoom).where(ConversionRoom.room_number == "101")
)
room_101 = result.scalar_one_or_none()
assert room_101 is not None, "Room 101 should exist after first processing"
result = await test_db_session.execute(
select(ConversionRoom).where(ConversionRoom.room_number == "102")
)
room_102 = result.scalar_one_or_none()
assert room_102 is not None, "Room 102 should exist after first processing"
# Second batch: Swap the reservations and change room numbers
multi_builder2 = MultiReservationXMLBuilder()
# Reservation 1: NOW has reservation status, with revenue, room 201 (changed from 101)
res1_v2 = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="res_001", # Same ID
reservation_number="RES-001", # Same number
reservation_date="2025-11-14",
reservation_type="reservation", # Changed from request
)
.set_guest(
guest_id="guest_001",
first_name="Alice",
last_name="Johnson",
email="alice@example.com",
)
.add_room(
arrival="2025-12-01",
departure="2025-12-03",
room_number="201", # Changed from 101
status="reserved",
revenue_logis_per_day=200.0, # Now has revenue
)
)
multi_builder2.add_reservation(res1_v2)
# Reservation 2: NOW has request status, no revenue, room 202 (changed from 102)
res2_v2 = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="res_002", # Same ID
reservation_number="RES-002", # Same number
reservation_date="2025-11-15",
reservation_type="request", # Changed from reservation
)
.set_guest(
guest_id="guest_002",
first_name="Bob",
last_name="Smith",
email="bob@example.com",
)
.add_room(
arrival="2025-12-10",
departure="2025-12-12",
room_number="202", # Changed from 102
status="request",
# No revenue anymore
)
)
multi_builder2.add_reservation(res2_v2)
xml_content2 = multi_builder2.build_xml()
# Process second batch
stats2 = await service.process_conversion_xml(xml_content2)
assert stats2["total_reservations"] == 2
# BUG DETECTION: Old room entries (101, 102) should NOT exist anymore
# They should have been replaced by new room entries (201, 202)
result = await test_db_session.execute(
select(ConversionRoom).where(ConversionRoom.room_number == "101")
)
room_101_after = result.scalar_one_or_none()
assert room_101_after is None, (
"BUG: Room 101 should no longer exist after reprocessing with room 201. "
"Old room entries are not being removed when reservation is updated."
)
result = await test_db_session.execute(
select(ConversionRoom).where(ConversionRoom.room_number == "102")
)
room_102_after = result.scalar_one_or_none()
assert room_102_after is None, (
"BUG: Room 102 should no longer exist after reprocessing with room 202. "
"Old room entries are not being removed when reservation is updated."
)
# New room entries should exist
result = await test_db_session.execute(
select(ConversionRoom).where(ConversionRoom.room_number == "201")
)
room_201 = result.scalar_one_or_none()
assert room_201 is not None, "Room 201 should exist after second processing"
result = await test_db_session.execute(
select(ConversionRoom).where(ConversionRoom.room_number == "202")
)
room_202 = result.scalar_one_or_none()
assert room_202 is not None, "Room 202 should exist after second processing"
# Verify we only have 2 conversion room records total (not 4)
result = await test_db_session.execute(select(ConversionRoom))
all_rooms = result.scalars().all()
assert len(all_rooms) == 2, (
f"BUG: Expected 2 conversion rooms total, but found {len(all_rooms)}. "
f"Old room entries are not being deleted. Room numbers: {[r.room_number for r in all_rooms]}"
)
class TestXMLBuilderUsage:
"""Demonstrate usage of XML builder helpers for creating test data."""
@pytest.mark.asyncio
async def test_using_xml_builder_for_simple_reservation(self, test_db_session):
"""Example: Create a simple reservation using the XML builder helper."""
from tests.helpers import ReservationXMLBuilder
# Build a reservation with convenient fluent API
xml_content = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="test_123",
reservation_number="RES-123",
reservation_date="2025-11-14",
)
.set_guest(
guest_id="guest_001",
first_name="John",
last_name="Doe",
email="john@example.com",
country_code="US",
)
.add_room(
arrival="2025-12-01",
departure="2025-12-05",
room_type="DZV",
room_number="101",
revenue_logis_per_day=150.0,
adults=2
)
.build_xml()
)
# Process the XML
service = ConversionService(test_db_session)
stats = await service.process_conversion_xml(xml_content)
assert stats["total_reservations"] == 1
assert stats["total_daily_sales"] == 5 # 4 nights + departure day
@pytest.mark.asyncio
async def test_using_xml_builder_for_multi_room_reservation(
self, test_db_session
):
"""Example: Create a reservation with multiple rooms."""
from tests.helpers import ReservationXMLBuilder
xml_content = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="test_456",
reservation_number="RES-456",
reservation_date="2025-11-14",
)
.set_guest(
guest_id="guest_002",
first_name="Jane",
last_name="Smith",
email="jane@example.com",
)
.add_room(
arrival="2025-12-01",
departure="2025-12-05",
room_number="101",
revenue_logis_per_day=150.0,
)
.add_room(
arrival="2025-12-01",
departure="2025-12-05",
room_number="102",
revenue_logis_per_day=200.0,
)
.build_xml()
)
service = ConversionService(test_db_session)
stats = await service.process_conversion_xml(xml_content)
assert stats["total_reservations"] == 1
# 2 rooms × 5 daily sales each = 10 total
assert stats["total_daily_sales"] == 10
@pytest.mark.asyncio
async def test_using_multi_reservation_builder(self, test_db_session):
"""Example: Create multiple reservations in one XML document."""
from tests.helpers import ReservationXMLBuilder, MultiReservationXMLBuilder
multi_builder = MultiReservationXMLBuilder()
# Add first reservation
res1 = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="test_001",
reservation_number="RES-001",
reservation_date="2025-11-14",
)
.set_guest(
guest_id="guest_001",
first_name="Alice",
last_name="Johnson",
email="alice@example.com",
)
.add_room(
arrival="2025-12-01",
departure="2025-12-03",
revenue_logis_per_day=100.0,
)
)
multi_builder.add_reservation(res1)
# Add second reservation
res2 = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="test_002",
reservation_number="RES-002",
reservation_date="2025-11-15",
)
.set_guest(
guest_id="guest_002",
first_name="Bob",
last_name="Williams",
email="bob@example.com",
)
.add_room(
arrival="2025-12-10",
departure="2025-12-12",
revenue_logis_per_day=150.0,
)
)
multi_builder.add_reservation(res2)
xml_content = multi_builder.build_xml()
# Process the XML
service = ConversionService(test_db_session)
stats = await service.process_conversion_xml(xml_content)
assert stats["total_reservations"] == 2
# Res1: 3 days (2 nights), Res2: 3 days (2 nights) = 6 total
assert stats["total_daily_sales"] == 6
class TestHashedMatchingLogic:
"""Test the hashed matching logic used in ConversionService."""
@pytest.mark.asyncio
async def test_conversion_guest_hashed_fields_are_populated(
self, test_db_session

View File

@@ -0,0 +1,367 @@
"""Unit tests for FreeRoomsAction."""
from __future__ import annotations
from datetime import UTC, datetime
import pytest
import pytest_asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from alpine_bits_python.alpinebits_server import AlpineBitsClientInfo, Version
from alpine_bits_python.const import HttpStatusCode
from alpine_bits_python.db import Base, Hotel, HotelInventory, RoomAvailability
from alpine_bits_python.free_rooms_action import FreeRoomsAction
TEST_CONFIG = {
"alpine_bits_auth": [
{
"hotel_id": "TESTHOTEL",
"hotel_name": "Unit Test Hotel",
"username": "testuser",
"password": "testpass",
}
]
}
def build_complete_set_xml(body: str, hotel_code: str = "TESTHOTEL") -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelInvCountNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05" Version="7.000">
<UniqueID Type="16" ID="1" Instance="CompleteSet"/>
<Inventories HotelCode="{hotel_code}" HotelName="Unit Hotel">
{body}
</Inventories>
</OTA_HotelInvCountNotifRQ>"""
def build_delta_xml(body: str, hotel_code: str = "TESTHOTEL") -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelInvCountNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05" Version="7.000">
<Inventories HotelCode="{hotel_code}" HotelName="Unit Hotel">
{body}
</Inventories>
</OTA_HotelInvCountNotifRQ>"""
def daily_inventory(start: str, end: str, inv_type: str = "DBL", count: int = 3) -> str:
return f"""
<Inventory>
<StatusApplicationControl Start="{start}" End="{end}" InvTypeCode="{inv_type}"/>
<InvCounts>
<InvCount CountType="2" Count="{count}"/>
</InvCounts>
</Inventory>
"""
@pytest_asyncio.fixture
async def db_engine():
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest_asyncio.fixture
async def db_session(db_engine):
session_factory = async_sessionmaker(db_engine, expire_on_commit=False, class_=AsyncSession)
async with session_factory() as session:
yield session
async def insert_test_hotel(session: AsyncSession, hotel_id: str = "TESTHOTEL"):
hotel = Hotel(
hotel_id=hotel_id,
hotel_name="Unit Test Hotel",
username="testuser",
password_hash="bcrypt-hash",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
session.add(hotel)
await session.commit()
return hotel
def make_action() -> FreeRoomsAction:
return FreeRoomsAction(config=TEST_CONFIG)
def make_client_info() -> AlpineBitsClientInfo:
return AlpineBitsClientInfo(username="testuser", password="testpass")
@pytest.mark.asyncio
async def test_complete_set_creates_inventory_and_availability(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
daily_inventory("2025-01-01", "2025-01-03", inv_type="DBL", count=4)
)
response = await action.handle(
action="OTA_HotelInvCountNotif:FreeRooms",
request_xml=xml,
version=Version.V2024_10,
client_info=make_client_info(),
dbsession=db_session,
)
assert response.status_code == HttpStatusCode.OK
inventories = (await db_session.execute(select(HotelInventory))).scalars().all()
assert len(inventories) == 1
assert inventories[0].inv_type_code == "DBL"
rows = (
await db_session.execute(
select(RoomAvailability).order_by(RoomAvailability.date)
)
).scalars().all()
assert len(rows) == 3
assert rows[0].count_type_2 == 4
assert rows[0].update_type == "CompleteSet"
@pytest.mark.asyncio
async def test_complete_set_replaces_previous_availability(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml_initial = build_complete_set_xml(daily_inventory("2025-02-01", "2025-02-02", count=5))
xml_updated = build_complete_set_xml(daily_inventory("2025-02-01", "2025-02-01", count=1))
await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml_initial,
Version.V2024_10,
make_client_info(),
db_session,
)
await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml_updated,
Version.V2024_10,
make_client_info(),
db_session,
)
rows = (
await db_session.execute(select(RoomAvailability).order_by(RoomAvailability.date))
).scalars().all()
assert len(rows) == 1
assert rows[0].date.isoformat() == "2025-02-01"
assert rows[0].count_type_2 == 1
@pytest.mark.asyncio
async def test_delta_updates_only_specified_dates(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
complete_xml = build_complete_set_xml(daily_inventory("2025-03-01", "2025-03-03", count=2))
delta_xml = build_delta_xml(daily_inventory("2025-03-02", "2025-03-02", count=7))
await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
complete_xml,
Version.V2024_10,
make_client_info(),
db_session,
)
await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
delta_xml,
Version.V2024_10,
make_client_info(),
db_session,
)
rows = (
await db_session.execute(select(RoomAvailability).order_by(RoomAvailability.date))
).scalars().all()
counts = {row.date.isoformat(): row.count_type_2 for row in rows}
assert counts == {
"2025-03-01": 2,
"2025-03-02": 7,
"2025-03-03": 2,
}
assert all(row.update_type in {"CompleteSet", "Delta"} for row in rows)
@pytest.mark.asyncio
async def test_closing_season_entries_marked_correctly(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-04-01" End="2025-04-02" AllInvCode="true"/>
</Inventory>
<Inventory>
<StatusApplicationControl Start="2025-04-03" End="2025-04-03" InvTypeCode="SGL"/>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.OK
inventories = (await db_session.execute(select(HotelInventory))).scalars().all()
closing_inventory = next(inv for inv in inventories if inv.inv_type_code == "__CLOSE")
assert closing_inventory.inv_code is None
rows = (
await db_session.execute(select(RoomAvailability).order_by(RoomAvailability.date))
).scalars().all()
closing_rows = [row for row in rows if row.is_closing_season]
assert len(closing_rows) == 2
assert all(row.count_type_2 is None for row in closing_rows)
@pytest.mark.asyncio
async def test_closing_season_not_allowed_in_delta(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_delta_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-05-01" End="2025-05-02" AllInvCode="true"/>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "Closing seasons" in response.xml_content
@pytest.mark.asyncio
async def test_missing_invtypecode_returns_error(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-06-01" End="2025-06-02"/>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "InvTypeCode is required" in response.xml_content
@pytest.mark.asyncio
async def test_duplicate_count_type_rejected(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-07-01" End="2025-07-01" InvTypeCode="SGL"/>
<InvCounts>
<InvCount CountType="2" Count="3"/>
<InvCount CountType="2" Count="4"/>
</InvCounts>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "Duplicate CountType" in response.xml_content
@pytest.mark.asyncio
async def test_invalid_date_range_returns_error(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-08-10" End="2025-08-01" InvTypeCode="DBL"/>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "End date cannot be before Start date" in response.xml_content
@pytest.mark.asyncio
async def test_invalid_credentials_return_unauthorized(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
bad_client = AlpineBitsClientInfo(username="testuser", password="wrongpass")
xml = build_complete_set_xml(daily_inventory("2025-09-01", "2025-09-01"))
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
bad_client,
db_session,
)
assert response.status_code == HttpStatusCode.UNAUTHORIZED
assert "Unauthorized" in response.xml_content
@pytest.mark.asyncio
async def test_invalid_xml_returns_error(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
client_info = make_client_info()
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
"<invalid",
Version.V2024_10,
client_info,
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "Invalid XML payload" in response.xml_content

View File

@@ -0,0 +1,218 @@
"""Tests for webhook-related Pydantic schemas."""
import hashlib
import json
from datetime import datetime
import pytest
from pydantic import ValidationError
from alpine_bits_python.const import WebhookStatus
from alpine_bits_python.schemas import (
HotelData,
WebhookEndpointData,
WebhookRequestData,
)
class TestHotelData:
"""Tests for HotelData schema."""
def test_valid_hotel_data(self):
"""Test creating a valid HotelData instance."""
data = HotelData(
hotel_id="hotel123",
hotel_name="Test Hotel",
username="admin",
password_hash="hashed_password_123",
)
assert data.hotel_id == "hotel123"
assert data.hotel_name == "Test Hotel"
assert data.username == "admin"
assert data.password_hash == "hashed_password_123"
assert data.is_active is True
assert isinstance(data.created_at, datetime)
def test_whitespace_stripping(self):
"""Test that whitespace is stripped from string fields."""
data = HotelData(
hotel_id=" hotel123 ",
hotel_name=" Test Hotel ",
username=" admin ",
password_hash="hashed_password_123",
)
assert data.hotel_id == "hotel123"
assert data.hotel_name == "Test Hotel"
assert data.username == "admin"
def test_optional_fields(self):
"""Test that optional fields can be None."""
data = HotelData(
hotel_id="hotel123",
hotel_name="Test Hotel",
username="admin",
password_hash="hashed_password_123",
meta_account_id=None,
google_account_id=None,
)
assert data.meta_account_id is None
assert data.google_account_id is None
class TestWebhookEndpointData:
"""Tests for WebhookEndpointData schema."""
def test_valid_webhook_endpoint(self):
"""Test creating a valid WebhookEndpointData instance."""
data = WebhookEndpointData(
hotel_id="hotel123",
webhook_secret="secret_abc123",
webhook_type="wix_form",
)
assert data.hotel_id == "hotel123"
assert data.webhook_secret == "secret_abc123"
assert data.webhook_type == "wix_form"
assert data.is_enabled is True
assert isinstance(data.created_at, datetime)
def test_webhook_endpoint_with_description(self):
"""Test WebhookEndpointData with optional description."""
data = WebhookEndpointData(
hotel_id="hotel123",
webhook_secret="secret_abc123",
webhook_type="generic",
description="Main booking form",
)
assert data.description == "Main booking form"
def test_whitespace_stripping(self):
"""Test that whitespace is stripped from string fields."""
data = WebhookEndpointData(
hotel_id=" hotel123 ",
webhook_secret=" secret_abc123 ",
webhook_type=" wix_form ",
)
assert data.hotel_id == "hotel123"
assert data.webhook_secret == "secret_abc123"
assert data.webhook_type == "wix_form"
class TestWebhookRequestData:
"""Tests for WebhookRequestData schema."""
def test_auto_calculate_payload_hash(self):
"""Test that payload_hash is auto-calculated from payload_json."""
payload = {"name": "John", "email": "john@example.com"}
data = WebhookRequestData(payload_json=payload)
# Verify hash was calculated
assert data.payload_hash is not None
assert len(data.payload_hash) == 64 # SHA256 produces 64 hex chars
# Verify it matches the expected hash (same algorithm as api.py)
payload_json_str = json.dumps(payload, sort_keys=True)
expected_hash = hashlib.sha256(payload_json_str.encode("utf-8")).hexdigest()
assert data.payload_hash == expected_hash
def test_explicit_payload_hash(self):
"""Test providing payload_hash explicitly (for purged payloads)."""
explicit_hash = "a" * 64
data = WebhookRequestData(
payload_json=None,
payload_hash=explicit_hash,
)
assert data.payload_hash == explicit_hash
assert data.payload_json is None
def test_payload_hash_required(self):
"""Test that payload_hash is required (either calculated or explicit)."""
with pytest.raises(ValidationError) as exc_info:
WebhookRequestData(
payload_json=None,
payload_hash=None,
)
assert "payload_hash is required" in str(exc_info.value)
def test_consistent_hashing(self):
"""Test that the same payload always produces the same hash."""
payload = {"b": 2, "a": 1, "c": 3} # Unordered keys
data1 = WebhookRequestData(payload_json=payload.copy())
data2 = WebhookRequestData(payload_json=payload.copy())
assert data1.payload_hash == data2.payload_hash
def test_default_status(self):
"""Test that status defaults to PENDING."""
data = WebhookRequestData(payload_json={"test": "data"})
assert data.status == WebhookStatus.PENDING
def test_status_normalization(self):
"""Test that status is normalized to WebhookStatus enum."""
data = WebhookRequestData(
payload_json={"test": "data"},
status="completed", # String
)
assert data.status == WebhookStatus.COMPLETED
assert isinstance(data.status, WebhookStatus)
def test_retry_count_default(self):
"""Test that retry_count defaults to 0."""
data = WebhookRequestData(payload_json={"test": "data"})
assert data.retry_count == 0
def test_optional_foreign_keys(self):
"""Test optional foreign key fields."""
data = WebhookRequestData(
payload_json={"test": "data"},
webhook_endpoint_id=123,
hotel_id="hotel456",
)
assert data.webhook_endpoint_id == 123
assert data.hotel_id == "hotel456"
def test_result_tracking(self):
"""Test result tracking fields."""
data = WebhookRequestData(
payload_json={"test": "data"},
created_customer_id=1,
created_reservation_id=2,
)
assert data.created_customer_id == 1
assert data.created_reservation_id == 2
def test_purged_payload(self):
"""Test representing a purged webhook request (after processing)."""
explicit_hash = "b" * 64
data = WebhookRequestData(
payload_json=None,
payload_hash=explicit_hash,
status=WebhookStatus.COMPLETED,
purged_at=datetime.now(),
)
assert data.payload_json is None
assert data.payload_hash == explicit_hash
assert data.status == WebhookStatus.COMPLETED
assert data.purged_at is not None
def test_processing_metadata(self):
"""Test processing tracking fields."""
now = datetime.now()
data = WebhookRequestData(
payload_json={"test": "data"},
status=WebhookStatus.PROCESSING,
processing_started_at=now,
)
assert data.status == WebhookStatus.PROCESSING
assert data.processing_started_at == now
assert data.processing_completed_at is None
def test_request_metadata(self):
"""Test request metadata fields."""
data = WebhookRequestData(
payload_json={"test": "data"},
source_ip="192.168.1.1",
user_agent="Mozilla/5.0",
)
assert data.source_ip == "192.168.1.1"
assert data.user_agent == "Mozilla/5.0"

View File

@@ -0,0 +1,340 @@
"""Tests for webhook duplicate handling and reprocessing.
This module tests:
- Duplicate detection during normal operation
- Duplicate handling during app startup reprocessing
- Stuck webhooks that are duplicates
"""
import asyncio
import json
import uuid
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import patch
import pytest
import pytest_asyncio
from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from alpine_bits_python.api import app
from alpine_bits_python.const import WebhookStatus
from alpine_bits_python.db import Base, Reservation, WebhookRequest
from alpine_bits_python.db_setup import reprocess_stuck_webhooks
from alpine_bits_python.schemas import WebhookRequestData
from alpine_bits_python.webhook_processor import initialize_webhook_processors, webhook_registry
@pytest_asyncio.fixture
async def test_db_engine():
"""Create an in-memory SQLite database for testing."""
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
# Cleanup
await engine.dispose()
@pytest.fixture
def test_config():
"""Test configuration."""
return {
"server": {
"codecontext": "ADVERTISING",
"code": "70597314",
"companyname": "99tales Gmbh",
"res_id_source_context": "99tales",
},
"alpine_bits_auth": [
{
"hotel_id": "HOTEL123",
"hotel_name": "Test Hotel",
"username": "testuser",
"password": "testpass",
}
],
"default_hotel_code": "HOTEL123",
"default_hotel_name": "Test Hotel",
"database": {"url": "sqlite+aiosqlite:///:memory:"},
}
@pytest.fixture
def sample_wix_form_data():
"""Sample Wix form submission data with FIXED submissionId for duplicate testing."""
return {
"data": {
"submissionId": "FIXED-DUPLICATE-TEST-ID", # Fixed ID to trigger duplicates
"submissionTime": "2025-10-07T05:48:41.855Z",
"contact": {
"name": {"first": "John", "last": "Doe"},
"email": "john.doe.duplicate.test@example.com",
"phones": [{"e164Phone": "+1234567890"}],
"locale": "en-US",
"contactId": "contact-duplicate-test",
},
"field:anrede": "Mr.",
"field:form_field_5a7b": True,
"field:date_picker_a7c8": "2024-12-25",
"field:date_picker_7e65": "2024-12-31",
"field:number_7cf5": "2",
"field:anzahl_kinder": "1",
"field:alter_kind_1": "8",
"field:angebot_auswaehlen": "Christmas Special",
"field:utm_source": "google",
"field:utm_medium": "cpc",
"field:utm_campaign": "winter2024",
"field:fbclid": "test_fbclid_123",
"field:long_answer_3524": "Late check-in please",
}
}
class TestWebhookDuplicateHandling:
"""Test duplicate webhook handling during normal operation."""
def test_duplicate_webhook_during_operation(self, test_config, sample_wix_form_data):
"""Test that sending the same webhook twice handles duplicates gracefully."""
# Create engine and tables
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
asyncio.run(create_tables())
# Mock config and database to use our test database
with patch("alpine_bits_python.api.load_config", return_value=test_config), \
patch("alpine_bits_python.api.create_database_engine", return_value=engine):
from alpine_bits_python.alpinebits_server import AlpineBitsServer
# Setup app state
app.state.engine = engine
app.state.async_sessionmaker = async_sessionmaker(
engine, expire_on_commit=False
)
app.state.config = test_config
app.state.alpine_bits_server = AlpineBitsServer(test_config)
with TestClient(app) as client:
# First submission - should succeed
response1 = client.post(
"/api/webhook/wix-form",
json=sample_wix_form_data
)
assert response1.status_code == 200
data1 = response1.json()
assert data1["status"] == "success"
# Second submission with same data - should detect duplicate at API level
response2 = client.post(
"/api/webhook/wix-form",
json=sample_wix_form_data
)
assert response2.status_code == 200
data2 = response2.json()
# API returns success for already-processed webhooks, but sets duplicate flag
assert data2["status"] == "success"
assert data2.get("duplicate") is True
assert "already processed" in data2["message"].lower()
# Cleanup
asyncio.run(engine.dispose())
class TestWebhookReprocessing:
"""Test webhook reprocessing on app restart."""
@pytest.mark.asyncio
async def test_reprocess_stuck_duplicate_webhook(self, test_db_engine, test_config):
"""Test that stuck webhooks that are duplicates are handled correctly on restart."""
AsyncSessionLocal = async_sessionmaker(test_db_engine, expire_on_commit=False)
# Step 1: Process a webhook normally to create a reservation
from alpine_bits_python.webhook_processor import process_wix_form_submission
test_form_file = Path(__file__).parent / "test_data" / f"test_form{1}.json"
if not test_form_file.exists():
pytest.skip(f"{test_form_file.name} not found")
# Load test form data
with test_form_file.open() as f:
test_data = json.load(f)
test_data["data"]["submissionId"] = "STUCK-WEBHOOK-TEST-ID" # Fixed ID for duplicate test
async with AsyncSessionLocal() as session:
result = await process_wix_form_submission(
test_data, session, config=test_config
)
await session.commit()
assert result["status"] == "success"
# Step 2: Verify the reservation was created
async with AsyncSessionLocal() as session:
stmt = select(Reservation).where(
Reservation.unique_id == "STUCK-WEBHOOK-TEST-ID"
)
result = await session.execute(stmt)
reservation = result.scalar_one_or_none()
assert reservation is not None, "Reservation should exist"
assert reservation.unique_id == "STUCK-WEBHOOK-TEST-ID"
# Step 3: Manually create a webhook request stuck in "processing" status
# This simulates a webhook that was being processed when the app crashed
from alpine_bits_python.db import WebhookEndpoint, Hotel
async with AsyncSessionLocal() as session:
# Create hotel
hotel = Hotel(
hotel_id="HOTEL123",
hotel_name="Test Hotel",
username="testuser",
password_hash="dummy",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
session.add(hotel)
await session.flush()
# Create webhook endpoint
endpoint = WebhookEndpoint(
hotel_id="HOTEL123",
webhook_type="wix_form",
webhook_secret="test-secret-123",
is_enabled=True,
created_at=datetime.now(UTC),
)
session.add(endpoint)
await session.flush()
# Create stuck webhook request with the SAME payload
stuck_webhook_data = WebhookRequestData(
webhook_endpoint_id=endpoint.id,
hotel_id="HOTEL123",
payload_json=test_data,
status=WebhookStatus.PROCESSING, # Stuck in processing!
created_at=datetime.now(UTC),
)
stuck_webhook = WebhookRequest(**stuck_webhook_data.model_dump())
session.add(stuck_webhook)
await session.commit()
# initialize wix_form processor
initialize_webhook_processors()
# Step 4: Run reprocessing (simulates app restart)
await reprocess_stuck_webhooks(AsyncSessionLocal, test_config)
# Step 5: Verify the stuck webhook was marked as completed (not failed)
async with AsyncSessionLocal() as session:
stmt = select(WebhookRequest).where(
WebhookRequest.status == WebhookStatus.COMPLETED
)
result = await session.execute(stmt)
completed_webhooks = result.scalars().all()
assert len(completed_webhooks) == 1
assert completed_webhooks[0].last_error is None
# Verify no failed webhooks
stmt = select(WebhookRequest).where(
WebhookRequest.status == WebhookStatus.FAILED
)
result = await session.execute(stmt)
failed_webhooks = result.scalars().all()
assert len(failed_webhooks) == 0
# Step 6: Verify only ONE reservation exists (no duplicate)
async with AsyncSessionLocal() as session:
stmt = select(Reservation)
result = await session.execute(stmt)
reservations = result.scalars().all()
assert len(reservations) == 1
class TestWebhookReprocessingNeverBlocksStartup:
"""Test that reprocessing never blocks app startup."""
@pytest.mark.asyncio
async def test_reprocessing_error_does_not_block_startup(
self, test_db_engine, test_config
):
"""Test that even if reprocessing fails, app startup continues."""
AsyncSessionLocal = async_sessionmaker(test_db_engine, expire_on_commit=False)
from alpine_bits_python.db import WebhookEndpoint, Hotel
# Create a stuck webhook with invalid data that will cause processing to fail
async with AsyncSessionLocal() as session:
# Create hotel
hotel = Hotel(
hotel_id="HOTEL123",
hotel_name="Test Hotel",
username="testuser",
password_hash="dummy",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
session.add(hotel)
await session.flush()
# Create webhook endpoint
endpoint = WebhookEndpoint(
hotel_id="HOTEL123",
webhook_type="wix_form",
webhook_secret="test-secret-123",
is_enabled=True,
created_at=datetime.now(UTC),
)
session.add(endpoint)
await session.flush()
webhook_request = WebhookRequestData(
hotel_id="HOTEL123",
payload_json={"data": {"invalid": "data"}}, # Missing required fields
status=WebhookStatus.PROCESSING
)
stuck_webhook = WebhookRequest(**webhook_request.model_dump())
session.add(stuck_webhook) ## Cannot add the stuck webhook. Integrity Error payload_hash is missing
await session.commit()
# This should NOT raise an exception - it should log and continue
try:
await reprocess_stuck_webhooks(AsyncSessionLocal, test_config)
except Exception as e:
pytest.fail(
f"reprocess_stuck_webhooks should NEVER raise exceptions, but got: {e}"
)
# Verify the webhook was marked as failed
async with AsyncSessionLocal() as session:
stmt = select(WebhookRequest).where(
WebhookRequest.status == WebhookStatus.FAILED
)
result = await session.execute(stmt)
failed_webhooks = result.scalars().all()
assert len(failed_webhooks) == 1
assert failed_webhooks[0].last_error is not None

327
tests/test_xml_builders.py Normal file
View File

@@ -0,0 +1,327 @@
"""Tests for XML builder helpers."""
import pytest
from xml.etree import ElementTree as ET
from tests.helpers.xml_builders import (
ReservationXMLBuilder,
MultiReservationXMLBuilder,
RoomReservationBuilder,
)
class TestRoomReservationBuilder:
"""Test RoomReservationBuilder functionality."""
def test_basic_room_without_revenue(self):
"""Test creating a basic room reservation without revenue."""
builder = RoomReservationBuilder(
arrival="2025-12-01",
departure="2025-12-03",
room_type="DZV",
room_number="101",
)
elem = builder.build()
assert elem.tag == "roomReservation"
assert elem.get("arrival") == "2025-12-01"
assert elem.get("departure") == "2025-12-03"
assert elem.get("roomType") == "DZV"
assert elem.get("roomNumber") == "101"
# Check daily sales - should have 3 entries (12-01, 12-02, 12-03)
daily_sales = elem.find("dailySales")
assert daily_sales is not None
daily_sale_elements = daily_sales.findall("dailySale")
assert len(daily_sale_elements) == 3
# First two should have no revenue attributes
assert daily_sale_elements[0].get("revenueTotal") is None
assert daily_sale_elements[0].get("revenueLogis") is None
def test_room_with_revenue(self):
"""Test creating a room with revenue per day."""
builder = RoomReservationBuilder(
arrival="2025-12-01",
departure="2025-12-03",
room_type="DZV",
room_number="101",
revenue_logis_per_day=150.0,
)
elem = builder.build()
daily_sales = elem.find("dailySales")
daily_sale_elements = daily_sales.findall("dailySale")
# Should have 3 entries total
assert len(daily_sale_elements) == 3
# First two days should have revenue
assert daily_sale_elements[0].get("revenueTotal") == "150.0"
assert daily_sale_elements[0].get("revenueLogis") == "150.0"
assert daily_sale_elements[1].get("revenueTotal") == "150.0"
assert daily_sale_elements[1].get("revenueLogis") == "150.0"
# Departure day should have no revenue
assert daily_sale_elements[2].get("revenueTotal") is None
assert daily_sale_elements[2].get("revenueLogis") is None
def test_room_with_children_and_infants(self):
"""Test room with children and infants attributes."""
builder = RoomReservationBuilder(
arrival="2025-12-01",
departure="2025-12-02",
adults=2,
children=1,
infants=1,
)
elem = builder.build()
assert elem.get("adults") == "2"
assert elem.get("children") == "1"
assert elem.get("infants") == "1"
class TestReservationXMLBuilder:
"""Test ReservationXMLBuilder functionality."""
def test_basic_reservation(self):
"""Test creating a basic reservation with one room."""
builder = ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12345",
reservation_number="RES-001",
reservation_date="2025-11-14",
)
builder.set_guest(
guest_id="guest_001",
first_name="John",
last_name="Doe",
email="john@example.com",
)
builder.add_room(
arrival="2025-12-01",
departure="2025-12-05",
revenue_logis_per_day=150.0,
)
xml_string = builder.build_xml()
# Parse and verify structure
root = ET.fromstring(xml_string)
assert root.tag == "reservations"
reservation = root.find("reservation")
assert reservation is not None
assert reservation.get("hotelID") == "39054_001"
assert reservation.get("id") == "12345"
assert reservation.get("number") == "RES-001"
guest = reservation.find("guest")
assert guest is not None
assert guest.get("firstName") == "John"
assert guest.get("lastName") == "Doe"
assert guest.get("email") == "john@example.com"
room_reservations = reservation.find("roomReservations")
assert room_reservations is not None
rooms = room_reservations.findall("roomReservation")
assert len(rooms) == 1
def test_reservation_with_multiple_rooms(self):
"""Test reservation with multiple rooms."""
builder = ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12345",
reservation_number="RES-001",
reservation_date="2025-11-14",
)
builder.set_guest(
guest_id="guest_001",
first_name="John",
last_name="Doe",
email="john@example.com",
)
builder.add_room(
arrival="2025-12-01",
departure="2025-12-05",
room_number="101",
revenue_logis_per_day=150.0,
)
builder.add_room(
arrival="2025-12-01",
departure="2025-12-05",
room_number="102",
revenue_logis_per_day=200.0,
)
xml_string = builder.build_xml()
root = ET.fromstring(xml_string)
reservation = root.find("reservation")
room_reservations = reservation.find("roomReservations")
rooms = room_reservations.findall("roomReservation")
assert len(rooms) == 2
assert rooms[0].get("roomNumber") == "101"
assert rooms[1].get("roomNumber") == "102"
def test_reservation_with_advertising_data(self):
"""Test reservation with advertising campaign data."""
builder = ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12345",
reservation_number="RES-001",
reservation_date="2025-11-14",
advertising_medium="99TALES",
advertising_partner="google",
advertising_campagne="EAIaIQobChMI...",
)
builder.set_guest(
guest_id="guest_001",
first_name="John",
last_name="Doe",
email="john@example.com",
)
builder.add_room(
arrival="2025-12-01",
departure="2025-12-05",
)
xml_string = builder.build_xml()
root = ET.fromstring(xml_string)
reservation = root.find("reservation")
assert reservation.get("advertisingMedium") == "99TALES"
assert reservation.get("advertisingPartner") == "google"
assert reservation.get("advertisingCampagne") == "EAIaIQobChMI..."
class TestMultiReservationXMLBuilder:
"""Test MultiReservationXMLBuilder functionality."""
def test_multiple_reservations(self):
"""Test creating XML with multiple reservations."""
multi_builder = MultiReservationXMLBuilder()
# Add first reservation
res1 = ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12345",
reservation_number="RES-001",
reservation_date="2025-11-14",
)
res1.set_guest(
guest_id="guest_001",
first_name="John",
last_name="Doe",
email="john@example.com",
)
res1.add_room(
arrival="2025-12-01",
departure="2025-12-03",
revenue_logis_per_day=150.0,
)
multi_builder.add_reservation(res1)
# Add second reservation
res2 = ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12346",
reservation_number="RES-002",
reservation_date="2025-11-15",
)
res2.set_guest(
guest_id="guest_002",
first_name="Jane",
last_name="Smith",
email="jane@example.com",
)
res2.add_room(
arrival="2025-12-10",
departure="2025-12-12",
revenue_logis_per_day=200.0,
)
multi_builder.add_reservation(res2)
xml_string = multi_builder.build_xml()
root = ET.fromstring(xml_string)
assert root.tag == "reservations"
reservations = root.findall("reservation")
assert len(reservations) == 2
assert reservations[0].get("id") == "12345"
assert reservations[1].get("id") == "12346"
class TestConvenienceFeatures:
"""Test convenience features for common test scenarios."""
def test_simple_one_liner_reservation(self):
"""Test creating a simple reservation in a fluent style."""
xml = (
ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12345",
reservation_number="RES-001",
reservation_date="2025-11-14",
)
.set_guest(
guest_id="guest_001",
first_name="John",
last_name="Doe",
email="john@example.com",
)
.add_room(
arrival="2025-12-01",
departure="2025-12-05",
revenue_logis_per_day=160.0,
)
.build_xml()
)
assert '<?xml version="1.0" ?>' in xml
assert 'hotelID="39054_001"' in xml
assert 'revenueLogis="160.0"' in xml
def test_revenue_calculation_for_multi_day_stay(self):
"""Test that daily sales are correctly generated for multi-day stays."""
builder = ReservationXMLBuilder(
hotel_id="39054_001",
reservation_id="12345",
reservation_number="RES-001",
reservation_date="2025-11-14",
)
builder.set_guest(
guest_id="guest_001",
first_name="John",
last_name="Doe",
email="john@example.com",
)
# 7-day stay (June 25 - July 2, 7 nights)
builder.add_room(
arrival="2026-06-25",
departure="2026-07-02",
revenue_logis_per_day=160.0,
)
elem = builder.build()
room_reservations = elem.find("roomReservations")
room = room_reservations.find("roomReservation")
daily_sales = room.find("dailySales")
daily_sale_elements = daily_sales.findall("dailySale")
# Should have 8 daily sale entries (7 nights + departure day)
assert len(daily_sale_elements) == 8
# First 7 should have revenue
for i in range(7):
assert daily_sale_elements[i].get("revenueLogis") == "160.0"
# Departure day should not have revenue
assert daily_sale_elements[7].get("revenueLogis") is None
if __name__ == "__main__":
pytest.main([__file__, "-v"])