trying to figure out best project structure

This commit is contained in:
Jonas Linter
2025-09-25 09:36:07 +02:00
parent d41084cd1b
commit 9ea09ffa3f
12 changed files with 495 additions and 47 deletions

View File

@@ -0,0 +1,401 @@
"""
AlpineBits Server for handling hotel data exchange.
This module provides an asynchronous AlpineBits server that can handle various
OTA (OpenTravel Alliance) actions for hotel data exchange. Currently implements
handshaking functionality with configurable supported actions and capabilities.
"""
import asyncio
import json
from typing import Dict, List, Optional, Any
from xml.etree import ElementTree as ET
from generated.alpinebits import OtaPingRq, OtaPingRs, WarningStatus
from xsdata_pydantic.bindings import XmlSerializer
from xsdata.formats.dataclass.serializers.config import SerializerConfig
class ServerCapabilities:
pass
class AlpineBitsServer:
"""
Asynchronous AlpineBits server for handling hotel data exchange requests.
This server handles various OTA actions and implements the AlpineBits protocol
for hotel data exchange. It maintains a registry of supported actions and
their capabilities, and can respond to handshake requests with its capabilities.
"""
def __init__(self):
"""Initialize the AlpineBits server with default supported actions."""
# Define supported versions and their capabilities
self.supported_versions = {
"2024-10": {
"actions": [
{
"action": "action_OTA_Read"
},
{
"action": "action_OTA_HotelResNotif_GuestRequests"
},
{
"action": "action_OTA_HotelResNotif_GuestRequests_StatusUpdate"
}
]
},
"2022-10": {
"actions": [
{
"action": "action_OTA_Read"
},
{
"action": "action_OTA_HotelResNotif_GuestRequests"
},
{
"action": "action_OTA_HotelResNotif_GuestRequests_StatusUpdate"
}
]
}
}
# XML serializer configuration
self.serializer_config = SerializerConfig(
pretty_print=True,
xml_declaration=True,
encoding="UTF-8"
)
self.xml_serializer = XmlSerializer(config=self.serializer_config)
# Namespace map for XML serialization
self.ns_map = {
None: "http://www.opentravel.org/OTA/2003/05",
"xsi": "http://www.w3.org/2001/XMLSchema-instance"
}
def add_supported_action(self, version: str, action: str, supports: Optional[List[str]] = None):
"""
Add a supported action to a specific version.
Args:
version: AlpineBits version (e.g., "2024-10")
action: Action name (e.g., "action_OTA_HotelInvCountNotif")
supports: List of supported features for this action (optional)
"""
if version not in self.supported_versions:
self.supported_versions[version] = {"actions": []}
action_dict = {"action": action}
if supports:
action_dict["supports"] = supports
self.supported_versions[version]["actions"].append(action_dict)
def remove_supported_action(self, version: str, action: str):
"""
Remove a supported action from a specific version.
Args:
version: AlpineBits version
action: Action name to remove
"""
if version in self.supported_versions:
actions = self.supported_versions[version]["actions"]
self.supported_versions[version]["actions"] = [
a for a in actions if a["action"] != action
]
def is_action_supported(self, action: str, version: str = None) -> bool:
"""
Check if an action is supported by the server.
Args:
action: Action name to check
version: Specific version to check (if None, checks all versions)
Returns:
True if action is supported, False otherwise
"""
if version:
if version not in self.supported_versions:
return False
actions = self.supported_versions[version]["actions"]
return any(a["action"] == action for a in actions)
# Check all versions
for ver_data in self.supported_versions.values():
if any(a["action"] == action for a in ver_data["actions"]):
return True
return False
def get_capabilities_json(self) -> str:
"""
Get the server capabilities as JSON string for handshake responses.
Returns:
JSON string containing supported versions and actions
"""
capabilities = {
"versions": [
{
"version": version,
"actions": version_data["actions"]
}
for version, version_data in self.supported_versions.items()
]
}
return json.dumps(capabilities, indent=4)
async def handle_request(self, action: str, request_xml: str) -> str:
"""
Handle an incoming AlpineBits request.
Args:
action: The action to perform (e.g., "action_OTA_Ping")
request_xml: The XML request body
Returns:
XML response string
Raises:
ValueError: If the action is not supported
"""
# Parse the action to determine the handler method
if action == "action_OTA_Ping":
return await self.handle_ping(request_xml)
elif action == "action_OTA_Read":
return await self.handle_read(request_xml)
elif action == "action_OTA_HotelResNotif_GuestRequests":
return await self.handle_guest_requests(request_xml)
elif action == "action_OTA_HotelResNotif_GuestRequests_StatusUpdate":
return await self.handle_guest_requests_status_update(request_xml)
else:
# Return error response for unsupported actions
return await self.handle_unsupported_action(action, request_xml)
async def handle_ping(self, request_xml: str) -> str:
"""
Handle OTA_Ping requests (handshake).
Args:
request_xml: The OTA_PingRQ XML request
Returns:
OTA_PingRS XML response with server capabilities
"""
try:
# Parse the incoming request to extract any version information
root = ET.fromstring(request_xml)
version = root.get("Version", "8.000")
# Extract echo data if present
echo_data_elem = root.find(".//{http://www.opentravel.org/OTA/2003/05}EchoData")
echo_data = None
if echo_data_elem is not None and echo_data_elem.text:
echo_data = echo_data_elem.text.strip()
# Get capabilities JSON
capabilities_json = self.get_capabilities_json()
# Create warning with capabilities
warning = OtaPingRs.Warnings.Warning(
type_value="11",
status=WarningStatus.ALPINEBITS_HANDSHAKE,
content=[capabilities_json]
)
warnings = OtaPingRs.Warnings(warning=[warning])
# Create successful ping response
ping_response = OtaPingRs(
version=version,
success=None, # Empty success element
warnings=warnings,
echo_data=echo_data or capabilities_json
)
# Serialize to XML
return self.xml_serializer.render(ping_response, ns_map=self.ns_map)
except Exception as e:
# Return error response if something goes wrong
return await self.create_error_response(
"OTA_PingRS",
"8.000",
f"Failed to process ping request: {str(e)}"
)
async def handle_read(self, request_xml: str) -> str:
"""
Handle OTA_Read requests.
Args:
request_xml: The OTA_ReadRQ XML request
Returns:
XML response (placeholder implementation)
"""
# Placeholder implementation - return unsupported for now
return await self.create_error_response(
"OTA_ReadRS",
"8.000",
"OTA_Read action not yet implemented"
)
async def handle_guest_requests(self, request_xml: str) -> str:
"""
Handle guest request notifications.
Args:
request_xml: The guest request XML
Returns:
XML response (placeholder implementation)
"""
# Placeholder implementation - return unsupported for now
return await self.create_error_response(
"OTA_HotelResNotifRS",
"8.000",
"Guest requests action not yet implemented"
)
async def handle_guest_requests_status_update(self, request_xml: str) -> str:
"""
Handle guest request status updates.
Args:
request_xml: The status update XML
Returns:
XML response (placeholder implementation)
"""
# Placeholder implementation - return unsupported for now
return await self.create_error_response(
"OTA_HotelResNotifRS",
"8.000",
"Guest request status updates not yet implemented"
)
async def handle_unsupported_action(self, action: str, request_xml: str) -> str:
"""
Handle unsupported actions by returning an appropriate error response.
Args:
action: The unsupported action name
request_xml: The request XML
Returns:
XML error response
"""
return await self.create_error_response(
"OTA_PingRS", # Use generic ping response for unknown actions
"8.000",
f"Action '{action}' is not supported by this server"
)
async def create_error_response(self, response_type: str, version: str, error_message: str) -> str:
"""
Create a generic error response.
Args:
response_type: The response type (e.g., "OTA_PingRS")
version: Version number
error_message: Error message to include
Returns:
XML error response string
"""
# Create a basic error response structure
error_xml = f'''<?xml version="1.0" encoding="UTF-8"?>
<{response_type} xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.opentravel.org/OTA/2003/05"
xsi:schemaLocation="http://www.opentravel.org/OTA/2003/05 {response_type}.xsd"
Version="{version}">
<Errors>
<Error Type="13" Code="450">{error_message}</Error>
</Errors>
</{response_type}>'''
return error_xml
def get_supported_actions(self, version: str = None) -> List[str]:
"""
Get list of supported action names.
Args:
version: Specific version to get actions for (if None, returns all)
Returns:
List of supported action names
"""
actions = set()
if version:
if version in self.supported_versions:
for action_data in self.supported_versions[version]["actions"]:
actions.add(action_data["action"])
else:
for version_data in self.supported_versions.values():
for action_data in version_data["actions"]:
actions.add(action_data["action"])
return sorted(list(actions))
def get_supported_versions(self) -> List[str]:
"""
Get list of supported AlpineBits versions.
Returns:
List of supported version strings
"""
return list(self.supported_versions.keys())
# Example usage and testing
async def main():
"""Example usage of the AlpineBits server."""
server = AlpineBitsServer()
# Add additional supported actions
server.add_supported_action(
"2024-10",
"action_OTA_HotelInvCountNotif",
[
"OTA_HotelInvCountNotif_accept_rooms",
"OTA_HotelInvCountNotif_accept_categories",
"OTA_HotelInvCountNotif_accept_deltas"
]
)
# Example ping request
ping_request = '''<?xml version="1.0" encoding="UTF-8"?>
<OTA_PingRQ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.opentravel.org/OTA/2003/05"
xsi:schemaLocation="http://www.opentravel.org/OTA/2003/05 OTA_PingRQ.xsd"
Version="8.000">
<EchoData>{"test": "handshake request"}</EchoData>
</OTA_PingRQ>'''
# Handle the request
try:
response = await server.handle_request("action_OTA_Ping", ping_request)
print("Response:")
print(response)
# Test unsupported action
print("\n--- Testing unsupported action ---")
unsupported_response = await server.handle_request("action_NOT_SUPPORTED", ping_request)
print("Unsupported action response:")
print(unsupported_response)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
asyncio.run(main())