This commit is contained in:
Jonas Linter
2025-10-10 10:45:47 +02:00
parent 5b91608577
commit 4ac5a148b6
16 changed files with 42 additions and 450 deletions

4
.gitignore vendored
View File

@@ -27,3 +27,7 @@ secrets.yaml
# ignore db
alpinebits.db
# test output files
test_output.txt
output.xml

View File

@@ -0,0 +1 @@
"""AlpineBits Python Server package."""

View File

@@ -1,7 +0,0 @@
"""Entry point for alpine_bits_python package."""
from .main import main
if __name__ == "__main__":
print("running test main")
main()

View File

@@ -722,10 +722,7 @@ def _process_single_reservation(
if reservation.hotel_code is None:
raise ValueError("Reservation hotel_code is None")
hotel_code = str(reservation.hotel_code)
if reservation.hotel_name is None:
hotel_name = None
else:
hotel_name = str(reservation.hotel_name)
hotel_name = None if reservation.hotel_name is None else str(reservation.hotel_name)
basic_property_info = HotelReservation.ResGlobalInfo.BasicPropertyInfo(
hotel_code=hotel_code,
@@ -881,7 +878,7 @@ def _create_xml_from_db(
try:
ota_res_retrieve_rs.model_validate(ota_res_retrieve_rs.model_dump())
except Exception as e:
_LOGGER.error(f"Validation error: {e}")
_LOGGER.exception(f"Validation error: {e}")
raise
return ota_res_retrieve_rs

View File

@@ -128,7 +128,7 @@ class Version(str, Enum):
class AlpineBitsClientInfo:
"""Wrapper for username, password, client_id"""
"""Wrapper for username, password, client_id."""
def __init__(self, username: str, password: str, client_id: str | None = None):
self.username = username
@@ -212,7 +212,7 @@ class ServerCapabilities:
"""Discover all AlpineBitsAction implementations in the current module."""
current_module = inspect.getmodule(self)
for name, obj in inspect.getmembers(current_module):
for _name, obj in inspect.getmembers(current_module):
if (
inspect.isclass(obj)
and issubclass(obj, AlpineBitsAction)
@@ -230,9 +230,7 @@ class ServerCapabilities:
This is a simple check - in practice, you might want more sophisticated detection.
"""
# Check if the class has overridden the handle method
if "handle" in action_class.__dict__:
return True
return False
return "handle" in action_class.__dict__
def create_capabilities_dict(self) -> None:
"""Generate the capabilities dictionary based on discovered actions."""
@@ -636,7 +634,7 @@ class NotifReportReadAction(AlpineBitsAction):
class PushAction(AlpineBitsAction):
"""Creates the necessary xml for OTA_HotelResNotif:GuestRequests"""
"""Creates the necessary xml for OTA_HotelResNotif:GuestRequests."""
def __init__(self, config: dict = {}):
self.name = AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS
@@ -676,7 +674,7 @@ class AlpineBitsServer:
their capabilities, and can respond to handshake requests with its capabilities.
"""
def __init__(self, config: dict = None):
def __init__(self, config: dict | None = None):
self.capabilities = ServerCapabilities()
self._action_instances = {}
self.config = config
@@ -782,7 +780,6 @@ class AlpineBitsServer:
client_info=client_info,
)
except Exception as e:
print(f"Error handling request {request_action_name}: {e!s}")
# print stack trace for debugging
import traceback
@@ -795,7 +792,7 @@ class AlpineBitsServer:
def get_supported_request_names(self) -> list[str]:
"""Get all supported request names (not capability names)."""
request_names = []
for capability_name in self._action_instances.keys():
for capability_name in self._action_instances:
action_enum = AlpineBitsActionName.get_by_capability_name(capability_name)
if action_enum:
request_names.append(action_enum.request_name)

View File

@@ -25,7 +25,7 @@ from .alpinebits_server import (
AlpineBitsServer,
Version,
)
from .auth import generate_api_key, generate_unique_id, validate_api_key
from .auth import generate_unique_id, validate_api_key
from .config_loader import load_config
from .db import Base, get_database_url
from .db import Customer as DBCustomer
@@ -57,7 +57,7 @@ class EventDispatcher:
self.listeners[event_name].append(func)
def register_hotel_listener(self, event_name, hotel_code, func):
"""Register a listener for a specific hotel"""
"""Register a listener for a specific hotel."""
self.hotel_listeners[f"{event_name}:{hotel_code}"].append(func)
async def dispatch(self, event_name, *args, **kwargs):
@@ -65,7 +65,7 @@ class EventDispatcher:
await func(*args, **kwargs)
async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs):
"""Dispatch event only to listeners registered for specific hotel"""
"""Dispatch event only to listeners registered for specific hotel."""
key = f"{event_name}:{hotel_code}"
for func in self.hotel_listeners[key]:
await func(*args, **kwargs)
@@ -162,7 +162,7 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
)
except Exception as e:
_LOGGER.error(f"Push event failed for hotel {hotel['hotel_id']}: {e}")
_LOGGER.exception(f"Push event failed for hotel {hotel['hotel_id']}: {e}")
# Optionally implement retry logic here@asynccontextmanager
@@ -258,7 +258,7 @@ app.add_middleware(
@api_router.get("/")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def root(request: Request):
"""Health check endpoint"""
"""Health check endpoint."""
return {
"message": "Wix Form Handler API is running",
"timestamp": datetime.now().isoformat(),
@@ -275,7 +275,7 @@ async def root(request: Request):
@api_router.get("/health")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def health_check(request: Request):
"""Detailed health check"""
"""Detailed health check."""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
@@ -345,7 +345,7 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
locale = contact_info.get("locale", "de-de")
contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
@@ -377,7 +377,7 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
for k in data.keys():
for k in data:
if k.startswith("field:alter_kind_"):
try:
age = int(data[k])
@@ -548,7 +548,7 @@ async def handle_wix_form(
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error("Error in handle_wix_form: %s", e)
_LOGGER.exception("Error in handle_wix_form: %s", e)
# log stacktrace
import traceback
@@ -563,12 +563,13 @@ async def handle_wix_form_test(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Test endpoint to verify the API is working with raw JSON data.
No authentication required for testing purposes.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form_test: {e!s}")
_LOGGER.exception(f"Error in handle_wix_form_test: {e!s}")
raise HTTPException(status_code=500, detail="Error processing test data")
@@ -666,30 +667,6 @@ async def handle_xml_upload(
raise HTTPException(status_code=500, detail="Error processing XML upload")
# UNUSED
@api_router.post("/admin/generate-api-key")
@limiter.limit("5/hour") # Very restrictive for admin operations
async def generate_new_api_key(
request: Request, admin_key: str = Depends(validate_api_key)
):
"""Admin endpoint to generate new API keys.
Requires admin API key and is heavily rate limited.
"""
if admin_key != "admin-key":
raise HTTPException(status_code=403, detail="Admin access required")
new_key = generate_api_key()
_LOGGER.info(f"Generated new API key (requested by: {admin_key})")
return {
"status": "success",
"message": "New API key generated",
"api_key": new_key,
"timestamp": datetime.now().isoformat(),
"note": "Store this key securely - it won't be shown again",
}
# TODO Bit sketchy. May need requests-toolkit in the future
def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
"""Parse multipart/form-data from raw request body.
@@ -861,7 +838,7 @@ async def alpinebits_server_handshake(
# Re-raise HTTP exceptions (auth errors, etc.)
raise
except Exception as e:
_LOGGER.error(f"Error in AlpineBits handshake: {e!s}")
_LOGGER.exception(f"Error in AlpineBits handshake: {e!s}")
raise HTTPException(status_code=500, detail="Internal server error")

View File

@@ -83,29 +83,29 @@ def validate_wix_signature(payload: bytes, signature: str, secret: str) -> bool:
# Compare signatures securely
return secrets.compare_digest(signature, expected_signature)
except Exception as e:
logger.error(f"Error validating signature: {e}")
logger.exception(f"Error validating signature: {e}")
return False
class APIKeyAuth:
"""Simple API key authentication class"""
"""Simple API key authentication class."""
def __init__(self, api_keys: dict):
self.api_keys = api_keys
def authenticate(self, api_key: str) -> str | None:
"""Authenticate an API key and return the key name if valid"""
"""Authenticate an API key and return the key name if valid."""
for key_name, valid_key in self.api_keys.items():
if secrets.compare_digest(api_key, valid_key):
return key_name
return None
def add_key(self, name: str, key: str):
"""Add a new API key"""
"""Add a new API key."""
self.api_keys[name] = key
def remove_key(self, name: str):
"""Remove an API key"""
"""Remove an API key."""
if name in self.api_keys:
del self.api_keys[name]

View File

@@ -585,8 +585,7 @@ class TextTextFormat2(Enum):
class TimeUnitType(Enum):
"""Defines the unit in which the time is expressed (e.g. year, day, hour).
"""
"""Defines the unit in which the time is expressed (e.g. year, day, hour)."""
YEAR = "Year"
MONTH = "Month"

View File

@@ -1,360 +0,0 @@
import asyncio
import json
import logging
import os
from datetime import UTC, date, datetime
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from .alpine_bits_helpers import (
AlpineBitsFactory,
CommentData,
CommentListItemData,
CommentsData,
CustomerData,
GuestCountsFactory,
HotelReservationIdData,
OtaMessageType,
PhoneTechType,
)
from .config_loader import load_config
# DB and config
from .db import Base, get_database_url
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .generated import alpinebits as ab
# Configure logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
async def setup_db(config):
DATABASE_URL = get_database_url(config)
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
_LOGGER.info("Database tables checked/created at startup.")
return engine, AsyncSessionLocal
async def main():
print("🚀 Starting AlpineBits XML generation script...")
# Load config (yaml, annotatedyaml)
config = load_config()
# print config for debugging
print("Loaded configuration:")
print(json.dumps(config, indent=2))
# Ensure SQLite DB file exists if using SQLite
db_url = config.get("database", {}).get("url", "")
if db_url.startswith("sqlite+aiosqlite:///"):
db_path = db_url.replace("sqlite+aiosqlite:///", "")
db_path = os.path.abspath(db_path)
db_dir = os.path.dirname(db_path)
if not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
# for now we delete the existing DB for clean testing
if os.path.exists(db_path):
os.remove(db_path)
print(f"Deleted existing SQLite DB at {db_path} for clean testing.")
# # Ensure DB schema is created (async)
engine, AsyncSessionLocal = await setup_db(config)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with AsyncSessionLocal() as db:
# Load data from JSON file
json_path = os.path.join(
os.path.dirname(__file__),
"../../test_data/wix_test_data_20250928_132611.json",
)
with open(json_path, encoding="utf-8") as f:
wix_data = json.load(f)
data = wix_data["data"]["data"]
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
locale = contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
email_newsletter = data.get("field:form_field_5a7b", "") != "Non selezionato"
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
# Dates
start_date = (
data.get("field:date_picker_a7c8")
or data.get("Anreisedatum")
or data.get("submissions", [{}])[1].get("value")
)
end_date = (
data.get("field:date_picker_7e65")
or data.get("Abreisedatum")
or data.get("submissions", [{}])[2].get("value")
)
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 2)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
for k in data:
if k.startswith("field:alter_kind_"):
try:
age = int(data[k])
children_ages.append(age)
except ValueError:
_LOGGER.warning("Invalid age value for %s: %s", k, data[k])
# UTM and offer
utm_fields = [
("utm_Source", "utm_source"),
("utm_Medium", "utm_medium"),
("utm_Campaign", "utm_campaign"),
("utm_Term", "utm_term"),
("utm_Content", "utm_content"),
]
utm_comment_text = []
for label, field in utm_fields:
val = data.get(f"field:{field}") or data.get(label)
if val:
utm_comment_text.append(f"{label}: {val}")
utm_comment = " | ".join(utm_comment_text) if utm_comment_text else None
offer = data.get("field:angebot_auswaehlen")
# Save all relevant data to DB (including new fields)
db_customer = DBCustomer(
given_name=first_name,
surname=last_name,
contact_id=contact_id,
name_prefix=name_prefix,
email_address=email,
phone=phone_number,
email_newsletter=email_newsletter,
address_line=address_line,
city_name=city_name,
postal_code=postal_code,
country_code=country_code,
gender=gender,
birth_date=birth_date,
language=language,
address_catalog=False,
name_title=None,
)
db.add(db_customer)
await db.commit()
await db.refresh(db_customer)
db_reservation = DBReservation(
customer_id=db_customer.id,
form_id=data.get("submissionId"),
start_date=date.fromisoformat(start_date) if start_date else None,
end_date=date.fromisoformat(end_date) if end_date else None,
num_adults=num_adults,
num_children=num_children,
children_ages=",".join(str(a) for a in children_ages),
offer=offer,
utm_comment=utm_comment,
created_at=datetime.now(UTC),
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=data.get("field:fbclid"),
gclid=data.get("field:gclid"),
hotel_code="123",
hotel_name="Frangart Inn",
)
db.add(db_reservation)
await db.commit()
await db.refresh(db_reservation)
# Now read back from DB
customer = await db.get(DBCustomer, db_reservation.customer_id)
reservation = await db.get(DBReservation, db_reservation.id)
# Generate XML from DB data
create_xml_from_db(customer, reservation)
await db.close()
def create_xml_from_db(customer: DBCustomer, reservation: DBReservation):
"""Generate AlpineBits XML from DB customer and reservation data."""
# Prepare data for XML
phone_numbers = [(customer.phone, PhoneTechType.MOBILE)] if customer.phone else []
customer_data = CustomerData(
given_name=customer.given_name,
surname=customer.surname,
name_prefix=customer.name_prefix,
name_title=customer.name_title,
phone_numbers=phone_numbers,
email_address=customer.email_address,
email_newsletter=customer.email_newsletter,
address_line=customer.address_line,
city_name=customer.city_name,
postal_code=customer.postal_code,
country_code=customer.country_code,
address_catalog=customer.address_catalog,
gender=customer.gender,
birth_date=customer.birth_date,
language=customer.language,
)
alpine_bits_factory = AlpineBitsFactory()
res_guests = alpine_bits_factory.create_res_guests(
customer_data, OtaMessageType.RETRIEVE
)
# Guest counts
children_ages = [int(a) for a in reservation.children_ages.split(",") if a]
guest_counts = GuestCountsFactory.create_retrieve_guest_counts(
reservation.num_adults, children_ages
)
# UniqueID
unique_id = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.UniqueId(
type_value=ab.UniqueIdType2.VALUE_14, id=reservation.unique_id
)
# TimeSpan
time_span = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays.RoomStay.TimeSpan(
start=reservation.start_date.isoformat() if reservation.start_date else None,
end=reservation.end_date.isoformat() if reservation.end_date else None,
)
room_stay = (
ab.OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays.RoomStay(
time_span=time_span,
guest_counts=guest_counts,
)
)
room_stays = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays(
room_stay=[room_stay],
)
# HotelReservationId
hotel_res_id_data = HotelReservationIdData(
res_id_type="13",
res_id_value=reservation.fbclid or reservation.gclid,
res_id_source=None,
res_id_source_context="99tales",
)
hotel_res_id = alpine_bits_factory.create(
hotel_res_id_data, OtaMessageType.RETRIEVE
)
hotel_res_ids = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo.HotelReservationIds(
hotel_reservation_id=[hotel_res_id]
)
basic_property_info = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo.BasicPropertyInfo(
hotel_code=reservation.hotel_code,
hotel_name=reservation.hotel_name,
)
# Comments
offer_comment = CommentData(
name=ab.CommentName2.ADDITIONAL_INFO,
text="Angebot/Offerta",
list_items=[
CommentListItemData(
value=reservation.offer,
language=customer.language,
list_item="1",
)
],
)
comment = None
if reservation.user_comment:
comment = CommentData(
name=ab.CommentName2.CUSTOMER_COMMENT,
text=reservation.user_comment,
list_items=[
CommentListItemData(
value="Landing page comment",
language=customer.language,
list_item="1",
)
],
)
comments = [offer_comment, comment] if comment else [offer_comment]
comments_data = CommentsData(comments=comments)
comments_xml = alpine_bits_factory.create(comments_data, OtaMessageType.RETRIEVE)
res_global_info = (
ab.OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo(
hotel_reservation_ids=hotel_res_ids,
basic_property_info=basic_property_info,
comments=comments_xml,
)
)
hotel_reservation = ab.OtaResRetrieveRs.ReservationsList.HotelReservation(
create_date_time=datetime.now(UTC).isoformat(),
res_status=ab.HotelReservationResStatus.REQUESTED,
room_stay_reservation="true",
unique_id=unique_id,
room_stays=room_stays,
res_guests=res_guests,
res_global_info=res_global_info,
)
reservations_list = ab.OtaResRetrieveRs.ReservationsList(
hotel_reservation=[hotel_reservation]
)
ota_res_retrieve_rs = ab.OtaResRetrieveRs(
version="7.000", success=None, reservations_list=reservations_list
)
# Serialize to XML
try:
ota_res_retrieve_rs.model_validate(ota_res_retrieve_rs.model_dump())
print("✅ Pydantic validation successful!")
from xsdata.formats.dataclass.serializers.config import SerializerConfig
from xsdata_pydantic.bindings import XmlSerializer
config = SerializerConfig(
pretty_print=True, xml_declaration=True, encoding="UTF-8"
)
serializer = XmlSerializer(config=config)
ns_map = {None: "http://www.opentravel.org/OTA/2003/05"}
xml_string = serializer.render(ota_res_retrieve_rs, ns_map=ns_map)
with open("output.xml", "w", encoding="utf-8") as outfile:
outfile.write(xml_string)
print("✅ XML serialization successful!")
print("Generated XML written to output.xml")
print("\n📄 Generated XML:")
print(xml_string)
from xsdata_pydantic.bindings import XmlParser
parser = XmlParser()
with open("output.xml", encoding="utf-8") as infile:
xml_content = infile.read()
parsed_result = parser.from_string(xml_content, ab.OtaResRetrieveRs)
print("✅ Round-trip validation successful!")
print(
f"Parsed reservation status: {parsed_result.reservations_list.hotel_reservation[0].res_status}"
)
except Exception as e:
print(f"❌ Validation/Serialization failed: {e}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -19,8 +19,7 @@ REDIS_URL = os.getenv("REDIS_URL", None)
def get_remote_address_with_forwarded(request: Request):
"""Get client IP address, considering forwarded headers from proxies/load balancers
"""
"""Get client IP address, considering forwarded headers from proxies/load balancers."""
# Check for forwarded headers (common in production behind proxies)
forwarded_for = request.headers.get("X-Forwarded-For")
if forwarded_for:
@@ -59,7 +58,7 @@ else:
def get_api_key_identifier(request: Request) -> str:
"""Get identifier for rate limiting based on API key if available, otherwise IP
This allows different rate limits per API key
This allows different rate limits per API key.
"""
# Try to get API key from Authorization header
auth_header = request.headers.get("Authorization")
@@ -85,7 +84,7 @@ webhook_limiter = Limiter(
# Custom rate limit exceeded handler
def custom_rate_limit_handler(request: Request, exc: RateLimitExceeded):
"""Custom handler for rate limit exceeded"""
"""Custom handler for rate limit exceeded."""
logger.warning(
f"Rate limit exceeded for {get_remote_address_with_forwarded(request)}: "
f"{exc.detail}"

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""Startup script for the Wix Form Handler API
"""
"""Startup script for the Wix Form Handler API."""
import os
@@ -10,7 +9,6 @@ if __name__ == "__main__":
db_path = "alpinebits.db" # Adjust path if needed
if os.path.exists(db_path):
os.remove(db_path)
print(f"Deleted database file: {db_path}")
uvicorn.run(
"alpine_bits_python.api:app",

View File

@@ -1,6 +1,6 @@
from xsdata_pydantic.bindings import XmlParser
from ..generated.alpinebits import OtaPingRs
from alpine_bits_python.generated.alpinebits import OtaPingRs
def main():
@@ -21,15 +21,11 @@ def main():
parsed_result = parser.from_string(xml, OtaPingRs)
print(parsed_result.echo_data)
warning = parsed_result.warnings.warning[0]
parsed_result.warnings.warning[0]
print(warning.type_value)
print(type(warning.content))
print(warning.content[0])
# save json in echo_data to file with indents
output_path = "echo_data_response.json"
@@ -37,7 +33,6 @@ def main():
import json
json.dump(json.loads(parsed_result.echo_data), out_f, indent=4)
print(f"Saved echo_data json to {output_path}")
if __name__ == "__main__":

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""Convenience launcher for the Wix Form Handler API
"""
"""Convenience launcher for the Wix Form Handler API."""
import os
import subprocess

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""Test the handshake functionality with the real AlpineBits sample file.
"""
"""Test the handshake functionality with the real AlpineBits sample file."""
import asyncio
@@ -8,8 +7,6 @@ from alpine_bits_python.alpinebits_server import AlpineBitsServer
async def main():
print("🔄 Testing AlpineBits Handshake with Sample File")
print("=" * 60)
# Create server instance
server = AlpineBitsServer()
@@ -20,15 +17,12 @@ async def main():
) as f:
ping_request_xml = f.read()
print("📤 Sending handshake request...")
# Handle the ping request
response = await server.handle_request(
await server.handle_request(
"OTA_Ping:Handshaking", ping_request_xml, "2024-10"
)
print(f"\n📥 Response Status: {response.status_code}")
print(f"📄 Response XML:\n{response.xml_content}")
if __name__ == "__main__":

View File

@@ -458,7 +458,7 @@ class TestEdgeCases:
config = SerializerConfig(pretty_print=True)
serializer = XmlSerializer(config=config)
xml_output = serializer.render(
serializer.render(
response, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
)

View File

@@ -11,8 +11,7 @@ def extract_relevant_sections(xml_string):
# Remove version attribute value, keep only presence
# Use the same XmlParser as AlpineBitsServer
parser = XmlParser()
obj = parser.from_string(xml_string, OtaPingRs)
return obj
return parser.from_string(xml_string, OtaPingRs)
@pytest.mark.asyncio