Server takes form

This commit is contained in:
Jonas Linter
2025-09-25 11:33:54 +02:00
parent 35da6d3c24
commit 6925dd734c
6 changed files with 678 additions and 333 deletions

View File

@@ -8,19 +8,289 @@ handshaking functionality with configurable supported actions and capabilities.
import asyncio
import json
from typing import Dict, List, Optional, Any
import inspect
from typing import Dict, List, Optional, Any, Union, Tuple, Type
from xml.etree import ElementTree as ET
from dataclasses import dataclass
from enum import Enum, IntEnum
from .generated.alpinebits import OtaPingRq, OtaPingRs, WarningStatus
from xsdata_pydantic.bindings import XmlSerializer
from xsdata.formats.dataclass.serializers.config import SerializerConfig
from abc import ABC, abstractmethod
class HttpStatusCode(IntEnum):
"""Allowed HTTP status codes for AlpineBits responses."""
OK = 200
BAD_REQUEST = 400
UNAUTHORIZED = 401
INTERNAL_SERVER_ERROR = 500
class AlpineBitsActionName(Enum):
"""Enum for AlpineBits action names with capability and request name mappings."""
# Format: (capability_name, actual_request_name)
OTA_PING = ("action_OTA_Ping", "OTA_Ping:Handshaking")
OTA_READ = ("action_OTA_Read", "OTA_Read:GuestRequests")
OTA_HOTEL_AVAIL_NOTIF = ("action_OTA_HotelAvailNotif", "OTA_HotelAvailNotif")
OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS = ("action_OTA_HotelResNotif_GuestRequests",
"OTA_HotelResNotif:GuestRequests")
OTA_HOTEL_DESCRIPTIVE_CONTENT_NOTIF_INVENTORY = ("action_OTA_HotelDescriptiveContentNotif_Inventory",
"OTA_HotelDescriptiveContentNotif:Inventory")
OTA_HOTEL_DESCRIPTIVE_CONTENT_NOTIF_INFO = ("action_OTA_HotelDescriptiveContentNotif_Info",
"OTA_HotelDescriptiveContentNotif:Info")
OTA_HOTEL_DESCRIPTIVE_INFO_INVENTORY = ("action_OTA_HotelDescriptiveInfo_Inventory",
"OTA_HotelDescriptiveInfo:Inventory")
OTA_HOTEL_DESCRIPTIVE_INFO_INFO = ("action_OTA_HotelDescriptiveInfo_Info",
"OTA_HotelDescriptiveInfo:Info")
OTA_HOTEL_RATE_PLAN_NOTIF_RATE_PLANS = ("action_OTA_HotelRatePlanNotif_RatePlans",
"OTA_HotelRatePlanNotif:RatePlans")
OTA_HOTEL_RATE_PLAN_BASE_RATES = ("action_OTA_HotelRatePlan_BaseRates",
"OTA_HotelRatePlan:BaseRates")
def __init__(self, capability_name: str, request_name: str):
self.capability_name = capability_name
self.request_name = request_name
@classmethod
def get_by_capability_name(cls, capability_name: str) -> Optional['AlpineBitsActionName']:
"""Get action enum by capability name."""
for action in cls:
if action.capability_name == capability_name:
return action
return None
@classmethod
def get_by_request_name(cls, request_name: str) -> Optional['AlpineBitsActionName']:
"""Get action enum by request name."""
for action in cls:
if action.request_name == request_name:
return action
return None
class Version(str, Enum):
"""Enum for AlpineBits versions."""
V2024_10 = "2024-10"
V2022_10 = "2022-10"
# Add other versions as needed
@dataclass
class AlpineBitsResponse:
"""Response data structure for AlpineBits actions."""
xml_content: str
status_code: HttpStatusCode = HttpStatusCode.OK
def __post_init__(self):
"""Validate that status code is one of the allowed values."""
if self.status_code not in [200, 400, 401, 500]:
raise ValueError(f"Invalid status code {self.status_code}. Must be 200, 400, 401, or 500")
# Abstract base class for AlpineBits Action
class AlpineBitsAction(ABC):
"""Abstract base class for handling AlpineBits actions."""
name: AlpineBitsActionName
version: Version | list[Version] # list of versions in case action supports multiple versions
async def handle(self, action: str, request_xml: str, version: Version) -> AlpineBitsResponse:
"""
Handle the incoming request XML and return response XML.
Default implementation returns "not implemented" error.
Override this method in subclasses to provide actual functionality.
Args:
action: The action to perform (e.g., "OTA_PingRQ")
request_xml: The XML request body as string
version: The AlpineBits version
Returns:
AlpineBitsResponse with error or actual response
"""
return_string = f"Error: Action {action} not implemented"
return AlpineBitsResponse(return_string, HttpStatusCode.BAD_REQUEST)
async def check_version_supported(self, version: Version) -> bool:
"""
Check if the action supports the given version.
Args:
version: The AlpineBits version to check
Returns:
True if supported, False otherwise
"""
if isinstance(self.version, list):
return version in self.version
return version == self.version
class ServerCapabilities:
pass
"""
Automatically discovers AlpineBitsAction implementations and generates capabilities.
"""
def __init__(self):
self.action_registry: Dict[str, Type[AlpineBitsAction]] = {}
self._discover_actions()
def _discover_actions(self):
"""Discover all AlpineBitsAction implementations in the current module."""
current_module = inspect.getmodule(self)
for name, obj in inspect.getmembers(current_module):
if (inspect.isclass(obj) and
issubclass(obj, AlpineBitsAction) and
obj != AlpineBitsAction):
# Check if this action is actually implemented (not just returning default)
if self._is_action_implemented(obj):
action_instance = obj()
if hasattr(action_instance, 'name'):
# Use capability name for the registry key
self.action_registry[action_instance.name.capability_name] = obj
def _is_action_implemented(self, action_class: Type[AlpineBitsAction]) -> bool:
"""
Check if an action is actually implemented or just uses the default behavior.
This is a simple check - in practice, you might want more sophisticated detection.
"""
# Check if the class has overridden the handle method
if 'handle' in action_class.__dict__:
return True
return False
def get_capabilities_dict(self) -> Dict:
"""
Generate the capabilities dictionary based on discovered actions.
Returns:
Dictionary matching the AlpineBits capabilities format
"""
versions_dict = {}
for action_name, action_class in self.action_registry.items():
action_instance = action_class()
# Get supported versions for this action
if isinstance(action_instance.version, list):
supported_versions = action_instance.version
else:
supported_versions = [action_instance.version]
# Add action to each supported version
for version in supported_versions:
version_str = version.value
if version_str not in versions_dict:
versions_dict[version_str] = {
"version": version_str,
"actions": []
}
action_dict = {"action": action_name}
# Add supports field if the action has custom supports
if hasattr(action_instance, 'supports') and action_instance.supports:
action_dict["supports"] = action_instance.supports
versions_dict[version_str]["actions"].append(action_dict)
return {
"versions": list(versions_dict.values())
}
def get_capabilities_json(self) -> str:
"""Get capabilities as formatted JSON string."""
return json.dumps(self.get_capabilities_dict(), indent=2)
def get_supported_actions(self) -> List[str]:
"""Get list of all supported action names."""
return list(self.action_registry.keys())
# Sample Action Implementations for demonstration
class PingAction(AlpineBitsAction):
"""Implementation for OTA_Ping action (handshaking)."""
def __init__(self):
self.name = AlpineBitsActionName.OTA_PING
self.version = [Version.V2024_10, Version.V2022_10] # Supports multiple versions
async def handle(self, action: str, request_xml: str, version: Version) -> AlpineBitsResponse:
"""Handle ping requests."""
response_xml = f'''<?xml version="1.0" encoding="UTF-8"?>
<OTA_PingRS xmlns="http://www.opentravel.org/OTA/2003/05" Version="8.000">
<Success/>
<EchoData>Ping successful for version {version.value}</EchoData>
</OTA_PingRS>'''
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
class ReadAction(AlpineBitsAction):
"""Implementation for OTA_Read action."""
def __init__(self):
self.name = AlpineBitsActionName.OTA_READ
self.version = [Version.V2024_10, Version.V2022_10]
async def handle(self, action: str, request_xml: str, version: Version) -> AlpineBitsResponse:
"""Handle read requests."""
response_xml = f'''<?xml version="1.0" encoding="UTF-8"?>
<OTA_ReadRS xmlns="http://www.opentravel.org/OTA/2003/05" Version="8.000">
<Success/>
<Data>Read operation successful for {version.value}</Data>
</OTA_ReadRS>'''
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
class HotelAvailNotifAction(AlpineBitsAction):
"""Implementation for Hotel Availability Notification action with supports."""
def __init__(self):
self.name = AlpineBitsActionName.OTA_HOTEL_AVAIL_NOTIF
self.version = Version.V2024_10
self.supports = [
"OTA_HotelAvailNotif_accept_rooms",
"OTA_HotelAvailNotif_accept_categories",
"OTA_HotelAvailNotif_accept_deltas",
"OTA_HotelAvailNotif_accept_BookingThreshold"
]
async def handle(self, action: str, request_xml: str, version: Version) -> AlpineBitsResponse:
"""Handle hotel availability notifications."""
response_xml = '''<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelAvailNotifRS xmlns="http://www.opentravel.org/OTA/2003/05" Version="8.000">
<Success/>
</OTA_HotelAvailNotifRS>'''
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
class GuestRequestsAction(AlpineBitsAction):
"""Unimplemented action - will not appear in capabilities."""
def __init__(self):
self.name = AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS
self.version = Version.V2024_10
# Note: This class doesn't override the handle method, so it won't be discovered
@@ -32,369 +302,182 @@ class AlpineBitsServer:
for hotel data exchange. It maintains a registry of supported actions and
their capabilities, and can respond to handshake requests with its capabilities.
"""
def __init__(self):
"""Initialize the AlpineBits server with default supported actions."""
# Define supported versions and their capabilities
self.supported_versions = {
"2024-10": {
"actions": [
{
"action": "action_OTA_Read"
},
{
"action": "action_OTA_HotelResNotif_GuestRequests"
},
{
"action": "action_OTA_HotelResNotif_GuestRequests_StatusUpdate"
}
]
},
"2022-10": {
"actions": [
{
"action": "action_OTA_Read"
},
{
"action": "action_OTA_HotelResNotif_GuestRequests"
},
{
"action": "action_OTA_HotelResNotif_GuestRequests_StatusUpdate"
}
]
}
}
# XML serializer configuration
self.serializer_config = SerializerConfig(
pretty_print=True,
xml_declaration=True,
encoding="UTF-8"
)
self.xml_serializer = XmlSerializer(config=self.serializer_config)
# Namespace map for XML serialization
self.ns_map = {
None: "http://www.opentravel.org/OTA/2003/05",
"xsi": "http://www.w3.org/2001/XMLSchema-instance"
}
def add_supported_action(self, version: str, action: str, supports: Optional[List[str]] = None):
"""
Add a supported action to a specific version.
Args:
version: AlpineBits version (e.g., "2024-10")
action: Action name (e.g., "action_OTA_HotelInvCountNotif")
supports: List of supported features for this action (optional)
"""
if version not in self.supported_versions:
self.supported_versions[version] = {"actions": []}
action_dict = {"action": action}
if supports:
action_dict["supports"] = supports
self.supported_versions[version]["actions"].append(action_dict)
def remove_supported_action(self, version: str, action: str):
"""
Remove a supported action from a specific version.
Args:
version: AlpineBits version
action: Action name to remove
"""
if version in self.supported_versions:
actions = self.supported_versions[version]["actions"]
self.supported_versions[version]["actions"] = [
a for a in actions if a["action"] != action
]
def is_action_supported(self, action: str, version: str = None) -> bool:
"""
Check if an action is supported by the server.
Args:
action: Action name to check
version: Specific version to check (if None, checks all versions)
Returns:
True if action is supported, False otherwise
"""
if version:
if version not in self.supported_versions:
return False
actions = self.supported_versions[version]["actions"]
return any(a["action"] == action for a in actions)
# Check all versions
for ver_data in self.supported_versions.values():
if any(a["action"] == action for a in ver_data["actions"]):
return True
return False
self.capabilities = ServerCapabilities()
self._action_instances = {}
self._initialize_action_instances()
def _initialize_action_instances(self):
"""Initialize instances of all discovered action classes."""
for capability_name, action_class in self.capabilities.action_registry.items():
self._action_instances[capability_name] = action_class()
def get_capabilities(self) -> Dict:
"""Get server capabilities."""
return self.capabilities.get_capabilities_dict()
def get_capabilities_json(self) -> str:
"""Get server capabilities as JSON."""
return self.capabilities.get_capabilities_json()
async def handle_request(self, request_action_name: str, request_xml: str, version: str = "2024-10") -> AlpineBitsResponse:
"""
Get the server capabilities as JSON string for handshake responses.
Returns:
JSON string containing supported versions and actions
"""
capabilities = {
"versions": [
{
"version": version,
"actions": version_data["actions"]
}
for version, version_data in self.supported_versions.items()
]
}
return json.dumps(capabilities, indent=4)
async def handle_request(self, action: str, request_xml: str) -> str:
"""
Handle an incoming AlpineBits request.
Handle an incoming AlpineBits request by routing to appropriate action handler.
Args:
action: The action to perform (e.g., "action_OTA_Ping")
request_action_name: The action name from the request (e.g., "OTA_Read:GuestRequests")
request_xml: The XML request body
version: The AlpineBits version (defaults to "2024-10")
Returns:
XML response string
Raises:
ValueError: If the action is not supported
"""
# Parse the action to determine the handler method
if action == "action_OTA_Ping":
return await self.handle_ping(request_xml)
elif action == "action_OTA_Read":
return await self.handle_read(request_xml)
elif action == "action_OTA_HotelResNotif_GuestRequests":
return await self.handle_guest_requests(request_xml)
elif action == "action_OTA_HotelResNotif_GuestRequests_StatusUpdate":
return await self.handle_guest_requests_status_update(request_xml)
else:
# Return error response for unsupported actions
return await self.handle_unsupported_action(action, request_xml)
async def handle_ping(self, request_xml: str) -> str:
"""
Handle OTA_Ping requests (handshake).
Args:
request_xml: The OTA_PingRQ XML request
Returns:
OTA_PingRS XML response with server capabilities
AlpineBitsResponse with the result
"""
# Convert string version to enum
try:
# Parse the incoming request to extract any version information
root = ET.fromstring(request_xml)
version = root.get("Version", "8.000")
# Extract echo data if present
echo_data_elem = root.find(".//{http://www.opentravel.org/OTA/2003/05}EchoData")
echo_data = None
if echo_data_elem is not None and echo_data_elem.text:
echo_data = echo_data_elem.text.strip()
# Get capabilities JSON
capabilities_json = self.get_capabilities_json()
# Create warning with capabilities
warning = OtaPingRs.Warnings.Warning(
type_value="11",
status=WarningStatus.ALPINEBITS_HANDSHAKE,
content=[capabilities_json]
version_enum = Version(version)
except ValueError:
return AlpineBitsResponse(
f"Error: Unsupported version {version}",
HttpStatusCode.BAD_REQUEST
)
warnings = OtaPingRs.Warnings(warning=[warning])
# Create successful ping response
ping_response = OtaPingRs(
version=version,
success=None, # Empty success element
warnings=warnings,
echo_data=echo_data or capabilities_json
# Find the action by request name
action_enum = AlpineBitsActionName.get_by_request_name(request_action_name)
if not action_enum:
return AlpineBitsResponse(
f"Error: Unknown action {request_action_name}",
HttpStatusCode.BAD_REQUEST
)
# Serialize to XML
return self.xml_serializer.render(ping_response, ns_map=self.ns_map)
# Check if we have an implementation for this action
capability_name = action_enum.capability_name
if capability_name not in self._action_instances:
return AlpineBitsResponse(
f"Error: Action {request_action_name} is not implemented",
HttpStatusCode.BAD_REQUEST
)
action_instance = self._action_instances[capability_name]
# Check if the action supports the requested version
if not await action_instance.check_version_supported(version_enum):
return AlpineBitsResponse(
f"Error: Action {request_action_name} does not support version {version}",
HttpStatusCode.BAD_REQUEST
)
# Handle the request
try:
return await action_instance.handle(request_action_name, request_xml, version_enum)
except Exception as e:
# Return error response if something goes wrong
return await self.create_error_response(
"OTA_PingRS",
"8.000",
f"Failed to process ping request: {str(e)}"
return AlpineBitsResponse(
f"Error: Internal server error while processing {request_action_name}: {str(e)}",
HttpStatusCode.INTERNAL_SERVER_ERROR
)
async def handle_read(self, request_xml: str) -> str:
def get_supported_request_names(self) -> List[str]:
"""Get all supported request names (not capability names)."""
request_names = []
for capability_name in self._action_instances.keys():
action_enum = AlpineBitsActionName.get_by_capability_name(capability_name)
if action_enum:
request_names.append(action_enum.request_name)
return sorted(request_names)
def is_action_supported(self, request_action_name: str, version: str = None) -> bool:
"""
Handle OTA_Read requests.
Check if a request action is supported.
Args:
request_xml: The OTA_ReadRQ XML request
request_action_name: The request action name (e.g., "OTA_Read:GuestRequests")
version: Optional version to check
Returns:
XML response (placeholder implementation)
True if supported, False otherwise
"""
# Placeholder implementation - return unsupported for now
return await self.create_error_response(
"OTA_ReadRS",
"8.000",
"OTA_Read action not yet implemented"
)
async def handle_guest_requests(self, request_xml: str) -> str:
"""
Handle guest request notifications.
action_enum = AlpineBitsActionName.get_by_request_name(request_action_name)
if not action_enum:
return False
Args:
request_xml: The guest request XML
Returns:
XML response (placeholder implementation)
"""
# Placeholder implementation - return unsupported for now
return await self.create_error_response(
"OTA_HotelResNotifRS",
"8.000",
"Guest requests action not yet implemented"
)
async def handle_guest_requests_status_update(self, request_xml: str) -> str:
"""
Handle guest request status updates.
Args:
request_xml: The status update XML
Returns:
XML response (placeholder implementation)
"""
# Placeholder implementation - return unsupported for now
return await self.create_error_response(
"OTA_HotelResNotifRS",
"8.000",
"Guest request status updates not yet implemented"
)
async def handle_unsupported_action(self, action: str, request_xml: str) -> str:
"""
Handle unsupported actions by returning an appropriate error response.
Args:
action: The unsupported action name
request_xml: The request XML
Returns:
XML error response
"""
return await self.create_error_response(
"OTA_PingRS", # Use generic ping response for unknown actions
"8.000",
f"Action '{action}' is not supported by this server"
)
async def create_error_response(self, response_type: str, version: str, error_message: str) -> str:
"""
Create a generic error response.
Args:
response_type: The response type (e.g., "OTA_PingRS")
version: Version number
error_message: Error message to include
Returns:
XML error response string
"""
# Create a basic error response structure
error_xml = f'''<?xml version="1.0" encoding="UTF-8"?>
<{response_type} xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.opentravel.org/OTA/2003/05"
xsi:schemaLocation="http://www.opentravel.org/OTA/2003/05 {response_type}.xsd"
Version="{version}">
<Errors>
<Error Type="13" Code="450">{error_message}</Error>
</Errors>
</{response_type}>'''
return error_xml
def get_supported_actions(self, version: str = None) -> List[str]:
"""
Get list of supported action names.
Args:
version: Specific version to get actions for (if None, returns all)
Returns:
List of supported action names
"""
actions = set()
capability_name = action_enum.capability_name
if capability_name not in self._action_instances:
return False
if version:
if version in self.supported_versions:
for action_data in self.supported_versions[version]["actions"]:
actions.add(action_data["action"])
else:
for version_data in self.supported_versions.values():
for action_data in version_data["actions"]:
actions.add(action_data["action"])
try:
version_enum = Version(version)
action_instance = self._action_instances[capability_name]
# This would need to be async, but for simplicity we'll just check if version exists
if isinstance(action_instance.version, list):
return version_enum in action_instance.version
else:
return action_instance.version == version_enum
except ValueError:
return False
return sorted(list(actions))
def get_supported_versions(self) -> List[str]:
"""
Get list of supported AlpineBits versions.
Returns:
List of supported version strings
"""
return list(self.supported_versions.keys())
return True
# Example usage and testing
async def main():
"""Example usage of the AlpineBits server."""
"""Demonstrate the automatic capabilities discovery and request handling."""
print("🚀 AlpineBits Server Capabilities Discovery & Request Handling Demo")
print("=" * 70)
# Create server instance
server = AlpineBitsServer()
# Add additional supported actions
server.add_supported_action(
"2024-10",
"action_OTA_HotelInvCountNotif",
[
"OTA_HotelInvCountNotif_accept_rooms",
"OTA_HotelInvCountNotif_accept_categories",
"OTA_HotelInvCountNotif_accept_deltas"
]
)
print("\n📋 Discovered Action Classes:")
print("-" * 30)
for capability_name, action_class in server.capabilities.action_registry.items():
action_enum = AlpineBitsActionName.get_by_capability_name(capability_name)
request_name = action_enum.request_name if action_enum else "unknown"
print(f"{capability_name} -> {action_class.__name__}")
print(f" Request name: {request_name}")
# Example ping request
ping_request = '''<?xml version="1.0" encoding="UTF-8"?>
<OTA_PingRQ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.opentravel.org/OTA/2003/05"
xsi:schemaLocation="http://www.opentravel.org/OTA/2003/05 OTA_PingRQ.xsd"
Version="8.000">
<EchoData>{"test": "handshake request"}</EchoData>
</OTA_PingRQ>'''
print(f"\n📊 Total Implemented Actions: {len(server.capabilities.get_supported_actions())}")
# Handle the request
try:
response = await server.handle_request("action_OTA_Ping", ping_request)
print("Response:")
print(response)
print("\n🔍 Generated Capabilities JSON:")
print("-" * 30)
capabilities_json = server.get_capabilities_json()
print(capabilities_json)
print("\n🎯 Supported Request Names:")
print("-" * 30)
for request_name in server.get_supported_request_names():
print(f"{request_name}")
print("\n🧪 Testing Request Handling:")
print("-" * 30)
test_xml = "<test>sample request</test>"
# Test different request formats
test_cases = [
("OTA_Ping", "2024-10"),
("OTA_Read:GuestRequests", "2024-10"),
("OTA_Read", "2022-10"),
("OTA_HotelAvailNotif", "2024-10"),
("UnknownAction", "2024-10"),
("OTA_Ping", "unsupported-version")
]
for request_name, version in test_cases:
print(f"\n<EFBFBD> Testing: {request_name} (v{version})")
# Test unsupported action
print("\n--- Testing unsupported action ---")
unsupported_response = await server.handle_request("action_NOT_SUPPORTED", ping_request)
print("Unsupported action response:")
print(unsupported_response)
# Check if supported first
is_supported = server.is_action_supported(request_name, version)
print(f" Supported: {is_supported}")
except Exception as e:
print(f"Error: {e}")
# Handle the request
response = await server.handle_request(request_name, test_xml, version)
print(f" Status: {response.status_code}")
if len(response.xml_content) > 100:
print(f" Response: {response.xml_content[:100]}...")
else:
print(f" Response: {response.xml_content}")
print("\n✅ Demo completed successfully!")
if __name__ == "__main__":

View File

@@ -25,6 +25,13 @@ def main():
parsed_result = parser.from_string(xml, OtaPingRq)
print(parsed_result.echo_data)
# save json in echo_data to file with indents
output_path = "parsed_echo_data.json"
with open(output_path, "w", encoding="utf-8") as out_f:
import json
json.dump(json.loads(parsed_result.echo_data), out_f, indent=4)
print(f"Saved echo_data json to {output_path}")