3 Commits

Author SHA1 Message Date
Jonas Linter
3aa2f0b7f4 Offers get extracted from generic webhooks and added to reservations 2025-11-27 19:59:38 +01:00
Jonas Linter
c402b28b72 Duplicate detection improved but refactoring necessary to make the whole thing more managable 2025-11-27 19:35:30 +01:00
Jonas Linter
a07edfe3ec Free rooms first implementation 2025-11-27 18:57:45 +01:00
9 changed files with 1866 additions and 91 deletions

View File

@@ -0,0 +1,108 @@
"""Add hotel inventory and room availability tables
Revision ID: b2cfe2d3aabc
Revises: e7ee03d8f430
Create Date: 2025-11-27 12:00:00.000000
"""
from collections.abc import Sequence
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "b2cfe2d3aabc"
down_revision: str | Sequence[str] | None = "e7ee03d8f430"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Upgrade schema with inventory and availability tables."""
op.create_table(
"hotel_inventory",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("hotel_id", sa.String(length=50), nullable=False),
sa.Column("inv_type_code", sa.String(length=8), nullable=False),
sa.Column("inv_code", sa.String(length=16), nullable=True),
sa.Column("room_name", sa.String(length=200), nullable=True),
sa.Column("max_occupancy", sa.Integer(), nullable=True),
sa.Column("source", sa.String(length=20), nullable=False),
sa.Column("first_seen", sa.DateTime(timezone=True), nullable=False),
sa.Column("last_updated", sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(["hotel_id"], ["hotels.hotel_id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_hotel_inventory_hotel_id"),
"hotel_inventory",
["hotel_id"],
unique=False,
)
op.create_index(
op.f("ix_hotel_inventory_inv_type_code"),
"hotel_inventory",
["inv_type_code"],
unique=False,
)
op.create_index(
op.f("ix_hotel_inventory_inv_code"),
"hotel_inventory",
["inv_code"],
unique=False,
)
op.create_index(
"uq_hotel_inventory_unique_key",
"hotel_inventory",
["hotel_id", "inv_type_code", sa.text("COALESCE(inv_code, '')")],
unique=True,
)
op.create_table(
"room_availability",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("inventory_id", sa.Integer(), nullable=False),
sa.Column("date", sa.Date(), nullable=False),
sa.Column("count_type_2", sa.Integer(), nullable=True),
sa.Column("count_type_6", sa.Integer(), nullable=True),
sa.Column("count_type_9", sa.Integer(), nullable=True),
sa.Column("is_closing_season", sa.Boolean(), nullable=False, server_default=sa.false()),
sa.Column("last_updated", sa.DateTime(timezone=True), nullable=False),
sa.Column("update_type", sa.String(length=20), nullable=False),
sa.ForeignKeyConstraint(["inventory_id"], ["hotel_inventory.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("inventory_id", "date", name="uq_room_availability_unique_key"),
)
op.create_index(
op.f("ix_room_availability_inventory_id"),
"room_availability",
["inventory_id"],
unique=False,
)
op.create_index(
op.f("ix_room_availability_date"),
"room_availability",
["date"],
unique=False,
)
op.create_index(
"idx_room_availability_inventory_date",
"room_availability",
["inventory_id", "date"],
unique=False,
)
def downgrade() -> None:
"""Downgrade schema by removing availability tables."""
op.drop_index("idx_room_availability_inventory_date", table_name="room_availability")
op.drop_index(op.f("ix_room_availability_date"), table_name="room_availability")
op.drop_index(op.f("ix_room_availability_inventory_id"), table_name="room_availability")
op.drop_table("room_availability")
op.drop_index("uq_hotel_inventory_unique_key", table_name="hotel_inventory")
op.drop_index(op.f("ix_hotel_inventory_inv_code"), table_name="hotel_inventory")
op.drop_index(op.f("ix_hotel_inventory_inv_type_code"), table_name="hotel_inventory")
op.drop_index(op.f("ix_hotel_inventory_hotel_id"), table_name="hotel_inventory")
op.drop_table("hotel_inventory")

View File

@@ -86,6 +86,10 @@ class AlpineBitsActionName(Enum):
"action_OTA_HotelRatePlan_BaseRates", "action_OTA_HotelRatePlan_BaseRates",
"OTA_HotelRatePlan:BaseRates", "OTA_HotelRatePlan:BaseRates",
) )
OTA_HOTEL_INV_COUNT_NOTIF_FREE_ROOMS = (
"action_OTA_HotelInvCountNotif",
"OTA_HotelInvCountNotif:FreeRooms",
)
def __init__(self, capability_name: str, request_name: str): def __init__(self, capability_name: str, request_name: str):
self.capability_name = capability_name self.capability_name = capability_name
@@ -819,3 +823,7 @@ class AlpineBitsServer:
return False return False
return True return True
# Ensure FreeRoomsAction is registered with ServerCapabilities discovery
from .free_rooms_action import FreeRoomsAction # noqa: E402,F401

View File

@@ -18,6 +18,8 @@ from sqlalchemy import (
Index, Index,
Integer, Integer,
String, String,
UniqueConstraint,
func,
) )
from sqlalchemy.exc import DBAPIError from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import ( from sqlalchemy.ext.asyncio import (
@@ -679,6 +681,66 @@ class ConversionRoom(Base):
conversion = relationship("Conversion", back_populates="conversion_rooms") conversion = relationship("Conversion", back_populates="conversion_rooms")
class HotelInventory(Base):
"""Room and category definitions synchronized via AlpineBits."""
__tablename__ = "hotel_inventory"
id = Column(Integer, primary_key=True)
hotel_id = Column(
String(50), ForeignKey("hotels.hotel_id", ondelete="CASCADE"), nullable=False, index=True
)
inv_type_code = Column(String(8), nullable=False, index=True)
inv_code = Column(String(16), nullable=True, index=True)
room_name = Column(String(200), nullable=True)
max_occupancy = Column(Integer, nullable=True)
source = Column(String(20), nullable=False)
first_seen = Column(DateTime(timezone=True), nullable=False)
last_updated = Column(DateTime(timezone=True), nullable=False)
hotel = relationship("Hotel", back_populates="inventory_items")
availability = relationship(
"RoomAvailability",
back_populates="inventory_item",
cascade="all, delete-orphan",
passive_deletes=True,
)
__table_args__ = (
Index(
"uq_hotel_inventory_unique_key",
"hotel_id",
"inv_type_code",
func.coalesce(inv_code, ""),
unique=True,
),
)
class RoomAvailability(Base):
"""Daily availability counts for inventory items."""
__tablename__ = "room_availability"
id = Column(Integer, primary_key=True)
inventory_id = Column(
Integer, ForeignKey("hotel_inventory.id", ondelete="CASCADE"), nullable=False, index=True
)
date = Column(Date, nullable=False, index=True)
count_type_2 = Column(Integer, nullable=True)
count_type_6 = Column(Integer, nullable=True)
count_type_9 = Column(Integer, nullable=True)
is_closing_season = Column(Boolean, nullable=False, default=False)
last_updated = Column(DateTime(timezone=True), nullable=False)
update_type = Column(String(20), nullable=False)
inventory_item = relationship("HotelInventory", back_populates="availability")
__table_args__ = (
UniqueConstraint("inventory_id", "date", name="uq_room_availability_unique_key"),
)
class Hotel(Base): class Hotel(Base):
"""Hotel configuration (migrated from alpine_bits_auth in config.yaml).""" """Hotel configuration (migrated from alpine_bits_auth in config.yaml)."""
@@ -710,6 +772,9 @@ class Hotel(Base):
# Relationships # Relationships
webhook_endpoints = relationship("WebhookEndpoint", back_populates="hotel") webhook_endpoints = relationship("WebhookEndpoint", back_populates="hotel")
inventory_items = relationship(
"HotelInventory", back_populates="hotel", cascade="all, delete-orphan"
)
class WebhookEndpoint(Base): class WebhookEndpoint(Base):

View File

@@ -249,10 +249,14 @@ async def reprocess_stuck_webhooks(
These are webhooks that were not fully processed in the previous run, These are webhooks that were not fully processed in the previous run,
likely due to a crash or unexpected shutdown. likely due to a crash or unexpected shutdown.
This function is designed to NEVER block application startup.
All errors are caught and logged, but the app will start regardless.
Args: Args:
sessionmaker: SQLAlchemy async sessionmaker sessionmaker: SQLAlchemy async sessionmaker
config: Application configuration dictionary config: Application configuration dictionary
""" """
try:
_LOGGER.info("Checking for stuck webhooks to reprocess...") _LOGGER.info("Checking for stuck webhooks to reprocess...")
async with sessionmaker() as session: async with sessionmaker() as session:
@@ -324,18 +328,41 @@ async def reprocess_stuck_webhooks(
) )
# Reprocess webhook with simplified interface # Reprocess webhook with simplified interface
await processor.process( result = await processor.process(
webhook_request=webhook_request, webhook_request=webhook_request,
db_session=session, db_session=session,
config=config, config=config,
) )
# Check result status
result_status = result.get("status") if isinstance(result, dict) else "success"
if result_status == "duplicate":
# Duplicate is not an error - mark as completed and continue
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
_LOGGER.info(
"Webhook %d was a duplicate (already processed), marked as completed",
webhook_id
)
elif result_status in ("success", "completed"):
# Update status to completed # Update status to completed
webhook_request.status = WebhookStatus.COMPLETED webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC) webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1 reprocessed_count += 1
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id) _LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
else:
# Unexpected status - treat as failure
_LOGGER.warning(
"Webhook %d returned unexpected status: %s",
webhook_id,
result_status
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = f"Unexpected status: {result_status}"
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
except Exception as e: except Exception as e:
_LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e) _LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
@@ -354,6 +381,12 @@ async def reprocess_stuck_webhooks(
reprocessed_count, reprocessed_count,
failed_count, failed_count,
) )
except Exception as e:
# CRITICAL: Never let reprocessing block application startup
_LOGGER.exception(
"CRITICAL ERROR during webhook reprocessing, but allowing app to start: %s",
e
)
async def run_startup_tasks( async def run_startup_tasks(

View File

@@ -0,0 +1,600 @@
"""Action handler for OTA_HotelInvCountNotif:FreeRooms."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, date, datetime, timedelta
from typing import Any
from sqlalchemy import delete, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import AsyncSession
from xsdata.formats.dataclass.serializers.config import SerializerConfig
from xsdata_pydantic.bindings import XmlParser, XmlSerializer
from .alpinebits_server import (
AlpineBitsAction,
AlpineBitsActionName,
AlpineBitsClientInfo,
AlpineBitsResponse,
Version,
validate_hotel_authentication,
)
from .const import HttpStatusCode
from .db import Hotel, HotelInventory, RoomAvailability
from .generated import (
ErrorType,
InvCountCountType,
OtaHotelInvCountNotifRq,
OtaHotelInvCountNotifRs,
UniqueIdInstance,
)
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
SUPPORTED_CAPABILITIES = [
"OTA_HotelInvCountNotif_accept_rooms",
"OTA_HotelInvCountNotif_accept_categories",
"OTA_HotelInvCountNotif_accept_deltas",
"OTA_HotelInvCountNotif_accept_complete_set",
"OTA_HotelInvCountNotif_accept_out_of_order",
"OTA_HotelInvCountNotif_accept_out_of_market",
"OTA_HotelInvCountNotif_accept_closing_seasons",
]
CLOSING_SEASON_TYPE = "__CLOSE" # <= 8 chars per spec
SOURCE_FREEROOMS = "FreeRooms"
COUNT_TYPE_MAP = {
InvCountCountType.VALUE_2: "count_type_2",
InvCountCountType.VALUE_6: "count_type_6",
InvCountCountType.VALUE_9: "count_type_9",
}
@dataclass
class FreeRoomsProcessingError(Exception):
"""Custom exception that carries HTTP and OTA error metadata."""
message: str
status_code: HttpStatusCode = HttpStatusCode.BAD_REQUEST
error_type: ErrorType = ErrorType.VALUE_13
code: str = "450"
def __str__(self) -> str:
return self.message
class FreeRoomsAction(AlpineBitsAction):
"""Handler for OTA_HotelInvCountNotif:FreeRooms requests."""
def __init__(self, config: dict | None = None):
self.name = AlpineBitsActionName.OTA_HOTEL_INV_COUNT_NOTIF_FREE_ROOMS
self.version = [Version.V2024_10, Version.V2022_10]
self.config = config or {}
self.supports = SUPPORTED_CAPABILITIES
self._parser = XmlParser()
self._serializer = XmlSerializer(
config=SerializerConfig(
pretty_print=True,
xml_declaration=True,
encoding="UTF-8",
)
)
async def handle(
self,
action: str,
request_xml: str,
version: Version,
client_info: AlpineBitsClientInfo,
dbsession: AsyncSession | None = None,
server_capabilities=None,
) -> AlpineBitsResponse:
"""Process FreeRooms inventory updates."""
try:
self._validate_action_name(action)
if request_xml is None:
raise FreeRoomsProcessingError("Missing request payload")
if dbsession is None:
raise FreeRoomsProcessingError(
"Database session unavailable",
HttpStatusCode.INTERNAL_SERVER_ERROR,
)
try:
request = self._parser.from_string(request_xml, OtaHotelInvCountNotifRq)
except Exception as exc: # pragma: no cover - serialization already tested upstream
_LOGGER.exception("Failed to parse FreeRooms request: %s", exc)
raise FreeRoomsProcessingError("Invalid XML payload") from exc
hotel_code = request.inventories.hotel_code if request.inventories else None
if not hotel_code:
raise FreeRoomsProcessingError("HotelCode attribute is required")
if not client_info or not client_info.username or not client_info.password:
raise FreeRoomsProcessingError(
"Missing authentication context",
HttpStatusCode.UNAUTHORIZED,
error_type=ErrorType.VALUE_11,
code="401",
)
if not validate_hotel_authentication(
client_info.username,
client_info.password,
hotel_code,
self.config,
):
raise FreeRoomsProcessingError(
f"Unauthorized FreeRooms notification for hotel {hotel_code}",
HttpStatusCode.UNAUTHORIZED,
error_type=ErrorType.VALUE_11,
code="401",
)
hotel = await self._fetch_hotel(dbsession, hotel_code)
if hotel is None:
raise FreeRoomsProcessingError(
f"Hotel {hotel_code} is not provisioned on this server"
)
is_complete_set = (
request.unique_id is not None
and request.unique_id.instance == UniqueIdInstance.COMPLETE_SET
)
update_type = "CompleteSet" if is_complete_set else "Delta"
inventory_cache: dict[tuple[str, str | None], HotelInventory] = {}
try:
if is_complete_set:
await self._process_complete_set(
dbsession, hotel, request, update_type, inventory_cache
)
else:
await self._process_delta(
dbsession, hotel, request, update_type, inventory_cache
)
await dbsession.commit()
except FreeRoomsProcessingError:
await dbsession.rollback()
raise
except Exception as exc: # pragma: no cover - defensive
await dbsession.rollback()
_LOGGER.exception("Unexpected FreeRooms failure: %s", exc)
return self._error_response(
"Internal server error while processing FreeRooms notification",
HttpStatusCode.INTERNAL_SERVER_ERROR,
)
_LOGGER.info(
"Processed FreeRooms %s update for hotel %s (%d inventory items)",
update_type,
hotel_code,
len(request.inventories.inventory),
)
return self._success_response()
except FreeRoomsProcessingError as exc:
return self._error_response(
exc.message,
exc.status_code,
error_type=exc.error_type,
code=exc.code,
)
def _validate_action_name(self, action: str) -> None:
expected = self.name.value[1]
if (action or "").strip() != expected:
raise FreeRoomsProcessingError(
f"Invalid action {action}, expected {expected}",
HttpStatusCode.BAD_REQUEST,
)
async def _fetch_hotel(self, session: AsyncSession, hotel_code: str) -> Hotel | None:
stmt = select(Hotel).where(Hotel.hotel_id == hotel_code, Hotel.is_active.is_(True))
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def _process_complete_set(
self,
session: AsyncSession,
hotel: Hotel,
request: OtaHotelInvCountNotifRq,
update_type: str,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
) -> None:
await self._delete_existing_availability(session, hotel.hotel_id)
await self._process_inventories(
session, hotel, request, update_type, inventory_cache, enforce_closing_order=True
)
async def _process_delta(
self,
session: AsyncSession,
hotel: Hotel,
request: OtaHotelInvCountNotifRq,
update_type: str,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
) -> None:
await self._process_inventories(
session, hotel, request, update_type, inventory_cache, enforce_closing_order=False
)
async def _delete_existing_availability(
self,
session: AsyncSession,
hotel_id: str,
) -> None:
subquery = select(HotelInventory.id).where(HotelInventory.hotel_id == hotel_id)
await session.execute(
delete(RoomAvailability).where(RoomAvailability.inventory_id.in_(subquery))
)
async def _process_inventories(
self,
session: AsyncSession,
hotel: Hotel,
request: OtaHotelInvCountNotifRq,
update_type: str,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
enforce_closing_order: bool,
) -> None:
inventories = request.inventories.inventory if request.inventories else []
if not inventories:
raise FreeRoomsProcessingError(
"Request must include at least one Inventory block",
HttpStatusCode.BAD_REQUEST,
)
rows_to_upsert: list[dict[str, Any]] = []
now = datetime.now(UTC)
encountered_standard = False
for inventory in inventories:
sac = inventory.status_application_control
if sac is None:
raise FreeRoomsProcessingError(
"StatusApplicationControl element is required for each Inventory",
HttpStatusCode.BAD_REQUEST,
)
is_closing = self._is_closing_season(sac)
if is_closing:
if inventory.inv_counts is not None:
raise FreeRoomsProcessingError(
"Closing seasons cannot contain InvCounts data",
HttpStatusCode.BAD_REQUEST,
)
if update_type != "CompleteSet":
raise FreeRoomsProcessingError(
"Closing seasons are only allowed on CompleteSet updates",
HttpStatusCode.BAD_REQUEST,
)
if enforce_closing_order and encountered_standard:
raise FreeRoomsProcessingError(
"Closing seasons must appear before other inventory entries",
HttpStatusCode.BAD_REQUEST,
)
rows_to_upsert.extend(
await self._process_closing_season(
session, hotel, sac, update_type, now, inventory_cache
)
)
continue
encountered_standard = True
rows_to_upsert.extend(
await self._process_inventory_item(
session,
hotel,
sac,
inventory.inv_counts,
update_type,
now,
inventory_cache,
)
)
await self._upsert_availability_rows(session, rows_to_upsert)
async def _process_closing_season(
self,
session: AsyncSession,
hotel: Hotel,
sac: OtaHotelInvCountNotifRq.Inventories.Inventory.StatusApplicationControl,
update_type: str,
timestamp: datetime,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
) -> list[dict[str, Any]]:
if sac.inv_type_code or sac.inv_code:
raise FreeRoomsProcessingError(
"Closing season entries cannot specify InvTypeCode or InvCode",
HttpStatusCode.BAD_REQUEST,
)
start_date, end_date = self._parse_date_range(sac.start, sac.end)
inventory_item = await self._ensure_inventory_item(
session,
hotel.hotel_id,
CLOSING_SEASON_TYPE,
None,
timestamp,
inventory_cache,
)
base_payload = {
"inventory_id": inventory_item.id,
"count_type_2": None,
"count_type_6": None,
"count_type_9": None,
"is_closing_season": True,
"last_updated": timestamp,
"update_type": update_type,
}
rows = []
for day in self._iter_days(start_date, end_date):
payload = dict(base_payload)
payload["date"] = day
rows.append(payload)
return rows
async def _process_inventory_item(
self,
session: AsyncSession,
hotel: Hotel,
sac: OtaHotelInvCountNotifRq.Inventories.Inventory.StatusApplicationControl,
inv_counts: (
OtaHotelInvCountNotifRq.Inventories.Inventory.InvCounts | None
),
update_type: str,
timestamp: datetime,
inventory_cache: dict[tuple[str, str | None], HotelInventory],
) -> list[dict[str, Any]]:
inv_type_code = (sac.inv_type_code or "").strip()
if not inv_type_code:
raise FreeRoomsProcessingError(
"InvTypeCode is required unless AllInvCode=\"true\"",
HttpStatusCode.BAD_REQUEST,
)
inv_code = sac.inv_code.strip() if sac.inv_code else None
start_date, end_date = self._parse_date_range(sac.start, sac.end)
counts = self._extract_counts(inv_counts)
base_counts = {
"count_type_2": counts.get("count_type_2"),
"count_type_6": counts.get("count_type_6"),
"count_type_9": counts.get("count_type_9"),
}
inventory_item = await self._ensure_inventory_item(
session,
hotel.hotel_id,
inv_type_code,
inv_code,
timestamp,
inventory_cache,
)
base_payload = {
"inventory_id": inventory_item.id,
"is_closing_season": False,
"last_updated": timestamp,
"update_type": update_type,
**base_counts,
}
rows = []
for day in self._iter_days(start_date, end_date):
payload = dict(base_payload)
payload["date"] = day
rows.append(payload)
return rows
def _parse_date_range(self, start_str: str, end_str: str) -> tuple[date, date]:
try:
start_date = date.fromisoformat(start_str)
end_date = date.fromisoformat(end_str)
except ValueError as exc:
raise FreeRoomsProcessingError(
f"Invalid date format: {exc!s}",
HttpStatusCode.BAD_REQUEST,
) from exc
if end_date < start_date:
raise FreeRoomsProcessingError(
"StatusApplicationControl End date cannot be before Start date",
HttpStatusCode.BAD_REQUEST,
)
return start_date, end_date
def _iter_days(self, start_date: date, end_date: date):
current = start_date
while current <= end_date:
yield current
current += timedelta(days=1)
def _is_closing_season(
self,
sac: OtaHotelInvCountNotifRq.Inventories.Inventory.StatusApplicationControl,
) -> bool:
return (sac.all_inv_code or "").strip().lower() == "true"
def _extract_counts(
self,
inv_counts: OtaHotelInvCountNotifRq.Inventories.Inventory.InvCounts | None,
) -> dict[str, int | None]:
if inv_counts is None or not inv_counts.inv_count:
return {}
parsed: dict[str, int] = {}
for count in inv_counts.inv_count:
column_name = COUNT_TYPE_MAP.get(count.count_type)
if column_name is None:
raise FreeRoomsProcessingError(
f"Unsupported CountType {count.count_type}",
HttpStatusCode.BAD_REQUEST,
)
if column_name in parsed:
raise FreeRoomsProcessingError(
f"Duplicate CountType {count.count_type.value} detected",
HttpStatusCode.BAD_REQUEST,
)
try:
value = int(count.count)
except ValueError as exc:
raise FreeRoomsProcessingError(
f"Invalid Count value '{count.count}'",
HttpStatusCode.BAD_REQUEST,
) from exc
if value < 0:
raise FreeRoomsProcessingError(
"Count values must be non-negative",
HttpStatusCode.BAD_REQUEST,
)
parsed[column_name] = value
return parsed
async def _ensure_inventory_item(
self,
session: AsyncSession,
hotel_id: str,
inv_type_code: str,
inv_code: str | None,
timestamp: datetime,
cache: dict[tuple[str, str | None], HotelInventory],
) -> HotelInventory:
cache_key = (inv_type_code, inv_code)
if cache_key in cache:
return cache[cache_key]
filters = [
HotelInventory.hotel_id == hotel_id,
HotelInventory.inv_type_code == inv_type_code,
]
if inv_code is None:
filters.append(HotelInventory.inv_code.is_(None))
else:
filters.append(HotelInventory.inv_code == inv_code)
stmt = select(HotelInventory).where(*filters)
result = await session.execute(stmt)
inventory_item = result.scalar_one_or_none()
if inventory_item:
inventory_item.last_updated = timestamp
else:
inventory_item = HotelInventory(
hotel_id=hotel_id,
inv_type_code=inv_type_code,
inv_code=inv_code,
source=SOURCE_FREEROOMS,
first_seen=timestamp,
last_updated=timestamp,
)
session.add(inventory_item)
await session.flush()
cache[cache_key] = inventory_item
return inventory_item
async def _upsert_availability_rows(
self,
session: AsyncSession,
rows: list[dict[str, Any]],
) -> None:
if not rows:
return
bind = session.get_bind()
dialect_name = bind.dialect.name if bind else ""
table = RoomAvailability.__table__
if dialect_name == "postgresql":
stmt = pg_insert(table).values(rows)
stmt = stmt.on_conflict_do_update(
index_elements=["inventory_id", "date"],
set_=self._build_upsert_set(stmt),
)
await session.execute(stmt)
return
if dialect_name == "sqlite":
stmt = sqlite_insert(table).values(rows)
stmt = stmt.on_conflict_do_update(
index_elements=["inventory_id", "date"],
set_=self._build_upsert_set(stmt),
)
await session.execute(stmt)
return
await self._upsert_with_fallback(session, rows)
def _build_upsert_set(self, stmt):
return {
"count_type_2": stmt.excluded.count_type_2,
"count_type_6": stmt.excluded.count_type_6,
"count_type_9": stmt.excluded.count_type_9,
"is_closing_season": stmt.excluded.is_closing_season,
"last_updated": stmt.excluded.last_updated,
"update_type": stmt.excluded.update_type,
}
async def _upsert_with_fallback(
self, session: AsyncSession, rows: list[dict[str, Any]]
) -> None:
for row in rows:
stmt = select(RoomAvailability).where(
RoomAvailability.inventory_id == row["inventory_id"],
RoomAvailability.date == row["date"],
)
result = await session.execute(stmt)
existing = result.scalar_one_or_none()
if existing:
existing.count_type_2 = row["count_type_2"]
existing.count_type_6 = row["count_type_6"]
existing.count_type_9 = row["count_type_9"]
existing.is_closing_season = row["is_closing_season"]
existing.last_updated = row["last_updated"]
existing.update_type = row["update_type"]
else:
session.add(RoomAvailability(**row))
def _success_response(self) -> AlpineBitsResponse:
response = OtaHotelInvCountNotifRs(version="7.000", success="")
xml = self._serializer.render(
response, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
)
return AlpineBitsResponse(xml, HttpStatusCode.OK)
def _error_response(
self,
message: str,
status_code: HttpStatusCode,
error_type: ErrorType = ErrorType.VALUE_13,
code: str = "450",
) -> AlpineBitsResponse:
error = OtaHotelInvCountNotifRs.Errors.Error(
type_value=error_type,
code=code,
content=[message],
)
errors = OtaHotelInvCountNotifRs.Errors(error=[error])
response = OtaHotelInvCountNotifRs(version="7.000", errors=errors)
xml = self._serializer.render(
response, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
)
return AlpineBitsResponse(xml, status_code)

