Linting and formatting

This commit is contained in:
Jonas Linter
2025-10-07 09:46:44 +02:00
parent b4b7a537e1
commit f0945ed431
21 changed files with 930 additions and 945 deletions

View File

@@ -40,3 +40,110 @@ pythonpath = ["src"]
[tool.ruff]
src = ["src", "test"]
[tool.ruff.lint]
select = [
"A001", # Variable {name} is shadowing a Python builtin
"ASYNC210", # Async functions should not call blocking HTTP methods
"ASYNC220", # Async functions should not create subprocesses with blocking methods
"ASYNC221", # Async functions should not run processes with blocking methods
"ASYNC222", # Async functions should not wait on processes with blocking methods
"ASYNC230", # Async functions should not open files with blocking methods like open
"ASYNC251", # Async functions should not call time.sleep
"B002", # Python does not support the unary prefix increment
"B005", # Using .strip() with multi-character strings is misleading
"B007", # Loop control variable {name} not used within loop body
"B014", # Exception handler with duplicate exception
"B015", # Pointless comparison. Did you mean to assign a value? Otherwise, prepend assert or remove it.
"B017", # pytest.raises(BaseException) should be considered evil
"B018", # Found useless attribute access. Either assign it to a variable or remove it.
"B023", # Function definition does not bind loop variable {name}
"B024", # `{name}` is an abstract base class, but it has no abstract methods or properties
"B026", # Star-arg unpacking after a keyword argument is strongly discouraged
"B032", # Possible unintentional type annotation (using :). Did you mean to assign (using =)?
"B035", # Dictionary comprehension uses static key
"B904", # Use raise from to specify exception cause
"B905", # zip() without an explicit strict= parameter
"BLE",
"C", # complexity
"COM818", # Trailing comma on bare tuple prohibited
"D", # docstrings
"DTZ003", # Use datetime.now(tz=) instead of datetime.utcnow()
"DTZ004", # Use datetime.fromtimestamp(ts, tz=) instead of datetime.utcfromtimestamp(ts)
"E", # pycodestyle
"F", # pyflakes/autoflake
"F541", # f-string without any placeholders
"FLY", # flynt
"FURB", # refurb
"G", # flake8-logging-format
"I", # isort
"INP", # flake8-no-pep420
"ISC", # flake8-implicit-str-concat
"ICN001", # import concentions; {name} should be imported as {asname}
"LOG", # flake8-logging
"N804", # First argument of a class method should be named cls
"N805", # First argument of a method should be named self
"N815", # Variable {name} in class scope should not be mixedCase
"PERF", # Perflint
"PGH", # pygrep-hooks
"PIE", # flake8-pie
"PL", # pylint
"PT", # flake8-pytest-style
"PTH", # flake8-pathlib
"PYI", # flake8-pyi
"RET", # flake8-return
"RSE", # flake8-raise
"RUF005", # Consider iterable unpacking instead of concatenation
"RUF006", # Store a reference to the return value of asyncio.create_task
"RUF007", # Prefer itertools.pairwise() over zip() when iterating over successive pairs
"RUF008", # Do not use mutable default values for dataclass attributes
"RUF010", # Use explicit conversion flag
"RUF013", # PEP 484 prohibits implicit Optional
"RUF016", # Slice in indexed access to type {value_type} uses type {index_type} instead of an integer
"RUF017", # Avoid quadratic list summation
"RUF018", # Avoid assignment expressions in assert statements
"RUF019", # Unnecessary key check before dictionary access
"RUF020", # {never_like} | T is equivalent to T
"RUF021", # Parenthesize a and b expressions when chaining and and or together, to make the precedence clear
"RUF022", # Sort __all__
"RUF023", # Sort __slots__
"RUF024", # Do not pass mutable objects as values to dict.fromkeys
"RUF026", # default_factory is a positional-only argument to defaultdict
"RUF030", # print() call in assert statement is likely unintentional
"RUF032", # Decimal() called with float literal argument
"RUF033", # __post_init__ method with argument defaults
"RUF034", # Useless if-else condition
"RUF100", # Unused `noqa` directive
"RUF101", # noqa directives that use redirected rule codes
"RUF200", # Failed to parse pyproject.toml: {message}
"S102", # Use of exec detected
"S103", # bad-file-permissions
"S108", # hardcoded-temp-file
"S306", # suspicious-mktemp-usage
"S307", # suspicious-eval-usage
"S313", # suspicious-xmlc-element-tree-usage
"S314", # suspicious-xml-element-tree-usage
"S315", # suspicious-xml-expat-reader-usage
"S316", # suspicious-xml-expat-builder-usage
"S317", # suspicious-xml-sax-usage
"S318", # suspicious-xml-mini-dom-usage
"S319", # suspicious-xml-pull-dom-usage
"S601", # paramiko-call
"S602", # subprocess-popen-with-shell-equals-true
"S604", # call-with-shell-equals-true
"S608", # hardcoded-sql-expression
"S609", # unix-command-wildcard-injection
"SIM", # flake8-simplify
"SLF", # flake8-self
"SLOT", # flake8-slots
"T100", # Trace found: {name} used
"T20", # flake8-print
"TC", # flake8-type-checking
"TID", # Tidy imports
"TRY", # tryceratops
"UP", # pyupgrade
"UP031", # Use format specifiers instead of percent format
"UP032", # Use f-string instead of `format` call
"W", # pycodestyle
]

View File

