834 lines
29 KiB
Python
834 lines
29 KiB
Python
"""AlpineBits Server for handling hotel data exchange.
|
|
|
|
This module provides an asynchronous AlpineBits server that can handle various
|
|
OTA (OpenTravel Alliance) actions for hotel data exchange. Currently implements
|
|
handshaking functionality with configurable supported actions and capabilities.
|
|
"""
|
|
|
|
import inspect
|
|
import json
|
|
import re
|
|
from abc import ABC
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from enum import Enum, IntEnum
|
|
from typing import Any, Optional, override
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from sqlalchemy import select
|
|
from xsdata.formats.dataclass.serializers.config import SerializerConfig
|
|
from xsdata_pydantic.bindings import XmlParser, XmlSerializer
|
|
|
|
from alpine_bits_python.alpine_bits_helpers import (
|
|
create_res_notif_push_message,
|
|
create_res_retrieve_response,
|
|
)
|
|
from alpine_bits_python.logging_config import get_logger
|
|
|
|
from .db import AckedRequest, Customer, Reservation
|
|
from .generated.alpinebits import (
|
|
OtaNotifReportRq,
|
|
OtaNotifReportRs,
|
|
OtaPingRq,
|
|
OtaPingRs,
|
|
OtaReadRq,
|
|
WarningStatus,
|
|
)
|
|
|
|
# Configure logging
|
|
_LOGGER = get_logger(__name__)
|
|
|
|
|
|
class HttpStatusCode(IntEnum):
|
|
"""Allowed HTTP status codes for AlpineBits responses."""
|
|
|
|
OK = 200
|
|
BAD_REQUEST = 400
|
|
UNAUTHORIZED = 401
|
|
INTERNAL_SERVER_ERROR = 500
|
|
|
|
|
|
def dump_json_for_xml(json_content: Any) -> str:
|
|
"""Dump JSON content as a pretty-printed string for embedding in XML.
|
|
|
|
Adds newlines before and after the JSON block for better readability in XML.
|
|
"""
|
|
return json.dumps(json_content)
|
|
|
|
|
|
class AlpineBitsActionName(Enum):
|
|
"""Enum for AlpineBits action names with capability and request name mappings."""
|
|
|
|
# Format: (capability_name, actual_request_name)
|
|
OTA_PING = ("action_OTA_Ping", "OTA_Ping:Handshaking")
|
|
OTA_READ = ("action_OTA_Read", "OTA_Read:GuestRequests")
|
|
OTA_HOTEL_AVAIL_NOTIF = ("action_OTA_HotelAvailNotif", "OTA_HotelAvailNotif")
|
|
OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS = ( ## Push Action for Guest Requests
|
|
"action_OTA_HotelResNotif_GuestRequests",
|
|
"OTA_HotelResNotif:GuestRequests",
|
|
)
|
|
OTA_HOTEL_NOTIF_REPORT = (
|
|
"action_OTA_Read", # if read is supported this is also supported
|
|
"OTA_NotifReport:GuestRequests",
|
|
)
|
|
OTA_HOTEL_DESCRIPTIVE_CONTENT_NOTIF_INVENTORY = (
|
|
"action_OTA_HotelDescriptiveContentNotif_Inventory",
|
|
"OTA_HotelDescriptiveContentNotif:Inventory",
|
|
)
|
|
OTA_HOTEL_DESCRIPTIVE_CONTENT_NOTIF_INFO = (
|
|
"action_OTA_HotelDescriptiveContentNotif_Info",
|
|
"OTA_HotelDescriptiveContentNotif:Info",
|
|
)
|
|
OTA_HOTEL_DESCRIPTIVE_INFO_INVENTORY = (
|
|
"action_OTA_HotelDescriptiveInfo_Inventory",
|
|
"OTA_HotelDescriptiveInfo:Inventory",
|
|
)
|
|
OTA_HOTEL_DESCRIPTIVE_INFO_INFO = (
|
|
"action_OTA_HotelDescriptiveInfo_Info",
|
|
"OTA_HotelDescriptiveInfo:Info",
|
|
)
|
|
OTA_HOTEL_RATE_PLAN_NOTIF_RATE_PLANS = (
|
|
"action_OTA_HotelRatePlanNotif_RatePlans",
|
|
"OTA_HotelRatePlanNotif:RatePlans",
|
|
)
|
|
OTA_HOTEL_RATE_PLAN_BASE_RATES = (
|
|
"action_OTA_HotelRatePlan_BaseRates",
|
|
"OTA_HotelRatePlan:BaseRates",
|
|
)
|
|
|
|
def __init__(self, capability_name: str, request_name: str):
|
|
self.capability_name = capability_name
|
|
self.request_name = request_name
|
|
|
|
@classmethod
|
|
def get_by_capability_name(
|
|
cls, capability_name: str
|
|
) -> Optional["AlpineBitsActionName"]:
|
|
"""Get action enum by capability name."""
|
|
for action in cls:
|
|
if action.capability_name == capability_name:
|
|
return action
|
|
return None
|
|
|
|
@classmethod
|
|
def get_by_request_name(cls, request_name: str) -> Optional["AlpineBitsActionName"]:
|
|
"""Get action enum by request name."""
|
|
for action in cls:
|
|
if action.request_name == request_name:
|
|
return action
|
|
return None
|
|
|
|
|
|
class Version(str, Enum):
|
|
"""Enum for AlpineBits versions."""
|
|
|
|
V2024_10 = "2024-10"
|
|
V2022_10 = "2022-10"
|
|
# Add other versions as needed
|
|
|
|
|
|
class AlpineBitsClientInfo:
|
|
"""Wrapper for username, password, client_id."""
|
|
|
|
def __init__(self, username: str, password: str, client_id: str | None = None):
|
|
self.username = username
|
|
self.password = password
|
|
self.client_id = client_id
|
|
|
|
|
|
@dataclass
|
|
class AlpineBitsResponse:
|
|
"""Response data structure for AlpineBits actions."""
|
|
|
|
xml_content: str
|
|
status_code: HttpStatusCode = HttpStatusCode.OK
|
|
|
|
def __post_init__(self):
|
|
"""Validate that status code is one of the allowed values."""
|
|
if self.status_code not in [200, 400, 401, 500]:
|
|
raise ValueError(
|
|
f"Invalid status code {self.status_code}. Must be 200, 400, 401, or 500"
|
|
)
|
|
|
|
|
|
# Abstract base class for AlpineBits Action
|
|
class AlpineBitsAction(ABC):
|
|
"""Abstract base class for handling AlpineBits actions."""
|
|
|
|
name: AlpineBitsActionName
|
|
version: (
|
|
Version | list[Version]
|
|
) # list of versions in case action supports multiple versions
|
|
|
|
async def handle(
|
|
self,
|
|
action: str,
|
|
request_xml: str,
|
|
version: Version,
|
|
client_info: AlpineBitsClientInfo,
|
|
dbsession=None,
|
|
server_capabilities=None,
|
|
) -> AlpineBitsResponse:
|
|
"""Handle the incoming request XML and return response XML.
|
|
|
|
Default implementation returns "not implemented" error.
|
|
Override this method in subclasses to provide actual functionality.
|
|
|
|
Args:
|
|
action: The action to perform (e.g., "OTA_PingRQ")
|
|
request_xml: The XML request body as string
|
|
version: The AlpineBits version
|
|
|
|
Returns:
|
|
AlpineBitsResponse with error or actual response
|
|
|
|
"""
|
|
return_string = f"Error: Action {action} not implemented"
|
|
return AlpineBitsResponse(return_string, HttpStatusCode.BAD_REQUEST)
|
|
|
|
async def check_version_supported(self, version: Version) -> bool:
|
|
"""Check if the action supports the given version.
|
|
|
|
Args:
|
|
version: The AlpineBits version to check
|
|
Returns:
|
|
True if supported, False otherwise
|
|
|
|
"""
|
|
if isinstance(self.version, list):
|
|
return version in self.version
|
|
return version == self.version
|
|
|
|
|
|
class ServerCapabilities:
|
|
"""Automatically discovers AlpineBitsAction implementations and generates capabilities."""
|
|
|
|
def __init__(self):
|
|
self.action_registry: dict[AlpineBitsActionName, type[AlpineBitsAction]] = {}
|
|
self._discover_actions()
|
|
self.capability_dict = None
|
|
|
|
def _discover_actions(self):
|
|
"""Discover all AlpineBitsAction implementations in the current module."""
|
|
current_module = inspect.getmodule(self)
|
|
|
|
for _name, obj in inspect.getmembers(current_module):
|
|
if (
|
|
inspect.isclass(obj)
|
|
and issubclass(obj, AlpineBitsAction)
|
|
and obj != AlpineBitsAction
|
|
):
|
|
# Check if this action is actually implemented (not just returning default)
|
|
if self._is_action_implemented(obj):
|
|
action_instance = obj()
|
|
if hasattr(action_instance, "name"):
|
|
# Use capability attribute as registry key
|
|
self.action_registry[action_instance.name] = obj
|
|
|
|
def _is_action_implemented(self, action_class: type[AlpineBitsAction]) -> bool:
|
|
"""Check if an action is actually implemented or just uses the default behavior.
|
|
This is a simple check - in practice, you might want more sophisticated detection.
|
|
"""
|
|
# Check if the class has overridden the handle method
|
|
return "handle" in action_class.__dict__
|
|
|
|
def create_capabilities_dict(self) -> None:
|
|
"""Generate the capabilities dictionary based on discovered actions."""
|
|
versions_dict = {}
|
|
|
|
for action_enum, action_class in self.action_registry.items():
|
|
action_instance = action_class()
|
|
|
|
# Get supported versions for this action
|
|
if isinstance(action_instance.version, list):
|
|
supported_versions = action_instance.version
|
|
else:
|
|
supported_versions = [action_instance.version]
|
|
|
|
# Add action to each supported version
|
|
for version in supported_versions:
|
|
version_str = version.value
|
|
|
|
if version_str not in versions_dict:
|
|
versions_dict[version_str] = {"version": version_str, "actions": []}
|
|
|
|
action_dict = {"action": action_enum.capability_name}
|
|
|
|
# Add supports field if the action has custom supports
|
|
if hasattr(action_instance, "supports") and action_instance.supports:
|
|
action_dict["supports"] = action_instance.supports
|
|
|
|
versions_dict[version_str]["actions"].append(action_dict)
|
|
|
|
self.capability_dict = {"versions": list(versions_dict.values())}
|
|
|
|
# filter duplicates in actions for each version
|
|
for version in self.capability_dict["versions"]:
|
|
seen_actions = set()
|
|
unique_actions = []
|
|
for action in version["actions"]:
|
|
if action["action"] not in seen_actions:
|
|
seen_actions.add(action["action"])
|
|
unique_actions.append(action)
|
|
version["actions"] = unique_actions
|
|
|
|
# remove action_OTA_Ping from version 2024-10
|
|
for version in self.capability_dict["versions"]:
|
|
if version["version"] == "2024-10":
|
|
version["actions"] = [
|
|
action
|
|
for action in version["actions"]
|
|
if action.get("action") != "action_OTA_Ping"
|
|
]
|
|
|
|
def get_capabilities_dict(self) -> dict:
|
|
"""Get capabilities as a dictionary. Generates if not already created."""
|
|
if self.capability_dict is None:
|
|
self.create_capabilities_dict()
|
|
return self.capability_dict
|
|
|
|
def get_supported_actions(self) -> list[str]:
|
|
"""Get list of all supported action names."""
|
|
return list(self.action_registry.keys())
|
|
|
|
|
|
# Sample Action Implementations for demonstration
|
|
|
|
|
|
class PingAction(AlpineBitsAction):
|
|
"""Implementation for OTA_Ping action (handshaking)."""
|
|
|
|
def __init__(self, config: dict = {}):
|
|
self.name = AlpineBitsActionName.OTA_PING
|
|
self.version = [
|
|
Version.V2024_10,
|
|
Version.V2022_10,
|
|
] # Supports multiple versions
|
|
self.config = config
|
|
|
|
@override
|
|
async def handle(
|
|
self,
|
|
action: str,
|
|
request_xml: str,
|
|
version: Version,
|
|
client_info: AlpineBitsClientInfo,
|
|
server_capabilities: None | ServerCapabilities = None,
|
|
) -> AlpineBitsResponse:
|
|
"""Handle ping requests."""
|
|
if request_xml is None:
|
|
return AlpineBitsResponse(
|
|
"Error: Xml Request missing", HttpStatusCode.BAD_REQUEST
|
|
)
|
|
|
|
if server_capabilities is None:
|
|
return AlpineBitsResponse(
|
|
"Error: Something went wrong", HttpStatusCode.INTERNAL_SERVER_ERROR
|
|
)
|
|
|
|
# Parse the incoming request XML and extract EchoData
|
|
parser = XmlParser()
|
|
|
|
try:
|
|
parsed_request = parser.from_string(request_xml, OtaPingRq)
|
|
|
|
echo_data_client = json.loads(parsed_request.echo_data)
|
|
except Exception:
|
|
return AlpineBitsResponse(
|
|
"Error: Invalid XML request", HttpStatusCode.BAD_REQUEST
|
|
)
|
|
|
|
# compare echo data with capabilities, create a dictionary containing the matching capabilities
|
|
capabilities_dict = server_capabilities.get_capabilities_dict()
|
|
|
|
_LOGGER.debug("Capabilities of Server: %s", capabilities_dict)
|
|
matching_capabilities = {"versions": []}
|
|
|
|
# Iterate through client's requested versions
|
|
for client_version in echo_data_client.get("versions", []):
|
|
client_version_str = client_version.get("version", "")
|
|
|
|
# Find matching server version
|
|
for server_version in capabilities_dict["versions"]:
|
|
if server_version["version"] == client_version_str:
|
|
# Found a matching version, now find common actions
|
|
matching_version = {"version": client_version_str, "actions": []}
|
|
|
|
# Get client's requested actions for this version
|
|
client_actions = {
|
|
action.get("action", ""): action
|
|
for action in client_version.get("actions", [])
|
|
}
|
|
server_actions = {
|
|
action.get("action", ""): action
|
|
for action in server_version.get("actions", [])
|
|
}
|
|
|
|
# Find common actions
|
|
for action_name in client_actions:
|
|
if action_name in server_actions:
|
|
# Use server's action definition (includes our supports)
|
|
matching_version["actions"].append(
|
|
server_actions[action_name]
|
|
)
|
|
|
|
# Only add version if there are common actions
|
|
if matching_version["actions"]:
|
|
matching_capabilities["versions"].append(matching_version)
|
|
break
|
|
|
|
# Debug print to see what we matched
|
|
|
|
# Create successful ping response with matched capabilities
|
|
capabilities_json_str = dump_json_for_xml(matching_capabilities)
|
|
|
|
warning = OtaPingRs.Warnings.Warning(
|
|
status=WarningStatus.ALPINEBITS_HANDSHAKE,
|
|
type_value="11",
|
|
content=[capabilities_json_str],
|
|
)
|
|
|
|
warning_response = OtaPingRs.Warnings(warning=[warning])
|
|
|
|
client_response_echo_data = parsed_request.echo_data
|
|
|
|
response_ota_ping = OtaPingRs(
|
|
version="7.000",
|
|
warnings=warning_response,
|
|
echo_data=client_response_echo_data,
|
|
success="",
|
|
)
|
|
|
|
config = SerializerConfig(
|
|
pretty_print=True, xml_declaration=True, encoding="UTF-8"
|
|
)
|
|
|
|
serializer = XmlSerializer(config=config)
|
|
|
|
response_xml = serializer.render(
|
|
response_ota_ping, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
|
|
)
|
|
|
|
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
|
|
|
|
|
|
def strip_control_chars(s):
|
|
# Remove all control characters (ASCII < 32 and DEL)
|
|
return re.sub(r"[\x00-\x1F\x7F]", "", s)
|
|
|
|
|
|
def validate_hotel_authentication(
|
|
username: str, password: str, hotelid: str, config: dict
|
|
) -> bool:
|
|
"""Validate hotel authentication based on username, password, and hotel ID.
|
|
|
|
Example config
|
|
alpine_bits_auth:
|
|
- hotel_id: "123"
|
|
hotel_name: "Frangart Inn"
|
|
username: "alice"
|
|
password: !secret ALICE_PASSWORD
|
|
"""
|
|
if not config or "alpine_bits_auth" not in config:
|
|
return False
|
|
auth_list = config["alpine_bits_auth"]
|
|
for auth in auth_list:
|
|
if (
|
|
auth.get("hotel_id") == hotelid
|
|
and auth.get("username") == username
|
|
and auth.get("password") == password
|
|
):
|
|
return True
|
|
return False
|
|
|
|
# look for hotelid in config
|
|
|
|
|
|
class ReadAction(AlpineBitsAction):
|
|
"""Implementation for OTA_Read action."""
|
|
|
|
def __init__(self, config: dict = {}):
|
|
self.name = AlpineBitsActionName.OTA_READ
|
|
self.version = [Version.V2024_10, Version.V2022_10]
|
|
self.config = config
|
|
|
|
async def handle(
|
|
self,
|
|
action: str,
|
|
request_xml: str,
|
|
version: Version,
|
|
client_info: AlpineBitsClientInfo,
|
|
dbsession=None,
|
|
server_capabilities=None,
|
|
) -> AlpineBitsResponse:
|
|
"""Handle read requests."""
|
|
clean_action = strip_control_chars(str(action)).strip()
|
|
clean_expected = strip_control_chars(self.name.value[1]).strip()
|
|
|
|
if clean_action != clean_expected:
|
|
return AlpineBitsResponse(
|
|
f"Error: Invalid action {action}, expected {self.name.value[1]}",
|
|
HttpStatusCode.BAD_REQUEST,
|
|
)
|
|
|
|
if dbsession is None:
|
|
return AlpineBitsResponse(
|
|
"Error: Something went wrong", HttpStatusCode.INTERNAL_SERVER_ERROR
|
|
)
|
|
|
|
read_request = XmlParser().from_string(request_xml, OtaReadRq)
|
|
|
|
hotel_read_request = read_request.read_requests.hotel_read_request
|
|
|
|
hotelid = hotel_read_request.hotel_code
|
|
hotelname = hotel_read_request.hotel_name
|
|
|
|
if hotelname is None:
|
|
hotelname = "unknown"
|
|
|
|
if hotelid is None:
|
|
return AlpineBitsResponse(
|
|
"Error: Unauthorized Read Request. No target hotel specified. Check credentials",
|
|
HttpStatusCode.UNAUTHORIZED,
|
|
)
|
|
|
|
if not validate_hotel_authentication(
|
|
client_info.username, client_info.password, hotelid, self.config
|
|
):
|
|
return AlpineBitsResponse(
|
|
f"Error: Unauthorized Read Request for this specific hotel {hotelname}. Check credentials",
|
|
HttpStatusCode.UNAUTHORIZED,
|
|
)
|
|
|
|
start_date = None
|
|
|
|
"""When given, the server will send only inquiries generated after the Start timestamp, regardless
|
|
whether the client has retrieved them before or not."""
|
|
|
|
if hotel_read_request.selection_criteria is not None:
|
|
start_date = datetime.fromisoformat(
|
|
hotel_read_request.selection_criteria.start
|
|
)
|
|
|
|
# query all reservations for this hotel from the database, where start_date is greater than or equal to the given start_date
|
|
|
|
stmt = (
|
|
select(Reservation, Customer)
|
|
.join(Customer, Reservation.customer_id == Customer.id)
|
|
.filter(Reservation.hotel_code == hotelid)
|
|
)
|
|
if start_date:
|
|
_LOGGER.info("Filtering reservations from start date %s", start_date)
|
|
stmt = stmt.filter(Reservation.created_at >= start_date)
|
|
# remove reservations that have been acknowledged via client_id
|
|
elif client_info.client_id:
|
|
subquery = (
|
|
select(Reservation.id)
|
|
.join(
|
|
AckedRequest,
|
|
Reservation.md5_unique_id == AckedRequest.unique_id,
|
|
)
|
|
.filter(AckedRequest.client_id == client_info.client_id)
|
|
)
|
|
stmt = stmt.filter(~Reservation.id.in_(subquery))
|
|
|
|
result = await dbsession.execute(stmt)
|
|
reservation_customer_pairs: list[tuple[Reservation, Customer]] = (
|
|
result.all()
|
|
) # List of (Reservation, Customer) tuples
|
|
|
|
_LOGGER.info(
|
|
"Querying reservations and customers for hotel %s from database",
|
|
hotelid,
|
|
)
|
|
for reservation, customer in reservation_customer_pairs:
|
|
_LOGGER.info(
|
|
"Retrieving reservation %s for customer %s %s",
|
|
reservation.id,
|
|
customer.given_name,
|
|
customer.surname,
|
|
)
|
|
|
|
res_retrive_rs = create_res_retrieve_response(
|
|
reservation_customer_pairs, config=self.config
|
|
)
|
|
|
|
config = SerializerConfig(
|
|
pretty_print=True, xml_declaration=True, encoding="UTF-8"
|
|
)
|
|
serializer = XmlSerializer(config=config)
|
|
response_xml = serializer.render(
|
|
res_retrive_rs, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
|
|
)
|
|
|
|
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
|
|
|
|
|
|
class NotifReportReadAction(AlpineBitsAction):
|
|
"""Necessary for read action to follow specification. Clients need to report acknowledgements."""
|
|
|
|
def __init__(self, config: dict = {}):
|
|
self.name = AlpineBitsActionName.OTA_HOTEL_NOTIF_REPORT
|
|
self.version = [Version.V2024_10, Version.V2022_10]
|
|
self.config = config
|
|
|
|
async def handle(
|
|
self,
|
|
action: str,
|
|
request_xml: str,
|
|
version: Version,
|
|
client_info: AlpineBitsClientInfo,
|
|
dbsession=None,
|
|
server_capabilities=None,
|
|
) -> AlpineBitsResponse:
|
|
"""Handle read requests."""
|
|
notif_report = XmlParser().from_string(request_xml, OtaNotifReportRq)
|
|
|
|
# we can't check hotel auth here, because this action does not contain hotel info
|
|
|
|
warnings = notif_report.warnings
|
|
notif_report_details = notif_report.notif_details
|
|
|
|
success_message = OtaNotifReportRs(version="7.000", success="")
|
|
|
|
if client_info.client_id is None:
|
|
return AlpineBitsResponse(
|
|
"ERROR:no valid client id provided", HttpStatusCode.BAD_REQUEST
|
|
)
|
|
|
|
config = SerializerConfig(
|
|
pretty_print=True, xml_declaration=True, encoding="UTF-8"
|
|
)
|
|
serializer = XmlSerializer(config=config)
|
|
response_xml = serializer.render(
|
|
success_message, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
|
|
)
|
|
|
|
if (warnings is None and notif_report_details is None) or (
|
|
notif_report_details is not None
|
|
and notif_report_details.hotel_notif_report is None
|
|
):
|
|
return AlpineBitsResponse(
|
|
response_xml, HttpStatusCode.OK
|
|
) # Nothing to process
|
|
if dbsession is None:
|
|
return AlpineBitsResponse(
|
|
"Error: Something went wrong", HttpStatusCode.INTERNAL_SERVER_ERROR
|
|
)
|
|
|
|
timestamp = datetime.now(ZoneInfo("UTC"))
|
|
for entry in (
|
|
notif_report_details.hotel_notif_report.hotel_reservations.hotel_reservation
|
|
): # type: ignore
|
|
unique_id = entry.unique_id.id
|
|
acked_request = AckedRequest(
|
|
unique_id=unique_id,
|
|
client_id=client_info.client_id,
|
|
timestamp=timestamp,
|
|
)
|
|
dbsession.add(acked_request)
|
|
|
|
await dbsession.commit()
|
|
|
|
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
|
|
|
|
|
|
class PushAction(AlpineBitsAction):
|
|
"""Creates the necessary xml for OTA_HotelResNotif:GuestRequests."""
|
|
|
|
def __init__(self, config: dict = {}):
|
|
self.name = AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS
|
|
self.version = [Version.V2024_10, Version.V2022_10]
|
|
self.config = config
|
|
|
|
async def handle(
|
|
self,
|
|
action: str,
|
|
request_xml: tuple[Reservation, Customer],
|
|
version: Version,
|
|
client_info: AlpineBitsClientInfo,
|
|
dbsession=None,
|
|
server_capabilities=None,
|
|
) -> AlpineBitsResponse:
|
|
"""Create push request XML."""
|
|
xml_push_request = create_res_notif_push_message(
|
|
request_xml, config=self.config
|
|
)
|
|
|
|
config = SerializerConfig(
|
|
pretty_print=True, xml_declaration=True, encoding="UTF-8"
|
|
)
|
|
serializer = XmlSerializer(config=config)
|
|
xml_push_request = serializer.render(
|
|
xml_push_request, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
|
|
)
|
|
|
|
return AlpineBitsResponse(xml_push_request, HttpStatusCode.OK)
|
|
|
|
|
|
class AlpineBitsServer:
|
|
"""Asynchronous AlpineBits server for handling hotel data exchange requests.
|
|
|
|
This server handles various OTA actions and implements the AlpineBits protocol
|
|
for hotel data exchange. It maintains a registry of supported actions and
|
|
their capabilities, and can respond to handshake requests with its capabilities.
|
|
"""
|
|
|
|
def __init__(self, config: dict | None = None):
|
|
self.capabilities = ServerCapabilities()
|
|
self._action_instances = {}
|
|
self.config = config
|
|
self._initialize_action_instances()
|
|
|
|
def _initialize_action_instances(self):
|
|
"""Initialize instances of all discovered action classes."""
|
|
for capability_name, action_class in self.capabilities.action_registry.items():
|
|
_LOGGER.info(f"Initializing action instance for {capability_name}")
|
|
self._action_instances[capability_name] = action_class(config=self.config)
|
|
|
|
def get_capabilities(self) -> dict:
|
|
"""Get server capabilities."""
|
|
return self.capabilities.get_capabilities_dict()
|
|
|
|
async def handle_request(
|
|
self,
|
|
request_action_name: str,
|
|
request_xml: str | tuple[Reservation, Customer],
|
|
client_info: AlpineBitsClientInfo,
|
|
version: str = "2024-10",
|
|
dbsession=None,
|
|
) -> AlpineBitsResponse:
|
|
"""Handle an incoming AlpineBits request by routing to appropriate action handler.
|
|
|
|
Args:
|
|
request_action_name: The action name from the request (e.g., "OTA_Read:GuestRequests")
|
|
request_xml: The XML request body. Gets passed to the action handler. In case of PushRequest can be the data to be pushed
|
|
version: The AlpineBits version (defaults to "2024-10")
|
|
|
|
Returns:
|
|
AlpineBitsResponse with the result
|
|
|
|
"""
|
|
# Convert string version to enum
|
|
try:
|
|
version_enum = Version(version)
|
|
except ValueError:
|
|
return AlpineBitsResponse(
|
|
f"Error: Unsupported version {version}", HttpStatusCode.BAD_REQUEST
|
|
)
|
|
|
|
# Find the action by request name
|
|
action_enum = AlpineBitsActionName.get_by_request_name(request_action_name)
|
|
|
|
_LOGGER.info(
|
|
f"Handling request for action: {request_action_name} with action enum: {action_enum}"
|
|
)
|
|
if not action_enum:
|
|
return AlpineBitsResponse(
|
|
f"Error: Unknown action {request_action_name}",
|
|
HttpStatusCode.BAD_REQUEST,
|
|
)
|
|
|
|
# Check if we have an implementation for this action
|
|
|
|
if action_enum not in self._action_instances:
|
|
return AlpineBitsResponse(
|
|
f"Error: Action {request_action_name} is not implemented",
|
|
HttpStatusCode.BAD_REQUEST,
|
|
)
|
|
|
|
action_instance: AlpineBitsAction = self._action_instances[action_enum]
|
|
|
|
# Check if the action supports the requested version
|
|
if not await action_instance.check_version_supported(version_enum):
|
|
return AlpineBitsResponse(
|
|
f"Error: Action {request_action_name} does not support version {version}",
|
|
HttpStatusCode.BAD_REQUEST,
|
|
)
|
|
|
|
# Handle the request
|
|
try:
|
|
# Special case for ping action - pass server capabilities
|
|
|
|
if action_enum == AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS:
|
|
action_instance: PushAction
|
|
if request_xml is None or not isinstance(request_xml, tuple):
|
|
return AlpineBitsResponse(
|
|
"Error: Invalid data for push request",
|
|
HttpStatusCode.BAD_REQUEST,
|
|
)
|
|
return await action_instance.handle(
|
|
action=request_action_name,
|
|
request_xml=request_xml,
|
|
version=version_enum,
|
|
client_info=client_info,
|
|
)
|
|
|
|
if action_enum == AlpineBitsActionName.OTA_PING:
|
|
return await action_instance.handle(
|
|
action=request_action_name,
|
|
request_xml=request_xml,
|
|
version=version_enum,
|
|
server_capabilities=self.capabilities,
|
|
client_info=client_info,
|
|
)
|
|
return await action_instance.handle(
|
|
action=request_action_name,
|
|
request_xml=request_xml,
|
|
version=version_enum,
|
|
dbsession=dbsession,
|
|
client_info=client_info,
|
|
)
|
|
except Exception as e:
|
|
# print stack trace for debugging
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
return AlpineBitsResponse(
|
|
f"Error: Internal server error while processing {request_action_name}: {e!s}",
|
|
HttpStatusCode.INTERNAL_SERVER_ERROR,
|
|
)
|
|
|
|
def get_supported_request_names(self) -> list[str]:
|
|
"""Get all supported request names (not capability names)."""
|
|
request_names = []
|
|
for capability_name in self._action_instances:
|
|
action_enum = AlpineBitsActionName.get_by_capability_name(capability_name)
|
|
if action_enum:
|
|
request_names.append(action_enum.request_name)
|
|
return sorted(request_names)
|
|
|
|
def is_action_supported(
|
|
self, request_action_name: str, version: str | None = None
|
|
) -> bool:
|
|
"""Check if a request action is supported.
|
|
|
|
Args:
|
|
request_action_name: The request action name (e.g., "OTA_Read:GuestRequests")
|
|
version: Optional version to check
|
|
|
|
Returns:
|
|
True if supported, False otherwise
|
|
|
|
"""
|
|
action_enum = AlpineBitsActionName.get_by_request_name(request_action_name)
|
|
if not action_enum:
|
|
return False
|
|
|
|
capability_name = action_enum.capability_name
|
|
if capability_name not in self._action_instances:
|
|
return False
|
|
|
|
if version:
|
|
try:
|
|
version_enum = Version(version)
|
|
action_instance = self._action_instances[capability_name]
|
|
# This would need to be async, but for simplicity we'll just check if version exists
|
|
if isinstance(action_instance.version, list):
|
|
return version_enum in action_instance.version
|
|
return action_instance.version == version_enum
|
|
except ValueError:
|
|
return False
|
|
|
|
return True
|