View File

@@ -273,6 +273,24 @@ async def process_wix_form_submission(
reservation, db_customer.id reservation, db_customer.id
) )
except IntegrityError as e: except IntegrityError as e:
await db_session.rollback()
# Check if this is a duplicate (unique constraint violation)
error_msg = str(e.orig) if hasattr(e, 'orig') else str(e)
is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists'])
if is_duplicate:
_LOGGER.info(
"Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)",
unique_id
)
return {
"status": "duplicate",
"message": "Reservation already exists (duplicate submission)",
"unique_id": unique_id,
"timestamp": timestamp,
}
else:
# Real integrity error (not a duplicate)
_LOGGER.exception("Database integrity error creating reservation: %s", e) _LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException( raise HTTPException(
status_code=500, detail="Database error creating reservation" status_code=500, detail="Database error creating reservation"
@@ -403,7 +421,7 @@ async def process_generic_webhook_submission(
hotel_data = data.get("hotel_data", {}) hotel_data = data.get("hotel_data", {})
form_data = data.get("form_data", {}) form_data = data.get("form_data", {})
tracking_data = data.get("tracking_data", {}) tracking_data = data.get("tracking_data", {})
offer_data = data.get("unterkunftTyp", {}) offer_data = form_data.get("unterkunftTyp", {})
selected_offers = [] selected_offers = []
@@ -581,9 +599,33 @@ async def process_generic_webhook_submission(
# Use ReservationService to create reservation # Use ReservationService to create reservation
reservation_service = ReservationService(db_session) reservation_service = ReservationService(db_session)
try:
db_reservation = await reservation_service.create_reservation( db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id reservation, db_customer.id
) )
except IntegrityError as e:
await db_session.rollback()
# Check if this is a duplicate (unique constraint violation)
error_msg = str(e.orig) if hasattr(e, 'orig') else str(e)
is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists'])
if is_duplicate:
_LOGGER.info(
"Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)",
unique_id
)
return {
"status": "duplicate",
"message": "Reservation already exists (duplicate submission)",
"unique_id": unique_id,
"timestamp": timestamp,
}
else:
# Real integrity error (not a duplicate)
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
async def push_event(): async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch # Fire event for listeners (push, etc.) - hotel-specific dispatch

215
tests/test_api_freerooms.py Normal file
View File

@@ -0,0 +1,215 @@
"""Integration tests for the FreeRooms endpoint."""
from __future__ import annotations
import asyncio
import gzip
import urllib.parse
from datetime import UTC, datetime
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from alpine_bits_python.alpinebits_server import AlpineBitsServer
from alpine_bits_python.api import app
from alpine_bits_python.const import HttpStatusCode
from alpine_bits_python.db import Base, Hotel, RoomAvailability
def build_request_xml(body: str, include_unique_id: bool = True) -> str:
unique = (
'<UniqueID Type="16" ID="1" Instance="CompleteSet"/>'
if include_unique_id
else ""
)
return f"""<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelInvCountNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05" Version="7.000">
{unique}
<Inventories HotelCode="HOTEL123" HotelName="Integration Hotel">
{body}
</Inventories>
</OTA_HotelInvCountNotifRQ>"""
INVENTORY_A = """
<Inventory>
<StatusApplicationControl Start="2025-10-01" End="2025-10-03" InvTypeCode="DBL"/>
<InvCounts>
<InvCount CountType="2" Count="3"/>
</InvCounts>
</Inventory>
"""
INVENTORY_B = """
<Inventory>
<StatusApplicationControl Start="2025-10-02" End="2025-10-02" InvTypeCode="DBL"/>
<InvCounts>
<InvCount CountType="2" Count="1"/>
</InvCounts>
</Inventory>
"""
@pytest.fixture
def freerooms_test_config():
return {
"server": {
"codecontext": "ADVERTISING",
"code": "70597314",
"companyname": "99tales Gmbh",
"res_id_source_context": "99tales",
},
"alpine_bits_auth": [
{
"hotel_id": "HOTEL123",
"hotel_name": "Integration Hotel",
"username": "testuser",
"password": "testpass",
}
],
"database": {"url": "sqlite+aiosqlite:///:memory:"},
}
@pytest.fixture
def freerooms_client(freerooms_test_config):
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
asyncio.run(create_tables())
with patch("alpine_bits_python.api.load_config", return_value=freerooms_test_config), patch(
"alpine_bits_python.api.create_database_engine", return_value=engine
):
app.state.engine = engine
app.state.async_sessionmaker = async_sessionmaker(engine, expire_on_commit=False)
app.state.config = freerooms_test_config
app.state.alpine_bits_server = AlpineBitsServer(freerooms_test_config)
with TestClient(app) as test_client:
yield test_client
@pytest.fixture
def freerooms_headers():
return {
"Authorization": "Basic dGVzdHVzZXI6dGVzdHBhc3M=",
"X-AlpineBits-ClientProtocolVersion": "2024-10",
}
def seed_hotel_if_missing(client: TestClient):
async def _seed():
async_sessionmaker = client.app.state.async_sessionmaker
async with async_sessionmaker() as session:
result = await session.execute(
select(Hotel).where(Hotel.hotel_id == "HOTEL123")
)
if result.scalar_one_or_none():
return
session.add(
Hotel(
hotel_id="HOTEL123",
hotel_name="Integration Hotel",
username="testuser",
password_hash="integration-hash",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
)
await session.commit()
asyncio.run(_seed())
def fetch_availability(client: TestClient):
async def _fetch():
async_sessionmaker = client.app.state.async_sessionmaker
async with async_sessionmaker() as session:
result = await session.execute(
select(RoomAvailability).order_by(RoomAvailability.date)
)
return result.scalars().all()
return asyncio.run(_fetch())
def test_freerooms_endpoint_complete_set(freerooms_client: TestClient, freerooms_headers):
seed_hotel_if_missing(freerooms_client)
xml = build_request_xml(INVENTORY_A, include_unique_id=True)
response = freerooms_client.post(
"/api/alpinebits/server-2024-10",
data={"action": "OTA_HotelInvCountNotif:FreeRooms", "request": xml},
headers=freerooms_headers,
)
assert response.status_code == HttpStatusCode.OK
assert "<Success" in response.text
rows = fetch_availability(freerooms_client)
assert len(rows) == 3
assert rows[0].count_type_2 == 3
def test_freerooms_endpoint_delta_updates_existing_rows(
freerooms_client: TestClient, freerooms_headers
):
seed_hotel_if_missing(freerooms_client)
complete_xml = build_request_xml(INVENTORY_A, include_unique_id=True)
delta_xml = build_request_xml(INVENTORY_B, include_unique_id=False)
response = freerooms_client.post(
"/api/alpinebits/server-2024-10",
data={"action": "OTA_HotelInvCountNotif:FreeRooms", "request": complete_xml},
headers=freerooms_headers,
)
assert response.status_code == HttpStatusCode.OK
response = freerooms_client.post(
"/api/alpinebits/server-2024-10",
data={"action": "OTA_HotelInvCountNotif:FreeRooms", "request": delta_xml},
headers=freerooms_headers,
)
assert response.status_code == HttpStatusCode.OK
rows = fetch_availability(freerooms_client)
counts = {row.date.isoformat(): row.count_type_2 for row in rows}
assert counts["2025-10-02"] == 1
assert counts["2025-10-01"] == 3
def test_freerooms_endpoint_accepts_gzip_payload(
freerooms_client: TestClient, freerooms_headers
):
seed_hotel_if_missing(freerooms_client)
xml = build_request_xml(INVENTORY_A, include_unique_id=True)
encoded = urllib.parse.urlencode(
{"action": "OTA_HotelInvCountNotif:FreeRooms", "request": xml}
).encode("utf-8")
compressed = gzip.compress(encoded)
headers = {
**freerooms_headers,
"Content-Encoding": "gzip",
"Content-Type": "application/x-www-form-urlencoded",
}
response = freerooms_client.post(
"/api/alpinebits/server-2024-10",
data=compressed,
headers=headers,
)
assert response.status_code == HttpStatusCode.OK
assert "<Success" in response.text
rows = fetch_availability(freerooms_client)
assert len(rows) == 3

View File

@@ -0,0 +1,367 @@
"""Unit tests for FreeRoomsAction."""
from __future__ import annotations
from datetime import UTC, datetime
import pytest
import pytest_asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from alpine_bits_python.alpinebits_server import AlpineBitsClientInfo, Version
from alpine_bits_python.const import HttpStatusCode
from alpine_bits_python.db import Base, Hotel, HotelInventory, RoomAvailability
from alpine_bits_python.free_rooms_action import FreeRoomsAction
TEST_CONFIG = {
"alpine_bits_auth": [
{
"hotel_id": "TESTHOTEL",
"hotel_name": "Unit Test Hotel",
"username": "testuser",
"password": "testpass",
}
]
}
def build_complete_set_xml(body: str, hotel_code: str = "TESTHOTEL") -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelInvCountNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05" Version="7.000">
<UniqueID Type="16" ID="1" Instance="CompleteSet"/>
<Inventories HotelCode="{hotel_code}" HotelName="Unit Hotel">
{body}
</Inventories>
</OTA_HotelInvCountNotifRQ>"""
def build_delta_xml(body: str, hotel_code: str = "TESTHOTEL") -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelInvCountNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05" Version="7.000">
<Inventories HotelCode="{hotel_code}" HotelName="Unit Hotel">
{body}
</Inventories>
</OTA_HotelInvCountNotifRQ>"""
def daily_inventory(start: str, end: str, inv_type: str = "DBL", count: int = 3) -> str:
return f"""
<Inventory>
<StatusApplicationControl Start="{start}" End="{end}" InvTypeCode="{inv_type}"/>
<InvCounts>
<InvCount CountType="2" Count="{count}"/>
</InvCounts>
</Inventory>
"""
@pytest_asyncio.fixture
async def db_engine():
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest_asyncio.fixture
async def db_session(db_engine):
session_factory = async_sessionmaker(db_engine, expire_on_commit=False, class_=AsyncSession)
async with session_factory() as session:
yield session
async def insert_test_hotel(session: AsyncSession, hotel_id: str = "TESTHOTEL"):
hotel = Hotel(
hotel_id=hotel_id,
hotel_name="Unit Test Hotel",
username="testuser",
password_hash="bcrypt-hash",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
session.add(hotel)
await session.commit()
return hotel
def make_action() -> FreeRoomsAction:
return FreeRoomsAction(config=TEST_CONFIG)
def make_client_info() -> AlpineBitsClientInfo:
return AlpineBitsClientInfo(username="testuser", password="testpass")
@pytest.mark.asyncio
async def test_complete_set_creates_inventory_and_availability(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
daily_inventory("2025-01-01", "2025-01-03", inv_type="DBL", count=4)
)
response = await action.handle(
action="OTA_HotelInvCountNotif:FreeRooms",
request_xml=xml,
version=Version.V2024_10,
client_info=make_client_info(),
dbsession=db_session,
)
assert response.status_code == HttpStatusCode.OK
inventories = (await db_session.execute(select(HotelInventory))).scalars().all()
assert len(inventories) == 1
assert inventories[0].inv_type_code == "DBL"
rows = (
await db_session.execute(
select(RoomAvailability).order_by(RoomAvailability.date)
)
).scalars().all()
assert len(rows) == 3
assert rows[0].count_type_2 == 4
assert rows[0].update_type == "CompleteSet"
@pytest.mark.asyncio
async def test_complete_set_replaces_previous_availability(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml_initial = build_complete_set_xml(daily_inventory("2025-02-01", "2025-02-02", count=5))
xml_updated = build_complete_set_xml(daily_inventory("2025-02-01", "2025-02-01", count=1))
await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml_initial,
Version.V2024_10,
make_client_info(),
db_session,
)
await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml_updated,
Version.V2024_10,
make_client_info(),
db_session,
)
rows = (
await db_session.execute(select(RoomAvailability).order_by(RoomAvailability.date))
).scalars().all()
assert len(rows) == 1
assert rows[0].date.isoformat() == "2025-02-01"
assert rows[0].count_type_2 == 1
@pytest.mark.asyncio
async def test_delta_updates_only_specified_dates(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
complete_xml = build_complete_set_xml(daily_inventory("2025-03-01", "2025-03-03", count=2))
delta_xml = build_delta_xml(daily_inventory("2025-03-02", "2025-03-02", count=7))
await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
complete_xml,
Version.V2024_10,
make_client_info(),
db_session,
)
await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
delta_xml,
Version.V2024_10,
make_client_info(),
db_session,
)
rows = (
await db_session.execute(select(RoomAvailability).order_by(RoomAvailability.date))
).scalars().all()
counts = {row.date.isoformat(): row.count_type_2 for row in rows}
assert counts == {
"2025-03-01": 2,
"2025-03-02": 7,
"2025-03-03": 2,
}
assert all(row.update_type in {"CompleteSet", "Delta"} for row in rows)
@pytest.mark.asyncio
async def test_closing_season_entries_marked_correctly(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-04-01" End="2025-04-02" AllInvCode="true"/>
</Inventory>
<Inventory>
<StatusApplicationControl Start="2025-04-03" End="2025-04-03" InvTypeCode="SGL"/>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.OK
inventories = (await db_session.execute(select(HotelInventory))).scalars().all()
closing_inventory = next(inv for inv in inventories if inv.inv_type_code == "__CLOSE")
assert closing_inventory.inv_code is None
rows = (
await db_session.execute(select(RoomAvailability).order_by(RoomAvailability.date))
).scalars().all()
closing_rows = [row for row in rows if row.is_closing_season]
assert len(closing_rows) == 2
assert all(row.count_type_2 is None for row in closing_rows)
@pytest.mark.asyncio
async def test_closing_season_not_allowed_in_delta(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_delta_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-05-01" End="2025-05-02" AllInvCode="true"/>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "Closing seasons" in response.xml_content
@pytest.mark.asyncio
async def test_missing_invtypecode_returns_error(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-06-01" End="2025-06-02"/>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "InvTypeCode is required" in response.xml_content
@pytest.mark.asyncio
async def test_duplicate_count_type_rejected(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-07-01" End="2025-07-01" InvTypeCode="SGL"/>
<InvCounts>
<InvCount CountType="2" Count="3"/>
<InvCount CountType="2" Count="4"/>
</InvCounts>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "Duplicate CountType" in response.xml_content
@pytest.mark.asyncio
async def test_invalid_date_range_returns_error(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
xml = build_complete_set_xml(
"""
<Inventory>
<StatusApplicationControl Start="2025-08-10" End="2025-08-01" InvTypeCode="DBL"/>
</Inventory>
"""
)
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
make_client_info(),
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "End date cannot be before Start date" in response.xml_content
@pytest.mark.asyncio
async def test_invalid_credentials_return_unauthorized(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
bad_client = AlpineBitsClientInfo(username="testuser", password="wrongpass")
xml = build_complete_set_xml(daily_inventory("2025-09-01", "2025-09-01"))
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
xml,
Version.V2024_10,
bad_client,
db_session,
)
assert response.status_code == HttpStatusCode.UNAUTHORIZED
assert "Unauthorized" in response.xml_content
@pytest.mark.asyncio
async def test_invalid_xml_returns_error(db_session: AsyncSession):
await insert_test_hotel(db_session)
action = make_action()
client_info = make_client_info()
response = await action.handle(
"OTA_HotelInvCountNotif:FreeRooms",
"<invalid",
Version.V2024_10,
client_info,
db_session,
)
assert response.status_code == HttpStatusCode.BAD_REQUEST
assert "Invalid XML payload" in response.xml_content

View File

@@ -0,0 +1,337 @@
"""Tests for webhook duplicate handling and reprocessing.
This module tests:
- Duplicate detection during normal operation
- Duplicate handling during app startup reprocessing
- Stuck webhooks that are duplicates
"""
import asyncio
import uuid
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import patch
import pytest
import pytest_asyncio
from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from alpine_bits_python.api import app
from alpine_bits_python.const import WebhookStatus
from alpine_bits_python.db import Base, Reservation, WebhookRequest
from alpine_bits_python.db_setup import reprocess_stuck_webhooks
@pytest_asyncio.fixture
async def test_db_engine():
"""Create an in-memory SQLite database for testing."""
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
# Cleanup
await engine.dispose()
@pytest.fixture
def test_config():
"""Test configuration."""
return {
"server": {
"codecontext": "ADVERTISING",
"code": "70597314",
"companyname": "99tales Gmbh",
"res_id_source_context": "99tales",
},
"alpine_bits_auth": [
{
"hotel_id": "HOTEL123",
"hotel_name": "Test Hotel",
"username": "testuser",
"password": "testpass",
}
],
"default_hotel_code": "HOTEL123",
"default_hotel_name": "Test Hotel",
"database": {"url": "sqlite+aiosqlite:///:memory:"},
}
@pytest.fixture
def sample_wix_form_data():
"""Sample Wix form submission data with FIXED submissionId for duplicate testing."""
return {
"data": {
"submissionId": "FIXED-DUPLICATE-TEST-ID", # Fixed ID to trigger duplicates
"submissionTime": "2025-10-07T05:48:41.855Z",
"contact": {
"name": {"first": "John", "last": "Doe"},
"email": "john.doe.duplicate.test@example.com",
"phones": [{"e164Phone": "+1234567890"}],
"locale": "en-US",
"contactId": "contact-duplicate-test",
},
"field:anrede": "Mr.",
"field:form_field_5a7b": True,
"field:date_picker_a7c8": "2024-12-25",
"field:date_picker_7e65": "2024-12-31",
"field:number_7cf5": "2",
"field:anzahl_kinder": "1",
"field:alter_kind_1": "8",
"field:angebot_auswaehlen": "Christmas Special",
"field:utm_source": "google",
"field:utm_medium": "cpc",
"field:utm_campaign": "winter2024",
"field:fbclid": "test_fbclid_123",
"field:long_answer_3524": "Late check-in please",
}
}
class TestWebhookDuplicateHandling:
"""Test duplicate webhook handling during normal operation."""
def test_duplicate_webhook_during_operation(self, test_config, sample_wix_form_data):
"""Test that sending the same webhook twice handles duplicates gracefully."""
# Create engine and tables
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
asyncio.run(create_tables())
# Mock config and database to use our test database
with patch("alpine_bits_python.api.load_config", return_value=test_config), \
patch("alpine_bits_python.api.create_database_engine", return_value=engine):
from alpine_bits_python.alpinebits_server import AlpineBitsServer
# Setup app state
app.state.engine = engine
app.state.async_sessionmaker = async_sessionmaker(
engine, expire_on_commit=False
)
app.state.config = test_config
app.state.alpine_bits_server = AlpineBitsServer(test_config)
with TestClient(app) as client:
# First submission - should succeed
response1 = client.post(
"/api/webhook/wix-form",
json=sample_wix_form_data
)
assert response1.status_code == 200
data1 = response1.json()
assert data1["status"] == "success"
# Second submission with same data - should detect duplicate at API level
response2 = client.post(
"/api/webhook/wix-form",
json=sample_wix_form_data
)
assert response2.status_code == 200
data2 = response2.json()
# API returns success for already-processed webhooks, but sets duplicate flag
assert data2["status"] == "success"
assert data2.get("duplicate") is True
assert "already processed" in data2["message"].lower()
# Cleanup
asyncio.run(engine.dispose())
class TestWebhookReprocessing:
"""Test webhook reprocessing on app restart."""
@pytest.mark.asyncio
async def test_reprocess_stuck_duplicate_webhook(self, test_db_engine, test_config):
"""Test that stuck webhooks that are duplicates are handled correctly on restart."""
AsyncSessionLocal = async_sessionmaker(test_db_engine, expire_on_commit=False)
# Step 1: Process a webhook normally to create a reservation
from alpine_bits_python.webhook_processor import process_wix_form_submission
test_data = {
"data": {
"submissionId": "STUCK-WEBHOOK-TEST-ID",
"submissionTime": "2025-10-07T05:48:41.855Z",
"contact": {
"name": {"first": "Jane", "last": "Smith"},
"email": "jane.smith@example.com",
"phones": [{"e164Phone": "+9876543210"}],
"locale": "en-US",
"contactId": "contact-stuck-test",
},
"field:date_picker_a7c8": "2024-12-25",
"field:date_picker_7e65": "2024-12-31",
"field:number_7cf5": "2",
"field:anzahl_kinder": "0",
}
}
async with AsyncSessionLocal() as session:
result = await process_wix_form_submission(
test_data, session, config=test_config
)
await session.commit()
assert result["status"] == "success"
# Step 2: Verify the reservation was created
async with AsyncSessionLocal() as session:
stmt = select(Reservation).where(
Reservation.unique_id == "STUCK-WEBHOOK-TEST-ID"
)
result = await session.execute(stmt)
reservation = result.scalar_one_or_none()
assert reservation is not None
assert reservation.unique_id == "STUCK-WEBHOOK-TEST-ID"
# Step 3: Manually create a webhook request stuck in "processing" status
# This simulates a webhook that was being processed when the app crashed
from alpine_bits_python.db import WebhookEndpoint, Hotel
async with AsyncSessionLocal() as session:
# Create hotel
hotel = Hotel(
hotel_id="HOTEL123",
hotel_name="Test Hotel",
username="testuser",
password_hash="dummy",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
session.add(hotel)
await session.flush()
# Create webhook endpoint
endpoint = WebhookEndpoint(
hotel_id="HOTEL123",
webhook_type="wix_form",
webhook_secret="test-secret-123",
is_enabled=True,
created_at=datetime.now(UTC),
)
session.add(endpoint)
await session.flush()
# Create stuck webhook request with the SAME payload
stuck_webhook = WebhookRequest(
webhook_endpoint_id=endpoint.id,
hotel_id="HOTEL123",
payload_json=test_data,
status=WebhookStatus.PROCESSING, # Stuck in processing!
created_at=datetime.now(UTC),
)
session.add(stuck_webhook)
await session.commit()
# Step 4: Run reprocessing (simulates app restart)
await reprocess_stuck_webhooks(AsyncSessionLocal, test_config)
# Step 5: Verify the stuck webhook was marked as completed (not failed)
async with AsyncSessionLocal() as session:
stmt = select(WebhookRequest).where(
WebhookRequest.status == WebhookStatus.COMPLETED
)
result = await session.execute(stmt)
completed_webhooks = result.scalars().all()
assert len(completed_webhooks) == 1
assert completed_webhooks[0].last_error is None
# Verify no failed webhooks
stmt = select(WebhookRequest).where(
WebhookRequest.status == WebhookStatus.FAILED
)
result = await session.execute(stmt)
failed_webhooks = result.scalars().all()
assert len(failed_webhooks) == 0
# Step 6: Verify only ONE reservation exists (no duplicate)
async with AsyncSessionLocal() as session:
stmt = select(Reservation)
result = await session.execute(stmt)
reservations = result.scalars().all()
assert len(reservations) == 1
class TestWebhookReprocessingNeverBlocksStartup:
"""Test that reprocessing never blocks app startup."""
@pytest.mark.asyncio
async def test_reprocessing_error_does_not_block_startup(
self, test_db_engine, test_config
):
"""Test that even if reprocessing fails, app startup continues."""
AsyncSessionLocal = async_sessionmaker(test_db_engine, expire_on_commit=False)
from alpine_bits_python.db import WebhookEndpoint, Hotel
# Create a stuck webhook with invalid data that will cause processing to fail
async with AsyncSessionLocal() as session:
# Create hotel
hotel = Hotel(
hotel_id="HOTEL123",
hotel_name="Test Hotel",
username="testuser",
password_hash="dummy",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
session.add(hotel)
await session.flush()
# Create webhook endpoint
endpoint = WebhookEndpoint(
hotel_id="HOTEL123",
webhook_type="wix_form",
webhook_secret="test-secret-123",
is_enabled=True,
created_at=datetime.now(UTC),
)
session.add(endpoint)
await session.flush()
# Create stuck webhook with INVALID data (missing required fields)
stuck_webhook = WebhookRequest(
webhook_endpoint_id=endpoint.id,
hotel_id="HOTEL123",
payload_json={"data": {"invalid": "data"}}, # Missing required fields
status=WebhookStatus.PROCESSING,
received_at=datetime.now(UTC),
)
session.add(stuck_webhook)
await session.commit()
# This should NOT raise an exception - it should log and continue
try:
await reprocess_stuck_webhooks(AsyncSessionLocal, test_config)
except Exception as e:
pytest.fail(
f"reprocess_stuck_webhooks should NEVER raise exceptions, but got: {e}"
)
# Verify the webhook was marked as failed
async with AsyncSessionLocal() as session:
stmt = select(WebhookRequest).where(
WebhookRequest.status == WebhookStatus.FAILED
)
result = await session.execute(stmt)
failed_webhooks = result.scalars().all()
assert len(failed_webhooks) == 1
assert failed_webhooks[0].last_error is not None