@@ -1,24 +1,21 @@
from datetime import datetime, timezone
import logging
import traceback
from typing import Union, Optional, Any, TypeVar
from pydantic import BaseModel, ConfigDict, Field
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import Enum
from typing import Tuple
from typing import Any
from alpine_bits_python.db import Customer, Reservation
# Import the generated classes
from .generated.alpinebits import (
CommentName2,
HotelReservationResStatus,
OtaHotelResNotifRq,
OtaResRetrieveRs,
CommentName2,
ProfileProfileType,
UniqueIdType2,
)
import logging
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)
@@ -127,11 +124,10 @@ class GuestCountsFactory:
@staticmethod
def create_guest_counts(
adults: int,
kids: Optional[list[int]] = None,
kids: list[int] | None = None,
message_type: OtaMessageType = OtaMessageType.RETRIEVE,
) -> NotifGuestCounts:
"""
Create a GuestCounts object for OtaHotelResNotifRq or OtaResRetrieveRs.
"""Create a GuestCounts object for OtaHotelResNotifRq or OtaResRetrieveRs.
:param adults: Number of adults
:param kids: List of ages for each kid (optional)
:return: GuestCounts instance
@@ -140,19 +136,17 @@ class GuestCountsFactory:
return GuestCountsFactory._create_guest_counts(
adults, kids, RetrieveGuestCounts
)
elif message_type == OtaMessageType.NOTIF:
if message_type == OtaMessageType.NOTIF:
return GuestCountsFactory._create_guest_counts(
adults, kids, NotifGuestCounts
)
else:
raise ValueError(f"Unsupported message type: {message_type}")
raise ValueError(f"Unsupported message type: {message_type}")
@staticmethod
def _create_guest_counts(
adults: int, kids: Optional[list[int]], guest_counts_class: type
adults: int, kids: list[int] | None, guest_counts_class: type
) -> Any:
"""
Internal method to create a GuestCounts object of the specified type.
"""Internal method to create a GuestCounts object of the specified type.
:param adults: Number of adults
:param kids: List of ages for each kid (optional)
:param guest_counts_class: The GuestCounts class to instantiate
@@ -193,7 +187,6 @@ class CustomerFactory:
@staticmethod
def _create_customer(customer_class: type, data: CustomerData) -> Any:
"""Internal method to create a customer of the specified type."""
# Create PersonName
person_name = customer_class.PersonName(
given_name=data.given_name,
@@ -267,7 +260,6 @@ class CustomerFactory:
@staticmethod
def _customer_to_data(customer: Any) -> CustomerData:
"""Internal method to convert any customer type to CustomerData."""
# Extract phone numbers
phone_numbers = []
if customer.telephone:
@@ -420,7 +412,7 @@ class CommentData:
"""Simple data class to hold comment information without nested type constraints."""
name: CommentName2 # Required: "included services", "customer comment", "additional info"
text: Optional[str] = None # Optional text content
text: str | None = None # Optional text content
list_items: list[CommentListItemData] = None # Optional list items
def __post_init__(self):
@@ -459,7 +451,6 @@ class CommentFactory:
data: CommentsData,
) -> Any:
"""Internal method to create comments of the specified type."""
comments_list = []
for comment_data in data.comments:
# Create list items
@@ -498,7 +489,6 @@ class CommentFactory:
@staticmethod
def _comments_to_data(comments: Any) -> CommentsData:
"""Internal method to convert any comments type to CommentsData."""
comments_data_list = []
for comment in comments.comment:
# Extract list items
@@ -549,7 +539,6 @@ class ResGuestFactory:
res_guests_class: type, customer_class: type, customer_data: CustomerData
) -> Any:
"""Internal method to create complete ResGuests structure."""
# Create the customer using the existing CustomerFactory
customer = CustomerFactory._create_customer(customer_class, customer_data)
@@ -572,18 +561,16 @@ class ResGuestFactory:
@staticmethod
def extract_primary_customer(
res_guests: Union[NotifResGuests, RetrieveResGuests],
res_guests: NotifResGuests | RetrieveResGuests,
) -> CustomerData:
"""Extract the primary customer data from a ResGuests structure."""
# Navigate down the nested structure to get the customer
customer = res_guests.res_guest.profiles.profile_info.profile.customer
# Use the existing CustomerFactory conversion method
if isinstance(res_guests, NotifResGuests):
return CustomerFactory.from_notif_customer(customer)
else:
return CustomerFactory.from_retrieve_customer(customer)
return CustomerFactory.from_retrieve_customer(customer)
class AlpineBitsFactory:
@@ -591,11 +578,10 @@ class AlpineBitsFactory:
@staticmethod
def create(
data: Union[CustomerData, HotelReservationIdData, CommentsData],
data: CustomerData | HotelReservationIdData | CommentsData,
message_type: OtaMessageType,
) -> Any:
"""
Create an AlpineBits object based on the data type and message type.
"""Create an AlpineBits object based on the data type and message type.
Args:
data: The data object (CustomerData, HotelReservationIdData, CommentsData, etc.)
@@ -603,36 +589,32 @@ class AlpineBitsFactory:
Returns:
The appropriate AlpineBits object based on the data type and message type
"""
if isinstance(data, CustomerData):
if message_type == OtaMessageType.NOTIF:
return CustomerFactory.create_notif_customer(data)
else:
return CustomerFactory.create_retrieve_customer(data)
return CustomerFactory.create_retrieve_customer(data)
elif isinstance(data, HotelReservationIdData):
if isinstance(data, HotelReservationIdData):
if message_type == OtaMessageType.NOTIF:
return HotelReservationIdFactory.create_notif_hotel_reservation_id(data)
else:
return HotelReservationIdFactory.create_retrieve_hotel_reservation_id(
data
)
return HotelReservationIdFactory.create_retrieve_hotel_reservation_id(
data
)
elif isinstance(data, CommentsData):
if isinstance(data, CommentsData):
if message_type == OtaMessageType.NOTIF:
return CommentFactory.create_notif_comments(data)
else:
return CommentFactory.create_retrieve_comments(data)
return CommentFactory.create_retrieve_comments(data)
else:
raise ValueError(f"Unsupported data type: {type(data)}")
raise ValueError(f"Unsupported data type: {type(data)}")
@staticmethod
def create_res_guests(
customer_data: CustomerData, message_type: OtaMessageType
) -> Union[NotifResGuests, RetrieveResGuests]:
"""
Create a complete ResGuests structure with a primary customer.
) -> NotifResGuests | RetrieveResGuests:
"""Create a complete ResGuests structure with a primary customer.
Args:
customer_data: The customer data
@@ -640,44 +622,44 @@ class AlpineBitsFactory:
Returns:
The appropriate ResGuests object
"""
if message_type == OtaMessageType.NOTIF:
return ResGuestFactory.create_notif_res_guests(customer_data)
else:
return ResGuestFactory.create_retrieve_res_guests(customer_data)
return ResGuestFactory.create_retrieve_res_guests(customer_data)
@staticmethod
def extract_data(
obj: Any,
) -> Union[CustomerData, HotelReservationIdData, CommentsData]:
"""
Extract data from an AlpineBits object back to a simple data class.
) -> CustomerData | HotelReservationIdData | CommentsData:
"""Extract data from an AlpineBits object back to a simple data class.
Args:
obj: The AlpineBits object to extract data from
Returns:
The appropriate data object
"""
# Check if it's a Customer object
if hasattr(obj, "person_name") and hasattr(obj.person_name, "given_name"):
if isinstance(obj, NotifCustomer):
return CustomerFactory.from_notif_customer(obj)
elif isinstance(obj, RetrieveCustomer):
if isinstance(obj, RetrieveCustomer):
return CustomerFactory.from_retrieve_customer(obj)
# Check if it's a HotelReservationId object
elif hasattr(obj, "res_id_type"):
if isinstance(obj, NotifHotelReservationId):
return HotelReservationIdFactory.from_notif_hotel_reservation_id(obj)
elif isinstance(obj, RetrieveHotelReservationId):
if isinstance(obj, RetrieveHotelReservationId):
return HotelReservationIdFactory.from_retrieve_hotel_reservation_id(obj)
# Check if it's a Comments object
elif hasattr(obj, "comment"):
if isinstance(obj, NotifComments):
return CommentFactory.from_notif_comments(obj)
elif isinstance(obj, RetrieveComments):
if isinstance(obj, RetrieveComments):
return CommentFactory.from_retrieve_comments(obj)
# Check if it's a ResGuests object
@@ -688,15 +670,13 @@ class AlpineBitsFactory:
raise ValueError(f"Unsupported object type: {type(obj)}")
def create_res_retrieve_response(list: list[Tuple[Reservation, Customer]]):
def create_res_retrieve_response(list: list[tuple[Reservation, Customer]]):
"""Create RetrievedReservation XML from database entries."""
return _create_xml_from_db(list, OtaMessageType.RETRIEVE)
def create_res_notif_push_message(list: Tuple[Reservation, Customer]):
def create_res_notif_push_message(list: tuple[Reservation, Customer]):
"""Create Reservation Notification XML from database entries."""
return _create_xml_from_db(list, OtaMessageType.NOTIF)
@@ -819,8 +799,7 @@ def _process_single_reservation(
if reservation.hotel_code is None:
raise ValueError("Reservation hotel_code is None")
else:
hotel_code = str(reservation.hotel_code)
hotel_code = str(reservation.hotel_code)
if reservation.hotel_name is None:
hotel_name = None
else:
@@ -897,7 +876,7 @@ def _process_single_reservation(
)
hotel_reservation = HotelReservation(
create_date_time=datetime.now(timezone.utc).isoformat(),
create_date_time=datetime.now(UTC).isoformat(),
res_status=HotelReservationResStatus.REQUESTED,
room_stay_reservation="true",
unique_id=unique_id,
@@ -910,14 +889,13 @@ def _process_single_reservation(
def _create_xml_from_db(
entries: list[Tuple[Reservation, Customer]] | Tuple[Reservation, Customer],
entries: list[tuple[Reservation, Customer]] | tuple[Reservation, Customer],
type: OtaMessageType,
):
"""Create RetrievedReservation XML from database entries.
list of pairs (Reservation, Customer)
"""
reservations_list = []
# if entries isn't a list wrap the element in a list
@@ -957,7 +935,7 @@ def _create_xml_from_db(
raise
return ota_hotel_res_notif_rq
elif type == OtaMessageType.RETRIEVE:
if type == OtaMessageType.RETRIEVE:
retrieved_reservations = OtaResRetrieveRs.ReservationsList(
hotel_reservation=reservations_list
)
@@ -974,8 +952,7 @@ def _create_xml_from_db(
return ota_res_retrieve_rs
else:
raise ValueError(f"Unsupported message type: {type}")
raise ValueError(f"Unsupported message type: {type}")
# Usage examples

View File

@@ -1,47 +1,39 @@
"""
AlpineBits Server for handling hotel data exchange.
"""AlpineBits Server for handling hotel data exchange.
This module provides an asynchronous AlpineBits server that can handle various
OTA (OpenTravel Alliance) actions for hotel data exchange. Currently implements
handshaking functionality with configurable supported actions and capabilities.
"""
import asyncio
from datetime import datetime
from zoneinfo import ZoneInfo
import difflib
import json
import inspect
import json
import logging
import re
from typing import Dict, List, Optional, Any, Union, Tuple, Type, override
from xml.etree import ElementTree as ET
from abc import ABC
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, IntEnum
from typing import Any, Optional, override
from zoneinfo import ZoneInfo
from sqlalchemy import select
from xsdata.formats.dataclass.serializers.config import SerializerConfig
from xsdata_pydantic.bindings import XmlParser, XmlSerializer
from alpine_bits_python.alpine_bits_helpers import (
PhoneTechType,
create_res_notif_push_message,
create_res_retrieve_response,
)
from .db import AckedRequest, Customer, Reservation
from .generated.alpinebits import (
OtaNotifReportRq,
OtaNotifReportRs,
OtaPingRq,
OtaPingRs,
WarningStatus,
OtaReadRq,
WarningStatus,
)
from xsdata_pydantic.bindings import XmlSerializer
from xsdata.formats.dataclass.serializers.config import SerializerConfig
from abc import ABC, abstractmethod
from xsdata_pydantic.bindings import XmlParser
import logging
from .db import AckedRequest, Reservation, Customer
from sqlalchemy import select
from sqlalchemy.orm import joinedload
# Configure logging
logging.basicConfig(level=logging.INFO)
@@ -178,8 +170,7 @@ class AlpineBitsAction(ABC):
dbsession=None,
server_capabilities=None,
) -> AlpineBitsResponse:
"""
Handle the incoming request XML and return response XML.
"""Handle the incoming request XML and return response XML.
Default implementation returns "not implemented" error.
Override this method in subclasses to provide actual functionality.
@@ -191,18 +182,19 @@ class AlpineBitsAction(ABC):
Returns:
AlpineBitsResponse with error or actual response
"""
return_string = f"Error: Action {action} not implemented"
return AlpineBitsResponse(return_string, HttpStatusCode.BAD_REQUEST)
async def check_version_supported(self, version: Version) -> bool:
"""
Check if the action supports the given version.
"""Check if the action supports the given version.
Args:
version: The AlpineBits version to check
Returns:
True if supported, False otherwise
"""
if isinstance(self.version, list):
return version in self.version
@@ -210,12 +202,11 @@ class AlpineBitsAction(ABC):
class ServerCapabilities:
"""
Automatically discovers AlpineBitsAction implementations and generates capabilities.
"""Automatically discovers AlpineBitsAction implementations and generates capabilities.
"""
def __init__(self):
self.action_registry: Dict[AlpineBitsActionName, Type[AlpineBitsAction]] = {}
self.action_registry: dict[AlpineBitsActionName, type[AlpineBitsAction]] = {}
self._discover_actions()
self.capability_dict = None
@@ -236,9 +227,8 @@ class ServerCapabilities:
# Use capability attribute as registry key
self.action_registry[action_instance.name] = obj
def _is_action_implemented(self, action_class: Type[AlpineBitsAction]) -> bool:
"""
Check if an action is actually implemented or just uses the default behavior.
def _is_action_implemented(self, action_class: type[AlpineBitsAction]) -> bool:
"""Check if an action is actually implemented or just uses the default behavior.
This is a simple check - in practice, you might want more sophisticated detection.
"""
# Check if the class has overridden the handle method
@@ -247,8 +237,7 @@ class ServerCapabilities:
return False
def create_capabilities_dict(self) -> None:
"""
Generate the capabilities dictionary based on discovered actions.
"""Generate the capabilities dictionary based on discovered actions.
"""
versions_dict = {}
@@ -298,18 +287,15 @@ class ServerCapabilities:
if action.get("action") != "action_OTA_Ping"
]
return None
def get_capabilities_dict(self) -> Dict:
def get_capabilities_dict(self) -> dict:
"""Get capabilities as a dictionary. Generates if not already created.
"""
Get capabilities as a dictionary. Generates if not already created.
"""
if self.capability_dict is None:
self.create_capabilities_dict()
return self.capability_dict
def get_supported_actions(self) -> List[str]:
def get_supported_actions(self) -> list[str]:
"""Get list of all supported action names."""
return list(self.action_registry.keys())
@@ -320,7 +306,7 @@ class ServerCapabilities:
class PingAction(AlpineBitsAction):
"""Implementation for OTA_Ping action (handshaking)."""
def __init__(self, config: Dict = {}):
def __init__(self, config: dict = {}):
self.name = AlpineBitsActionName.OTA_PING
self.version = [
Version.V2024_10,
@@ -338,10 +324,9 @@ class PingAction(AlpineBitsAction):
server_capabilities: None | ServerCapabilities = None,
) -> AlpineBitsResponse:
"""Handle ping requests."""
if request_xml is None:
return AlpineBitsResponse(
f"Error: Xml Request missing", HttpStatusCode.BAD_REQUEST
"Error: Xml Request missing", HttpStatusCode.BAD_REQUEST
)
if server_capabilities is None:
@@ -356,9 +341,9 @@ class PingAction(AlpineBitsAction):
parsed_request = parser.from_string(request_xml, OtaPingRq)
echo_data_client = json.loads(parsed_request.echo_data)
except Exception as e:
except Exception:
return AlpineBitsResponse(
f"Error: Invalid XML request", HttpStatusCode.BAD_REQUEST
"Error: Invalid XML request", HttpStatusCode.BAD_REQUEST
)
# compare echo data with capabilities, create a dictionary containing the matching capabilities
@@ -441,7 +426,7 @@ def strip_control_chars(s):
def validate_hotel_authentication(
username: str, password: str, hotelid: str, config: Dict
username: str, password: str, hotelid: str, config: dict
) -> bool:
"""Validate hotel authentication based on username, password, and hotel ID.
@@ -452,7 +437,6 @@ def validate_hotel_authentication(
username: "alice"
password: !secret ALICE_PASSWORD
"""
if not config or "alpine_bits_auth" not in config:
return False
auth_list = config["alpine_bits_auth"]
@@ -471,7 +455,7 @@ def validate_hotel_authentication(
class ReadAction(AlpineBitsAction):
"""Implementation for OTA_Read action."""
def __init__(self, config: Dict = {}):
def __init__(self, config: dict = {}):
self.name = AlpineBitsActionName.OTA_READ
self.version = [Version.V2024_10, Version.V2022_10]
self.config = config
@@ -486,7 +470,6 @@ class ReadAction(AlpineBitsAction):
server_capabilities=None,
) -> AlpineBitsResponse:
"""Handle read requests."""
clean_action = strip_control_chars(str(action)).strip()
clean_expected = strip_control_chars(self.name.value[1]).strip()
@@ -513,7 +496,7 @@ class ReadAction(AlpineBitsAction):
if hotelid is None:
return AlpineBitsResponse(
f"Error: Unauthorized Read Request. No target hotel specified. Check credentials",
"Error: Unauthorized Read Request. No target hotel specified. Check credentials",
HttpStatusCode.UNAUTHORIZED,
)
@@ -541,18 +524,17 @@ class ReadAction(AlpineBitsAction):
)
if start_date:
stmt = stmt.filter(Reservation.start_date >= start_date)
else:
# remove reservations that have been acknowledged via client_id
if client_info.client_id:
subquery = (
select(Reservation.id)
.join(
AckedRequest,
AckedRequest.unique_id == Reservation.unique_id,
)
.filter(AckedRequest.client_id == client_info.client_id)
# remove reservations that have been acknowledged via client_id
elif client_info.client_id:
subquery = (
select(Reservation.id)
.join(
AckedRequest,
AckedRequest.unique_id == Reservation.unique_id,
)
stmt = stmt.filter(~Reservation.id.in_(subquery))
.filter(AckedRequest.client_id == client_info.client_id)
)
stmt = stmt.filter(~Reservation.id.in_(subquery))
result = await dbsession.execute(stmt)
reservation_customer_pairs: list[tuple[Reservation, Customer]] = (
@@ -583,7 +565,7 @@ class ReadAction(AlpineBitsAction):
class NotifReportReadAction(AlpineBitsAction):
"""Necessary for read action to follow specification. Clients need to report acknowledgements"""
def __init__(self, config: Dict = {}):
def __init__(self, config: dict = {}):
self.name = AlpineBitsActionName.OTA_HOTEL_NOTIF_REPORT
self.version = [Version.V2024_10, Version.V2022_10]
self.config = config
@@ -598,7 +580,6 @@ class NotifReportReadAction(AlpineBitsAction):
server_capabilities=None,
) -> AlpineBitsResponse:
"""Handle read requests."""
notif_report = XmlParser().from_string(request_xml, OtaNotifReportRq)
# we can't check hotel auth here, because this action does not contain hotel info
@@ -621,34 +602,29 @@ class NotifReportReadAction(AlpineBitsAction):
success_message, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
)
if warnings is None and notif_report_details is None:
return AlpineBitsResponse(
response_xml, HttpStatusCode.OK
) # Nothing to process
elif (
if (warnings is None and notif_report_details is None) or (
notif_report_details is not None
and notif_report_details.hotel_notif_report is None
):
return AlpineBitsResponse(
response_xml, HttpStatusCode.OK
) # Nothing to process
else:
if dbsession is None:
return AlpineBitsResponse(
"Error: Something went wrong", HttpStatusCode.INTERNAL_SERVER_ERROR
)
if dbsession is None:
return AlpineBitsResponse(
"Error: Something went wrong", HttpStatusCode.INTERNAL_SERVER_ERROR
)
timestamp = datetime.now(ZoneInfo("UTC"))
for entry in notif_report_details.hotel_notif_report.hotel_reservations.hotel_reservation: # type: ignore
unique_id = entry.unique_id.id
acked_request = AckedRequest(
unique_id=unique_id,
client_id=client_info.client_id,
timestamp=timestamp,
)
dbsession.add(acked_request)
timestamp = datetime.now(ZoneInfo("UTC"))
for entry in notif_report_details.hotel_notif_report.hotel_reservations.hotel_reservation: # type: ignore
unique_id = entry.unique_id.id
acked_request = AckedRequest(
unique_id=unique_id,
client_id=client_info.client_id,
timestamp=timestamp,
)
dbsession.add(acked_request)
await dbsession.commit()
await dbsession.commit()
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
@@ -656,7 +632,7 @@ class NotifReportReadAction(AlpineBitsAction):
class PushAction(AlpineBitsAction):
"""Creates the necessary xml for OTA_HotelResNotif:GuestRequests"""
def __init__(self, config: Dict = {}):
def __init__(self, config: dict = {}):
self.name = AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS
self.version = [Version.V2024_10, Version.V2022_10]
self.config = config
@@ -664,14 +640,13 @@ class PushAction(AlpineBitsAction):
async def handle(
self,
action: str,
request_xml: Tuple[Reservation, Customer],
request_xml: tuple[Reservation, Customer],
version: Version,
client_info: AlpineBitsClientInfo,
dbsession=None,
server_capabilities=None,
) -> AlpineBitsResponse:
"""Create push request XML."""
xml_push_request = create_res_notif_push_message(request_xml)
config = SerializerConfig(
@@ -686,15 +661,14 @@ class PushAction(AlpineBitsAction):
class AlpineBitsServer:
"""
Asynchronous AlpineBits server for handling hotel data exchange requests.
"""Asynchronous AlpineBits server for handling hotel data exchange requests.
This server handles various OTA actions and implements the AlpineBits protocol
for hotel data exchange. It maintains a registry of supported actions and
their capabilities, and can respond to handshake requests with its capabilities.
"""
def __init__(self, config: Dict = None):
def __init__(self, config: dict = None):
self.capabilities = ServerCapabilities()
self._action_instances = {}
self.config = config
@@ -706,20 +680,19 @@ class AlpineBitsServer:
_LOGGER.info(f"Initializing action instance for {capability_name}")
self._action_instances[capability_name] = action_class(config=self.config)
def get_capabilities(self) -> Dict:
def get_capabilities(self) -> dict:
"""Get server capabilities."""
return self.capabilities.get_capabilities_dict()
async def handle_request(
self,
request_action_name: str,
request_xml: str | Tuple[Reservation, Customer],
request_xml: str | tuple[Reservation, Customer],
client_info: AlpineBitsClientInfo,
version: str = "2024-10",
dbsession=None,
) -> AlpineBitsResponse:
"""
Handle an incoming AlpineBits request by routing to appropriate action handler.
"""Handle an incoming AlpineBits request by routing to appropriate action handler.
Args:
request_action_name: The action name from the request (e.g., "OTA_Read:GuestRequests")
@@ -728,6 +701,7 @@ class AlpineBitsServer:
Returns:
AlpineBitsResponse with the result
"""
# Convert string version to enum
try:
@@ -774,7 +748,7 @@ class AlpineBitsServer:
action_instance: PushAction
if request_xml is None or not isinstance(request_xml, tuple):
return AlpineBitsResponse(
f"Error: Invalid data for push request",
"Error: Invalid data for push request",
HttpStatusCode.BAD_REQUEST,
)
return await action_instance.handle(
@@ -792,26 +766,25 @@ class AlpineBitsServer:
server_capabilities=self.capabilities,
client_info=client_info,
)
else:
return await action_instance.handle(
action=request_action_name,
request_xml=request_xml,
version=version_enum,
dbsession=dbsession,
client_info=client_info,
)
return await action_instance.handle(
action=request_action_name,
request_xml=request_xml,
version=version_enum,
dbsession=dbsession,
client_info=client_info,
)
except Exception as e:
print(f"Error handling request {request_action_name}: {str(e)}")
print(f"Error handling request {request_action_name}: {e!s}")
# print stack trace for debugging
import traceback
traceback.print_exc()
return AlpineBitsResponse(
f"Error: Internal server error while processing {request_action_name}: {str(e)}",
f"Error: Internal server error while processing {request_action_name}: {e!s}",
HttpStatusCode.INTERNAL_SERVER_ERROR,
)
def get_supported_request_names(self) -> List[str]:
def get_supported_request_names(self) -> list[str]:
"""Get all supported request names (not capability names)."""
request_names = []
for capability_name in self._action_instances.keys():
@@ -823,8 +796,7 @@ class AlpineBitsServer:
def is_action_supported(
self, request_action_name: str, version: str | None = None
) -> bool:
"""
Check if a request action is supported.
"""Check if a request action is supported.
Args:
request_action_name: The request action name (e.g., "OTA_Read:GuestRequests")
@@ -832,6 +804,7 @@ class AlpineBitsServer:
Returns:
True if supported, False otherwise
"""
action_enum = AlpineBitsActionName.get_by_request_name(request_action_name)
if not action_enum:
@@ -848,8 +821,7 @@ class AlpineBitsServer:
# This would need to be async, but for simplicity we'll just check if version exists
if isinstance(action_instance.version, list):
return version_enum in action_instance.version
else:
return action_instance.version == version_enum
return action_instance.version == version_enum
except ValueError:
return False

View File

@@ -1,62 +1,57 @@
from fastapi import (
FastAPI,
HTTPException,
BackgroundTasks,
Request,
Depends,
APIRouter,
Form,
File,
UploadFile,
)
from fastapi.concurrency import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPBasicCredentials, HTTPBasic
from .config_loader import load_config
from fastapi.responses import HTMLResponse, PlainTextResponse, Response
from .models import WixFormSubmission
from datetime import datetime, date, timezone
from .auth import (
generate_unique_id,
validate_api_key,
validate_wix_signature,
generate_api_key,
)
from .rate_limit import (
limiter,
webhook_limiter,
custom_rate_limit_handler,
DEFAULT_RATE_LIMIT,
WEBHOOK_RATE_LIMIT,
BURST_RATE_LIMIT,
)
from slowapi.errors import RateLimitExceeded
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List
import json
import os
import asyncio
import gzip
import xml.etree.ElementTree as ET
import json
import logging
import os
import urllib.parse
from datetime import UTC, date, datetime
from functools import partial
from typing import Any
import httpx
from fastapi import (
APIRouter,
Depends,
FastAPI,
HTTPException,
Request,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from slowapi.errors import RateLimitExceeded
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from .alpinebits_server import (
AlpineBitsActionName,
AlpineBitsClientInfo,
AlpineBitsServer,
Version,
AlpineBitsActionName,
)
import urllib.parse
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from functools import partial
import httpx
from .auth import (
generate_api_key,
generate_unique_id,
validate_api_key,
)
from .config_loader import load_config
from .db import (
Base,
Customer as DBCustomer,
Reservation as DBReservation,
get_database_url,
)
from .db import (
Customer as DBCustomer,
)
from .db import (
Reservation as DBReservation,
)
from .rate_limit import (
BURST_RATE_LIMIT,
DEFAULT_RATE_LIMIT,
WEBHOOK_RATE_LIMIT,
custom_rate_limit_handler,
limiter,
webhook_limiter,
)
# Configure logging
logging.basicConfig(level=logging.INFO)
@@ -98,8 +93,7 @@ event_dispatcher = EventDispatcher()
async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel):
"""
Push listener that sends reservation data to hotel's push endpoint.
"""Push listener that sends reservation data to hotel's push endpoint.
Only called for reservations that match this hotel's hotel_id.
"""
push_endpoint = hotel.get("push_endpoint")
@@ -186,7 +180,7 @@ async def lifespan(app: FastAPI):
try:
config = load_config()
except Exception as e:
_LOGGER.error(f"Failed to load config: {str(e)}")
_LOGGER.error(f"Failed to load config: {e!s}")
config = {}
DATABASE_URL = get_database_url(config)
@@ -263,9 +257,8 @@ app.add_middleware(
)
async def process_form_submission(submission_data: Dict[str, Any]) -> None:
"""
Background task to process the form submission.
async def process_form_submission(submission_data: dict[str, Any]) -> None:
"""Background task to process the form submission.
Add your business logic here.
"""
try:
@@ -297,7 +290,7 @@ async def process_form_submission(submission_data: Dict[str, Any]) -> None:
# - Process the data further
except Exception as e:
_LOGGER.error(f"Error processing form submission: {str(e)}")
_LOGGER.error(f"Error processing form submission: {e!s}")
@api_router.get("/")
@@ -332,9 +325,8 @@ async def health_check(request: Request):
# Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: Dict[str, Any], db):
"""
Shared business logic for handling Wix form submissions (test and production).
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production).
"""
timestamp = datetime.now().isoformat()
@@ -485,7 +477,7 @@ async def process_wix_form_submission(request: Request, data: Dict[str, Any], db
num_children=num_children,
children_ages=",".join(str(a) for a in children_ages),
offer=offer,
created_at=datetime.now(timezone.utc),
created_at=datetime.now(UTC),
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
@@ -532,38 +524,36 @@ async def process_wix_form_submission(request: Request, data: Dict[str, Any], db
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(
request: Request, data: Dict[str, Any], db_session=Depends(get_async_session)
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""
Unified endpoint to handle Wix form submissions (test and production).
"""Unified endpoint to handle Wix form submissions (test and production).
No authentication required for this endpoint.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form: {str(e)}")
_LOGGER.error(f"Error in handle_wix_form: {e!s}")
# log stacktrace
import traceback
traceback_str = traceback.format_exc()
_LOGGER.error(f"Stack trace for handle_wix_form: {traceback_str}")
raise HTTPException(status_code=500, detail=f"Error processing Wix form data")
raise HTTPException(status_code=500, detail="Error processing Wix form data")
@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_wix_form_test(
request: Request, data: Dict[str, Any], db_session=Depends(get_async_session)
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""
Test endpoint to verify the API is working with raw JSON data.
"""Test endpoint to verify the API is working with raw JSON data.
No authentication required for testing purposes.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form_test: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error processing test data")
_LOGGER.error(f"Error in handle_wix_form_test: {e!s}")
raise HTTPException(status_code=500, detail="Error processing test data")
@api_router.post("/admin/generate-api-key")
@@ -571,8 +561,7 @@ async def handle_wix_form_test(
async def generate_new_api_key(
request: Request, admin_key: str = Depends(validate_api_key)
):
"""
Admin endpoint to generate new API keys.
"""Admin endpoint to generate new API keys.
Requires admin API key and is heavily rate limited.
"""
if admin_key != "admin-key":
@@ -593,8 +582,7 @@ async def generate_new_api_key(
async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
) -> str:
"""
Validate basic authentication for AlpineBits protocol.
"""Validate basic authentication for AlpineBits protocol.
Returns username if valid, raises HTTPException if not.
"""
# Accept any username/password pair present in config['alpine_bits_auth']
@@ -626,9 +614,8 @@ async def validate_basic_auth(
return credentials.username, credentials.password
def parse_multipart_data(content_type: str, body: bytes) -> Dict[str, Any]:
"""
Parse multipart/form-data from raw request body.
def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
"""Parse multipart/form-data from raw request body.
This is a simplified parser for the AlpineBits use case.
"""
if "multipart/form-data" not in content_type:
@@ -692,8 +679,7 @@ async def alpinebits_server_handshake(
credentials_tupel: tuple = Depends(validate_basic_auth),
dbsession=Depends(get_async_session),
):
"""
AlpineBits server endpoint implementing the handshake protocol.
"""AlpineBits server endpoint implementing the handshake protocol.
This endpoint handles:
- Protocol version negotiation via X-AlpineBits-ClientProtocolVersion header
@@ -747,10 +733,10 @@ async def alpinebits_server_handshake(
try:
body = gzip.decompress(body)
except Exception as e:
except Exception:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content",
detail="ERROR: Failed to decompress gzip content",
)
# Check content type (after decompression)
@@ -767,10 +753,10 @@ async def alpinebits_server_handshake(
if "multipart/form-data" in content_type:
try:
form_data = parse_multipart_data(content_type, body)
except Exception as e:
except Exception:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to parse multipart/form-data",
detail="ERROR: Failed to parse multipart/form-data",
)
elif "application/x-www-form-urlencoded" in content_type:
# Parse as urlencoded
@@ -829,15 +815,14 @@ async def alpinebits_server_handshake(
# Re-raise HTTP exceptions (auth errors, etc.)
raise
except Exception as e:
_LOGGER.error(f"Error in AlpineBits handshake: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal server error")
_LOGGER.error(f"Error in AlpineBits handshake: {e!s}")
raise HTTPException(status_code=500, detail="Internal server error")
@api_router.get("/admin/stats")
@limiter.limit("10/minute")
async def get_api_stats(request: Request, admin_key: str = Depends(validate_api_key)):
"""
Admin endpoint to get API usage statistics.
"""Admin endpoint to get API usage statistics.
Requires admin API key.
"""
if admin_key != "admin-key":
@@ -862,8 +847,7 @@ app.include_router(api_router)
@app.get("/", response_class=HTMLResponse)
async def landing_page():
"""
Serve the under construction landing page at the root route
"""Serve the under construction landing page at the root route
"""
try:
# Get the path to the HTML file
@@ -871,7 +855,7 @@ async def landing_page():
html_path = os.path.join(os.path.dirname(__file__), "templates", "index.html")
with open(html_path, "r", encoding="utf-8") as f:
with open(html_path, encoding="utf-8") as f:
html_content = f.read()
return HTMLResponse(content=html_content, status_code=200)

View File

@@ -1,13 +1,12 @@
import os
import secrets
from typing import Optional
from fastapi import HTTPException, Security, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import hashlib
import hmac
from datetime import datetime, timedelta
import logging
import os
import secrets
from dotenv import load_dotenv
from fastapi import HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
# Load environment variables from .env file
load_dotenv()
@@ -44,8 +43,7 @@ def generate_api_key() -> str:
def validate_api_key(
credentials: HTTPAuthorizationCredentials = Security(security),
) -> str:
"""
Validate API key from Authorization header.
"""Validate API key from Authorization header.
Expected format: Authorization: Bearer your_api_key_here
"""
token = credentials.credentials
@@ -65,8 +63,7 @@ def validate_api_key(
def validate_wix_signature(payload: bytes, signature: str, secret: str) -> bool:
"""
Validate Wix webhook signature for additional security.
"""Validate Wix webhook signature for additional security.
Wix signs their webhooks with HMAC-SHA256.
"""
if not signature or not secret:
@@ -74,8 +71,7 @@ def validate_wix_signature(payload: bytes, signature: str, secret: str) -> bool:
try:
# Remove 'sha256=' prefix if present
if signature.startswith("sha256="):
signature = signature[7:]
signature = signature.removeprefix("sha256=")
# Calculate expected signature
expected_signature = hmac.new(
@@ -95,7 +91,7 @@ class APIKeyAuth:
def __init__(self, api_keys: dict):
self.api_keys = api_keys
def authenticate(self, api_key: str) -> Optional[str]:
def authenticate(self, api_key: str) -> str | None:
"""Authenticate an API key and return the key name if valid"""
for key_name, valid_key in self.api_keys.items():
if secrets.compare_digest(api_key, valid_key):

View File

@@ -1,25 +1,20 @@
import os
from pathlib import Path
from typing import Any, Dict, List
from annotatedyaml.loader import (
HAS_C_LOADER,
JSON_TYPE,
LoaderType,
Secrets,
add_constructor,
)
from annotatedyaml.loader import (
load_yaml as load_annotated_yaml,
load_yaml_dict as load_annotated_yaml_dict,
parse_yaml as parse_annotated_yaml,
secret_yaml as annotated_secret_yaml,
)
from voluptuous import (
Schema,
Required,
PREVENT_EXTRA,
All,
Length,
PREVENT_EXTRA,
MultipleInvalid,
Optional,
Required,
Schema,
)
# --- Voluptuous schemas ---
@@ -101,7 +96,7 @@ class Config:
return self.basic_auth["hotel_name"]
@property
def users(self) -> List[Dict[str, str]]:
def users(self) -> list[dict[str, str]]:
return self.basic_auth["users"]

View File

@@ -1,8 +1,8 @@
from sqlalchemy import Column, Integer, String, Date, Boolean, ForeignKey, DateTime
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
import os
from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base()

View File

@@ -85,6 +85,7 @@ __all__ = [
"CommentName1",
"CommentName2",
"ContactInfoLocation",
"DefSendComplete",
"DescriptionName",
"DescriptionTextFormat1",
"DescriptionTextFormat2",
@@ -103,6 +104,7 @@ __all__ = [
"MealsIncludedMealPlanIndicator",
"MultimediaDescriptionInfoCode1",
"MultimediaDescriptionInfoCode2",
"OccupancyAgeQualifyingCode",
"OtaHotelDescriptiveContentNotifRq",
"OtaHotelDescriptiveContentNotifRs",
"OtaHotelDescriptiveInfoRq",
@@ -123,7 +125,6 @@ __all__ = [
"OtaPingRs",
"OtaReadRq",
"OtaResRetrieveRs",
"OccupancyAgeQualifyingCode",
"PositionAltitudeUnitOfMeasureCode",
"PrerequisiteInventoryInvType",
"ProfileProfileType",
@@ -150,12 +151,11 @@ __all__ = [
"TextTextFormat2",
"TimeUnitType",
"TypeRoomRoomType",
"UrlType",
"UniqueIdInstance",
"UniqueIdType1",
"UniqueIdType2",
"UniqueIdType3",
"UrlType",
"VideoItemCategory",
"WarningStatus",
"DefSendComplete",
]

File diff suppressed because it is too large Load Diff

View File

@@ -1,52 +1,36 @@
import asyncio
import json
import logging
from .alpinebits_guestrequests import ResGuest, RoomStay
from .generated import alpinebits as ab
from io import BytesIO
import sys
from datetime import datetime, timezone, date
import re
from xsdata_pydantic.bindings import XmlSerializer
from .alpine_bits_helpers import (
CustomerData,
GuestCountsFactory,
HotelReservationIdData,
AlpineBitsFactory,
OtaMessageType,
CommentData,
CommentsData,
CommentListItemData,
)
from .generated import alpinebits as ab
from datetime import datetime, timezone
import os
from datetime import UTC, date, datetime
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from .alpine_bits_helpers import (
AlpineBitsFactory,
CommentData,
CommentsData,
CommentListItemData,
CommentsData,
CustomerData,
GuestCountsFactory,
HotelReservationIdData,
PhoneTechType,
AlpineBitsFactory,
OtaMessageType,
PhoneTechType,
)
from .config_loader import load_config
# DB and config
from .db import (
Base,
Customer as DBCustomer,
Reservation as DBReservation,
HashedCustomer,
get_database_url,
)
from .config_loader import load_config
import hashlib
import json
import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
import asyncio
from alpine_bits_python import db
from .db import (
Customer as DBCustomer,
)
from .db import (
Reservation as DBReservation,
)
from .generated import alpinebits as ab
# Configure logging
logging.basicConfig(level=logging.INFO)
@@ -101,7 +85,7 @@ async def main():
os.path.dirname(__file__),
"../../test_data/wix_test_data_20250928_132611.json",
)
with open(json_path, "r", encoding="utf-8") as f:
with open(json_path, encoding="utf-8") as f:
wix_data = json.load(f)
data = wix_data["data"]["data"]
@@ -197,7 +181,7 @@ async def main():
children_ages=",".join(str(a) for a in children_ages),
offer=offer,
utm_comment=utm_comment,
created_at=datetime.now(timezone.utc),
created_at=datetime.now(UTC),
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
@@ -330,7 +314,7 @@ def create_xml_from_db(customer: DBCustomer, reservation: DBReservation):
)
hotel_reservation = ab.OtaResRetrieveRs.ReservationsList.HotelReservation(
create_date_time=datetime.now(timezone.utc).isoformat(),
create_date_time=datetime.now(UTC).isoformat(),
res_status=ab.HotelReservationResStatus.REQUESTED,
room_stay_reservation="true",
unique_id=unique_id,
@@ -361,13 +345,13 @@ def create_xml_from_db(customer: DBCustomer, reservation: DBReservation):
with open("output.xml", "w", encoding="utf-8") as outfile:
outfile.write(xml_string)
print("✅ XML serialization successful!")
print(f"Generated XML written to output.xml")
print("Generated XML written to output.xml")
print("\n📄 Generated XML:")
print(xml_string)
from xsdata_pydantic.bindings import XmlParser
parser = XmlParser()
with open("output.xml", "r", encoding="utf-8") as infile:
with open("output.xml", encoding="utf-8") as infile:
xml_content = infile.read()
parsed_result = parser.from_string(xml_content, ab.OtaResRetrieveRs)
print("✅ Round-trip validation successful!")

View File

@@ -1,6 +1,6 @@
from typing import Dict, List, Any, Optional
from typing import Any
from pydantic import BaseModel, Field
from datetime import datetime
class AlpineBitsHandshakeRequest(BaseModel):
@@ -9,64 +9,64 @@ class AlpineBitsHandshakeRequest(BaseModel):
action: str = Field(
..., description="Action parameter, typically 'OTA_Ping:Handshaking'"
)
request_xml: Optional[str] = Field(None, description="XML request document")
request_xml: str | None = Field(None, description="XML request document")
class ContactName(BaseModel):
"""Contact name structure"""
first: Optional[str] = None
last: Optional[str] = None
first: str | None = None
last: str | None = None
class ContactAddress(BaseModel):
"""Contact address structure"""
street: Optional[str] = None
city: Optional[str] = None
state: Optional[str] = None
country: Optional[str] = None
postalCode: Optional[str] = None
street: str | None = None
city: str | None = None
state: str | None = None
country: str | None = None
postalCode: str | None = None
class Contact(BaseModel):
"""Contact information from Wix form"""
name: Optional[ContactName] = None
email: Optional[str] = None
locale: Optional[str] = None
company: Optional[str] = None
birthdate: Optional[str] = None
labelKeys: Optional[Dict[str, Any]] = None
contactId: Optional[str] = None
address: Optional[ContactAddress] = None
jobTitle: Optional[str] = None
imageUrl: Optional[str] = None
updatedDate: Optional[str] = None
phone: Optional[str] = None
createdDate: Optional[str] = None
name: ContactName | None = None
email: str | None = None
locale: str | None = None
company: str | None = None
birthdate: str | None = None
labelKeys: dict[str, Any] | None = None
contactId: str | None = None
address: ContactAddress | None = None
jobTitle: str | None = None
imageUrl: str | None = None
updatedDate: str | None = None
phone: str | None = None
createdDate: str | None = None
class SubmissionPdf(BaseModel):
"""PDF submission structure"""
url: Optional[str] = None
filename: Optional[str] = None
url: str | None = None
filename: str | None = None
class WixFormSubmission(BaseModel):
"""Model for Wix form submission data"""
formName: str
submissions: List[Dict[str, Any]] = Field(default_factory=list)
submissions: list[dict[str, Any]] = Field(default_factory=list)
submissionTime: str
formFieldMask: List[str] = Field(default_factory=list)
formFieldMask: list[str] = Field(default_factory=list)
submissionId: str
contactId: str
submissionsLink: str
submissionPdf: Optional[SubmissionPdf] = None
submissionPdf: SubmissionPdf | None = None
formId: str
contact: Optional[Contact] = None
contact: Contact | None = None
# Dynamic form fields - these will capture all field:* entries
class Config:

View File

@@ -1,10 +1,11 @@
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import Request
import redis
import os
import logging
import os
import redis
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
logger = logging.getLogger(__name__)
@@ -18,8 +19,7 @@ REDIS_URL = os.getenv("REDIS_URL", None)
def get_remote_address_with_forwarded(request: Request):
"""
Get client IP address, considering forwarded headers from proxies/load balancers
"""Get client IP address, considering forwarded headers from proxies/load balancers
"""
# Check for forwarded headers (common in production behind proxies)
forwarded_for = request.headers.get("X-Forwarded-For")
@@ -58,8 +58,7 @@ else:
def get_api_key_identifier(request: Request) -> str:
"""
Get identifier for rate limiting based on API key if available, otherwise IP
"""Get identifier for rate limiting based on API key if available, otherwise IP
This allows different rate limits per API key
"""
# Try to get API key from Authorization header

View File

@@ -1,11 +1,10 @@
#!/usr/bin/env python3
"""
Startup script for the Wix Form Handler API
"""Startup script for the Wix Form Handler API
"""
import os
import uvicorn
from .api import app
if __name__ == "__main__":
db_path = "alpinebits.db" # Adjust path if needed

View File

@@ -1,11 +1,10 @@
#!/usr/bin/env python3
"""
Configuration and setup script for the Wix Form Handler API
"""Configuration and setup script for the Wix Form Handler API
"""
import os
import sys
import secrets
import sys
# Add parent directory to path to import from src
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -15,7 +14,6 @@ from alpine_bits_python.auth import generate_api_key
def generate_secure_keys():
"""Generate secure API keys for the application"""
print("🔐 Generating Secure API Keys")
print("=" * 50)
@@ -33,7 +31,7 @@ def generate_secure_keys():
print(f"export WIX_API_KEY='{wix_api_key}'")
print(f"export ADMIN_API_KEY='{admin_api_key}'")
print(f"export WIX_WEBHOOK_SECRET='{webhook_secret}'")
print(f"export REDIS_URL='redis://localhost:6379' # Optional for production")
print("export REDIS_URL='redis://localhost:6379' # Optional for production")
print("\n🔧 .env File Content")
print("-" * 20)
@@ -73,7 +71,6 @@ def generate_secure_keys():
def check_security_setup():
"""Check current security configuration"""
print("🔍 Security Configuration Check")
print("=" * 40)
@@ -93,11 +90,10 @@ def check_security_setup():
print("\n🛡️ Security Recommendations:")
if not wix_key:
print(" ❌ Set WIX_API_KEY environment variable")
elif len(wix_key) < 32:
print(" ⚠️ WIX_API_KEY should be longer for better security")
else:
if len(wix_key) < 32:
print(" ⚠️ WIX_API_KEY should be longer for better security")
else:
print(" ✅ WIX_API_KEY looks secure")
print(" ✅ WIX_API_KEY looks secure")
if not admin_key:
print(" ❌ Set ADMIN_API_KEY environment variable")

View File

@@ -1,15 +1,14 @@
#!/usr/bin/env python3
"""
Test script for the Secure Wix Form Handler API
"""Test script for the Secure Wix Form Handler API
"""
import asyncio
import aiohttp
import json
import os
import sys
from datetime import datetime
import aiohttp
# Add parent directory to path to import from src
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -67,7 +66,6 @@ SAMPLE_WIX_DATA = {
async def test_api():
"""Test the API endpoints with authentication"""
headers_with_auth = {
"Content-Type": "application/json",
"Authorization": f"Bearer {TEST_API_KEY}",

View File

@@ -1,6 +1,7 @@
from ..generated.alpinebits import OtaPingRq, OtaPingRs
from xsdata_pydantic.bindings import XmlParser
from ..generated.alpinebits import OtaPingRs
def main():
# test parsing a ping request sample
@@ -9,7 +10,7 @@ def main():
"AlpineBits-HotelData-2024-10/files/samples/Handshake/Handshake-OTA_PingRS.xml"
)
with open(path, "r", encoding="utf-8") as f:
with open(path, encoding="utf-8") as f:
xml = f.read()
# Parse the XML into the request object

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""
Convenience launcher for the Wix Form Handler API
"""Convenience launcher for the Wix Form Handler API
"""
import os
@@ -11,4 +10,4 @@ src_dir = os.path.join(os.path.dirname(__file__), "src/alpine_bits_python")
# Run the API using uv
if __name__ == "__main__":
subprocess.run(["uv", "run", "python", os.path.join(src_dir, "run_api.py")])
subprocess.run(["uv", "run", "python", os.path.join(src_dir, "run_api.py")], check=False)

View File

@@ -1,24 +1,21 @@
import pytest
from typing import Union
import sys
import os
import pytest
from alpine_bits_python.alpine_bits_helpers import (
AlpineBitsFactory,
CustomerData,
CustomerFactory,
ResGuestFactory,
HotelReservationIdData,
HotelReservationIdFactory,
AlpineBitsFactory,
PhoneTechType,
OtaMessageType,
NotifCustomer,
RetrieveCustomer,
NotifResGuests,
RetrieveResGuests,
NotifHotelReservationId,
NotifResGuests,
OtaMessageType,
PhoneTechType,
ResGuestFactory,
RetrieveCustomer,
RetrieveHotelReservationId,
RetrieveResGuests,
)

View File

@@ -1,7 +1,2 @@
import pytest
from alpine_bits_python.alpinebits_server import AlpineBitsServer, AlpineBitsClientInfo
from xsdata_pydantic.bindings import XmlParser
from alpine_bits_python.generated.alpinebits import OtaResRetrieveRs, OtaHotelResNotifRq
pass

View File

@@ -1,9 +1,9 @@
import json
import pytest
import asyncio
from alpine_bits_python.alpinebits_server import AlpineBitsServer, AlpineBitsClientInfo
import re
from xsdata_pydantic.bindings import XmlParser
from alpine_bits_python.alpinebits_server import AlpineBitsClientInfo, AlpineBitsServer
from alpine_bits_python.generated.alpinebits import OtaPingRs
@@ -17,14 +17,14 @@ def extract_relevant_sections(xml_string):
@pytest.mark.asyncio
async def test_ping_action_response_matches_expected():
with open("test/test_data/Handshake-OTA_PingRQ.xml", "r", encoding="utf-8") as f:
with open("test/test_data/Handshake-OTA_PingRQ.xml", encoding="utf-8") as f:
server = AlpineBitsServer()
with open(
"test/test_data/Handshake-OTA_PingRQ.xml", "r", encoding="utf-8"
"test/test_data/Handshake-OTA_PingRQ.xml", encoding="utf-8"
) as f:
request_xml = f.read()
with open(
"test/test_data/Handshake-OTA_PingRS.xml", "r", encoding="utf-8"
"test/test_data/Handshake-OTA_PingRS.xml", encoding="utf-8"
) as f:
expected_xml = f.read()
client_info = AlpineBitsClientInfo(username="irrelevant", password="irrelevant")
@@ -56,7 +56,7 @@ async def test_ping_action_response_matches_expected():
@pytest.mark.asyncio
async def test_ping_action_response_success():
server = AlpineBitsServer()
with open("test/test_data/Handshake-OTA_PingRQ.xml", "r", encoding="utf-8") as f:
with open("test/test_data/Handshake-OTA_PingRQ.xml", encoding="utf-8") as f:
request_xml = f.read()
client_info = AlpineBitsClientInfo(username="irrelevant", password="irrelevant")
response = await server.handle_request(
@@ -74,7 +74,7 @@ async def test_ping_action_response_success():
@pytest.mark.asyncio
async def test_ping_action_response_version_arbitrary():
server = AlpineBitsServer()
with open("test/test_data/Handshake-OTA_PingRQ.xml", "r", encoding="utf-8") as f:
with open("test/test_data/Handshake-OTA_PingRQ.xml", encoding="utf-8") as f:
request_xml = f.read()
client_info = AlpineBitsClientInfo(username="irrelevant", password="irrelevant")
response = await server.handle_request(
@@ -91,7 +91,7 @@ async def test_ping_action_response_version_arbitrary():
@pytest.mark.asyncio
async def test_ping_action_response_invalid_action():
server = AlpineBitsServer()
with open("test/test_data/Handshake-OTA_PingRQ.xml", "r", encoding="utf-8") as f:
with open("test/test_data/Handshake-OTA_PingRQ.xml", encoding="utf-8") as f:
request_xml = f.read()
client_info = AlpineBitsClientInfo(username="irrelevant", password="irrelevant")
response = await server.handle_request(

View File

@@ -1,9 +1,9 @@
#!/usr/bin/env python3
"""
Test the handshake functionality with the real AlpineBits sample file.
"""Test the handshake functionality with the real AlpineBits sample file.
"""
import asyncio
from alpine_bits_python.alpinebits_server import AlpineBitsServer
@@ -17,7 +17,6 @@ async def main():
# Read the sample handshake request
with open(
"AlpineBits-HotelData-2024-10/files/samples/Handshake/Handshake-OTA_PingRQ.xml",
"r",
) as f:
ping_request_xml = f.read()