Looking good. Db querying works

This commit is contained in:
Jonas Linter
2025-09-29 15:51:58 +02:00
parent 382bf2334a
commit 54c002ac96
8 changed files with 706 additions and 57 deletions

View File

@@ -3,5 +3,6 @@
"test" "test"
], ],
"python.testing.unittestEnabled": false, "python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true "python.testing.pytestEnabled": true,
"python.analysis.typeCheckingMode": "basic"
} }

View File

@@ -0,0 +1,250 @@
{
"timestamp": "2025-09-29T15:44:11.839852",
"client_ip": "127.0.0.1",
"headers": {
"host": "localhost:8080",
"content-type": "application/json",
"user-agent": "insomnia/2023.5.8",
"accept": "*/*",
"content-length": "6920"
},
"data": {
"data": {
"formName": "Contact us",
"submissions": [
{
"label": "Angebot auswählen",
"value": "Herbstferien - Familienzeit mit Dolomitenblick"
},
{
"label": "Anreisedatum",
"value": "2025-10-31"
},
{
"label": "Abreisedatum",
"value": "2025-11-02"
},
{
"label": "Anzahl Erwachsene",
"value": "2"
},
{
"label": "Anzahl Kinder",
"value": "3"
},
{
"label": "Alter Kind 1",
"value": "3"
},
{
"label": "Alter Kind 2",
"value": "1"
},
{
"label": "Alter Kind 3",
"value": "0"
},
{
"label": "Anrede",
"value": "Frau"
},
{
"label": "Vorname",
"value": "Elena"
},
{
"label": "Nachname",
"value": "Battiloro"
},
{
"label": "Email",
"value": "e.battiloro1@gmail.com"
},
{
"label": "Phone",
"value": "+39 333 767 3262"
},
{
"label": "Einwilligung Marketing",
"value": "Non selezionato"
},
{
"label": "utm_Source",
"value": "ig"
},
{
"label": "utm_Medium",
"value": "Instagram_Stories"
},
{
"label": "utm_Campaign",
"value": "Conversions_Hotel_Bemelmans_ITA"
},
{
"label": "utm_Term",
"value": "Cold_Traffic_Conversions_Hotel_Bemelmans_ITA"
},
{
"label": "utm_Content",
"value": "Grafik_4_Spätsommer_23.08-07.09_Landingpage_ITA"
},
{
"label": "utm_term_id",
"value": "120232007764490196"
},
{
"label": "utm_content_id",
"value": "120232007764490196"
},
{
"label": "gad_source",
"value": ""
},
{
"label": "gad_campaignid",
"value": ""
},
{
"label": "gbraid",
"value": ""
},
{
"label": "gclid",
"value": ""
},
{
"label": "fbclid",
"value": "PAZXh0bgNhZW0BMABhZGlkAasmYBhk4DQBp02L46Rl1jAuccxsOaeFSv7WSFnP-MQCsOrz9yDnKRH4hwZ7GEgxF9gy0_OF_aem_qSvrs6xsBkvTaI_Y9_hfnQ"
}
],
"field:date_picker_7e65": "2025-11-02",
"field:number_7cf5": "2",
"field:utm_source": "ig",
"submissionTime": "2025-09-28T13:26:07.938Z",
"field:alter_kind_3": "3",
"field:gad_source": "",
"field:form_field_5a7b": "Non selezionato",
"field:gad_campaignid": "",
"field:utm_medium": "Instagram_Stories",
"field:utm_term_id": "120232007764490196",
"context": {
"metaSiteId": "1dea821c-8168-4736-96e4-4b92e8b364cf",
"activationId": "3fd865e1-f44a-49d2-ae29-19cf77ee488a"
},
"field:email_5139": "e.battiloro1@gmail.com",
"field:phone_4c77": "+39 333 767 3262",
"_context": {
"activation": {
"id": "3fd865e1-f44a-49d2-ae29-19cf77ee488a"
},
"configuration": {
"id": "a976f18c-fa86-495d-be1e-676df188eeae"
},
"app": {
"id": "225dd912-7dea-4738-8688-4b8c6955ffc2"
},
"action": {
"id": "152db4d7-5263-40c4-be2b-1c81476318b7"
},
"trigger": {
"key": "wix_form_app-form_submitted"
}
},
"field:gclid": "",
"formFieldMask": [
"field:angebot_auswaehlen",
"field:date_picker_a7c8",
"field:date_picker_7e65",
"field:number_7cf5",
"field:anzahl_kinder",
"field:alter_kind_3",
"field:alter_kind_25",
"field:alter_kind_4",
"field:alter_kind_5",
"field:alter_kind_6",
"field:alter_kind_7",
"field:alter_kind_8",
"field:alter_kind_9",
"field:alter_kind_10",
"field:alter_kind_11",
"field:anrede",
"field:first_name_abae",
"field:last_name_d97c",
"field:email_5139",
"field:phone_4c77",
"field:long_answer_3524",
"field:form_field_5a7b",
"field:utm_source",
"field:utm_medium",
"field:utm_campaign",
"field:utm_term",
"field:utm_content",
"field:utm_term_id",
"field:utm_content_id",
"field:gad_source",
"field:gad_campaignid",
"field:gbraid",
"field:gclid",
"field:fbclid",
"metaSiteId"
],
"field:alter_kind_4": "0",
"contact": {
"name": {
"first": "Elena",
"last": "Battiloro"
},
"email": "e.battiloro1@gmail.com",
"locale": "it-it",
"phones": [
{
"tag": "UNTAGGED",
"formattedPhone": "+39 333 767 3262",
"id": "7e5c8512-b88e-4cf0-8d0c-9ebe6b210924",
"countryCode": "IT",
"e164Phone": "+393337673262",
"primary": true,
"phone": "333 767 3262"
}
],
"contactId": "b9d47825-9f84-4ae7-873c-d169851b5888",
"emails": [
{
"id": "c5609c67-5eba-4068-ab21-8a2ab9a09a27",
"tag": "UNTAGGED",
"email": "e.battiloro1@gmail.com",
"primary": true
}
],
"updatedDate": "2025-09-28T13:26:09.916Z",
"phone": "+393337673262",
"createdDate": "2025-08-08T13:05:23.733Z"
},
"submissionId": "02fbc71c-745b-4c73-9cba-827d0958117a",
"field:anzahl_kinder": "3",
"field:alter_kind_25": "1",
"field:first_name_abae": "Elena",
"field:utm_content_id": "120232007764490196",
"field:utm_campaign": "Conversions_Hotel_Bemelmans_ITA",
"field:utm_term": "Cold_Traffic_Conversions_Hotel_Bemelmans_ITA",
"contactId": "b9d47825-9f84-4ae7-873c-d169851b5888",
"field:date_picker_a7c8": "2025-10-31",
"field:angebot_auswaehlen": "Herbstferien - Familienzeit mit Dolomitenblick",
"field:utm_content": "Grafik_4_Spätsommer_23.08-07.09_Landingpage_ITA",
"field:last_name_d97c": "Battiloro",
"submissionsLink": "https://manage.wix.app/forms/submissions/1dea821c-8168-4736-96e4-4b92e8b364cf/e084006b-ae83-4e4d-b2f5-074118cdb3b1?d=https%3A%2F%2Fmanage.wix.com%2Fdashboard%2F1dea821c-8168-4736-96e4-4b92e8b364cf%2Fwix-forms%2Fform%2Fe084006b-ae83-4e4d-b2f5-074118cdb3b1%2Fsubmissions&s=true",
"field:gbraid": "",
"field:fbclid": "PAZXh0bgNhZW0BMABhZGlkAasmYBhk4DQBp02L46Rl1jAuccxsOaeFSv7WSFnP-MQCsOrz9yDnKRH4hwZ7GEgxF9gy0_OF_aem_qSvrs6xsBkvTaI_Y9_hfnQ",
"field:anrede": "Frau",
"formId": "e084006b-ae83-4e4d-b2f5-074118cdb3b1"
}
},
"origin_header": null,
"all_headers": {
"host": "localhost:8080",
"content-type": "application/json",
"user-agent": "insomnia/2023.5.8",
"accept": "*/*",
"content-length": "6920"
}
}

View File

@@ -0,0 +1,250 @@
{
"timestamp": "2025-09-29T15:44:54.746579",
"client_ip": "127.0.0.1",
"headers": {
"host": "localhost:8080",
"content-type": "application/json",
"user-agent": "insomnia/2023.5.8",
"accept": "*/*",
"content-length": "6920"
},
"data": {
"data": {
"formName": "Contact us",
"submissions": [
{
"label": "Angebot auswählen",
"value": "Herbstferien - Familienzeit mit Dolomitenblick"
},
{
"label": "Anreisedatum",
"value": "2025-10-31"
},
{
"label": "Abreisedatum",
"value": "2025-11-02"
},
{
"label": "Anzahl Erwachsene",
"value": "2"
},
{
"label": "Anzahl Kinder",
"value": "3"
},
{
"label": "Alter Kind 1",
"value": "3"
},
{
"label": "Alter Kind 2",
"value": "1"
},
{
"label": "Alter Kind 3",
"value": "0"
},
{
"label": "Anrede",
"value": "Frau"
},
{
"label": "Vorname",
"value": "Elena"
},
{
"label": "Nachname",
"value": "Battiloro"
},
{
"label": "Email",
"value": "e.battiloro1@gmail.com"
},
{
"label": "Phone",
"value": "+39 333 767 3262"
},
{
"label": "Einwilligung Marketing",
"value": "Non selezionato"
},
{
"label": "utm_Source",
"value": "ig"
},
{
"label": "utm_Medium",
"value": "Instagram_Stories"
},
{
"label": "utm_Campaign",
"value": "Conversions_Hotel_Bemelmans_ITA"
},
{
"label": "utm_Term",
"value": "Cold_Traffic_Conversions_Hotel_Bemelmans_ITA"
},
{
"label": "utm_Content",
"value": "Grafik_4_Spätsommer_23.08-07.09_Landingpage_ITA"
},
{
"label": "utm_term_id",
"value": "120232007764490196"
},
{
"label": "utm_content_id",
"value": "120232007764490196"
},
{
"label": "gad_source",
"value": ""
},
{
"label": "gad_campaignid",
"value": ""
},
{
"label": "gbraid",
"value": ""
},
{
"label": "gclid",
"value": ""
},
{
"label": "fbclid",
"value": "PAZXh0bgNhZW0BMABhZGlkAasmYBhk4DQBp02L46Rl1jAuccxsOaeFSv7WSFnP-MQCsOrz9yDnKRH4hwZ7GEgxF9gy0_OF_aem_qSvrs6xsBkvTaI_Y9_hfnQ"
}
],
"field:date_picker_7e65": "2025-11-02",
"field:number_7cf5": "2",
"field:utm_source": "ig",
"submissionTime": "2025-09-28T13:26:07.938Z",
"field:alter_kind_3": "3",
"field:gad_source": "",
"field:form_field_5a7b": "Non selezionato",
"field:gad_campaignid": "",
"field:utm_medium": "Instagram_Stories",
"field:utm_term_id": "120232007764490196",
"context": {
"metaSiteId": "1dea821c-8168-4736-96e4-4b92e8b364cf",
"activationId": "3fd865e1-f44a-49d2-ae29-19cf77ee488a"
},
"field:email_5139": "e.battiloro1@gmail.com",
"field:phone_4c77": "+39 333 767 3262",
"_context": {
"activation": {
"id": "3fd865e1-f44a-49d2-ae29-19cf77ee488a"
},
"configuration": {
"id": "a976f18c-fa86-495d-be1e-676df188eeae"
},
"app": {
"id": "225dd912-7dea-4738-8688-4b8c6955ffc2"
},
"action": {
"id": "152db4d7-5263-40c4-be2b-1c81476318b7"
},
"trigger": {
"key": "wix_form_app-form_submitted"
}
},
"field:gclid": "",
"formFieldMask": [
"field:angebot_auswaehlen",
"field:date_picker_a7c8",
"field:date_picker_7e65",
"field:number_7cf5",
"field:anzahl_kinder",
"field:alter_kind_3",
"field:alter_kind_25",
"field:alter_kind_4",
"field:alter_kind_5",
"field:alter_kind_6",
"field:alter_kind_7",
"field:alter_kind_8",
"field:alter_kind_9",
"field:alter_kind_10",
"field:alter_kind_11",
"field:anrede",
"field:first_name_abae",
"field:last_name_d97c",
"field:email_5139",
"field:phone_4c77",
"field:long_answer_3524",
"field:form_field_5a7b",
"field:utm_source",
"field:utm_medium",
"field:utm_campaign",
"field:utm_term",
"field:utm_content",
"field:utm_term_id",
"field:utm_content_id",
"field:gad_source",
"field:gad_campaignid",
"field:gbraid",
"field:gclid",
"field:fbclid",
"metaSiteId"
],
"field:alter_kind_4": "0",
"contact": {
"name": {
"first": "Elena",
"last": "Battiloro"
},
"email": "e.battiloro1@gmail.com",
"locale": "it-it",
"phones": [
{
"tag": "UNTAGGED",
"formattedPhone": "+39 333 767 3262",
"id": "7e5c8512-b88e-4cf0-8d0c-9ebe6b210924",
"countryCode": "IT",
"e164Phone": "+393337673262",
"primary": true,
"phone": "333 767 3262"
}
],
"contactId": "b9d47825-9f84-4ae7-873c-d169851b5888",
"emails": [
{
"id": "c5609c67-5eba-4068-ab21-8a2ab9a09a27",
"tag": "UNTAGGED",
"email": "e.battiloro1@gmail.com",
"primary": true
}
],
"updatedDate": "2025-09-28T13:26:09.916Z",
"phone": "+393337673262",
"createdDate": "2025-08-08T13:05:23.733Z"
},
"submissionId": "02fbc71c-745b-4c73-9cba-827d0958117a",
"field:anzahl_kinder": "3",
"field:alter_kind_25": "1",
"field:first_name_abae": "Elena",
"field:utm_content_id": "120232007764490196",
"field:utm_campaign": "Conversions_Hotel_Bemelmans_ITA",
"field:utm_term": "Cold_Traffic_Conversions_Hotel_Bemelmans_ITA",
"contactId": "b9d47825-9f84-4ae7-873c-d169851b5888",
"field:date_picker_a7c8": "2025-10-31",
"field:angebot_auswaehlen": "Herbstferien - Familienzeit mit Dolomitenblick",
"field:utm_content": "Grafik_4_Spätsommer_23.08-07.09_Landingpage_ITA",
"field:last_name_d97c": "Battiloro",
"submissionsLink": "https://manage.wix.app/forms/submissions/1dea821c-8168-4736-96e4-4b92e8b364cf/e084006b-ae83-4e4d-b2f5-074118cdb3b1?d=https%3A%2F%2Fmanage.wix.com%2Fdashboard%2F1dea821c-8168-4736-96e4-4b92e8b364cf%2Fwix-forms%2Fform%2Fe084006b-ae83-4e4d-b2f5-074118cdb3b1%2Fsubmissions&s=true",
"field:gbraid": "",
"field:fbclid": "PAZXh0bgNhZW0BMABhZGlkAasmYBhk4DQBp02L46Rl1jAuccxsOaeFSv7WSFnP-MQCsOrz9yDnKRH4hwZ7GEgxF9gy0_OF_aem_qSvrs6xsBkvTaI_Y9_hfnQ",
"field:anrede": "Frau",
"formId": "e084006b-ae83-4e4d-b2f5-074118cdb3b1"
}
},
"origin_header": null,
"all_headers": {
"host": "localhost:8080",
"content-type": "application/json",
"user-agent": "insomnia/2023.5.8",
"accept": "*/*",
"content-length": "6920"
}
}

View File

@@ -648,6 +648,10 @@ class AlpineBitsFactory:
else: else:
raise ValueError(f"Unsupported object type: {type(obj)}") raise ValueError(f"Unsupported object type: {type(obj)}")
# Usage examples # Usage examples

View File

@@ -7,18 +7,31 @@ handshaking functionality with configurable supported actions and capabilities.
""" """
import asyncio import asyncio
from datetime import datetime
import difflib
import json import json
import inspect import inspect
import re
from typing import Dict, List, Optional, Any, Union, Tuple, Type from typing import Dict, List, Optional, Any, Union, Tuple, Type
from xml.etree import ElementTree as ET from xml.etree import ElementTree as ET
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, IntEnum from enum import Enum, IntEnum
from .generated.alpinebits import OtaPingRq, OtaPingRs, WarningStatus from .generated.alpinebits import OtaPingRq, OtaPingRs, WarningStatus, OtaReadRq
from xsdata_pydantic.bindings import XmlSerializer from xsdata_pydantic.bindings import XmlSerializer
from xsdata.formats.dataclass.serializers.config import SerializerConfig from xsdata.formats.dataclass.serializers.config import SerializerConfig
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from xsdata_pydantic.bindings import XmlParser from xsdata_pydantic.bindings import XmlParser
import logging
from .db import Reservation, Customer
from sqlalchemy import select
from sqlalchemy.orm import joinedload
# Configure logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
class HttpStatusCode(IntEnum): class HttpStatusCode(IntEnum):
@@ -122,7 +135,7 @@ class AlpineBitsAction(ABC):
) # list of versions in case action supports multiple versions ) # list of versions in case action supports multiple versions
async def handle( async def handle(
self, action: str, request_xml: str, version: Version self, action: str, request_xml: str, version: Version, dbsession=None, server_capabilities=None, username=None, password=None, config: Dict = None
) -> AlpineBitsResponse: ) -> AlpineBitsResponse:
""" """
Handle the incoming request XML and return response XML. Handle the incoming request XML and return response XML.
@@ -251,12 +264,13 @@ class ServerCapabilities:
class PingAction(AlpineBitsAction): class PingAction(AlpineBitsAction):
"""Implementation for OTA_Ping action (handshaking).""" """Implementation for OTA_Ping action (handshaking)."""
def __init__(self): def __init__(self, config: Dict = None):
self.name = AlpineBitsActionName.OTA_PING self.name = AlpineBitsActionName.OTA_PING
self.version = [ self.version = [
Version.V2024_10, Version.V2024_10,
Version.V2022_10, Version.V2022_10,
] # Supports multiple versions ] # Supports multiple versions
self.config = config
async def handle( async def handle(
self, self,
@@ -291,6 +305,8 @@ class PingAction(AlpineBitsAction):
# compare echo data with capabilities, create a dictionary containing the matching capabilities # compare echo data with capabilities, create a dictionary containing the matching capabilities
capabilities_dict = server_capabilities.get_capabilities_dict() capabilities_dict = server_capabilities.get_capabilities_dict()
_LOGGER.info(f"Capabilities Dict: {capabilities_dict}")
matching_capabilities = {"versions": []} matching_capabilities = {"versions": []}
# Iterate through client's requested versions # Iterate through client's requested versions
@@ -339,10 +355,12 @@ class PingAction(AlpineBitsAction):
warning_response = OtaPingRs.Warnings(warning=[warning]) warning_response = OtaPingRs.Warnings(warning=[warning])
all_capabilities = server_capabilities.get_capabilities_json()
response_ota_ping = OtaPingRs( response_ota_ping = OtaPingRs(
version="7.000", version="7.000",
warnings=warning_response, warnings=warning_response,
echo_data=capabilities_json, echo_data=all_capabilities,
success="", success="",
) )
@@ -357,51 +375,164 @@ class PingAction(AlpineBitsAction):
) )
return AlpineBitsResponse(response_xml, HttpStatusCode.OK) return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
def strip_control_chars(s):
# Remove all control characters (ASCII < 32 and DEL)
return re.sub(r'[\x00-\x1F\x7F]', '', s)
def validate_hotel_authentication(username: str, password: str, hotelid: str, config: Dict) -> bool:
""" Validate hotel authentication based on username, password, and hotel ID.
Example config
alpine_bits_auth:
- hotel_id: "123"
hotel_name: "Frangart Inn"
username: "alice"
password: !secret ALICE_PASSWORD
"""
if not config or "alpine_bits_auth" not in config:
return False
auth_list = config["alpine_bits_auth"]
for auth in auth_list:
if (
auth.get("hotel_id") == hotelid
and auth.get("username") == username
and auth.get("password") == password
):
return True
return False
# look for hotelid in config
class ReadAction(AlpineBitsAction): class ReadAction(AlpineBitsAction):
"""Implementation for OTA_Read action.""" """Implementation for OTA_Read action."""
def __init__(self): def __init__(self, config: Dict = None):
self.name = AlpineBitsActionName.OTA_READ self.name = AlpineBitsActionName.OTA_READ
self.version = [Version.V2024_10, Version.V2022_10] self.version = [Version.V2024_10, Version.V2022_10]
self.config = config
async def handle( async def handle(
self, action: str, request_xml: str, version: Version self, action: str, request_xml: str, version: Version, dbsession=None, username=None, password=None
) -> AlpineBitsResponse: ) -> AlpineBitsResponse:
"""Handle read requests.""" """Handle read requests."""
response_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<OTA_ReadRS xmlns="http://www.opentravel.org/OTA/2003/05" Version="8.000"> clean_action = strip_control_chars(str(action)).strip()
<Success/> clean_expected = strip_control_chars(self.name.value[1]).strip()
<Data>Read operation successful for {version.value}</Data>
</OTA_ReadRS>""" if clean_action != clean_expected:
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
return AlpineBitsResponse(
f"Error: Invalid action {action}, expected {self.name.value[1]}", HttpStatusCode.BAD_REQUEST
)
if dbsession is None:
return AlpineBitsResponse(
"Error: Something went wrong", HttpStatusCode.INTERNAL_SERVER_ERROR
)
read_request = XmlParser().from_string(request_xml, OtaReadRq)
hotel_read_request = read_request.read_requests.hotel_read_request
hotelid = hotel_read_request.hotel_code
hotelname = hotel_read_request.hotel_name
if hotelname is None:
hotelname = "unknown"
if username is None or password is None or hotelid is None:
return AlpineBitsResponse(
f"Error: Unauthorized Read Request for this specific hotel {hotelname}. Check credentials", HttpStatusCode.UNAUTHORIZED
)
if not validate_hotel_authentication(username, password, hotelid, self.config):
return AlpineBitsResponse(
f"Error: Unauthorized Read Request for this specific hotel {hotelname}. Check credentials", HttpStatusCode.UNAUTHORIZED
)
start_date = None
if hotel_read_request.selection_criteria is not None:
start_date = datetime.fromisoformat(hotel_read_request.selection_criteria.start)
class HotelAvailNotifAction(AlpineBitsAction):
"""Implementation for Hotel Availability Notification action with supports."""
def __init__(self): # query all reservations for this hotel from the database, where start_date is greater than or equal to the given start_date
self.name = AlpineBitsActionName.OTA_HOTEL_AVAIL_NOTIF
self.version = Version.V2022_10
self.supports = [
"OTA_HotelAvailNotif_accept_rooms",
"OTA_HotelAvailNotif_accept_categories",
"OTA_HotelAvailNotif_accept_deltas",
"OTA_HotelAvailNotif_accept_BookingThreshold",
]
async def handle( stmt = (
self, action: str, request_xml: str, version: Version select(Reservation, Customer)
) -> AlpineBitsResponse: .join(Customer, Reservation.customer_id == Customer.id)
"""Handle hotel availability notifications.""" .filter(Reservation.hotel_code == hotelid)
)
if start_date:
stmt = stmt.filter(Reservation.start_date >= start_date)
result = await dbsession.execute(stmt)
reservation_customer_pairs: list[tuple[Reservation, Customer]] = result.all() # List of (Reservation, Customer) tuples
_LOGGER.info(f"Querying reservations and customers for hotel {hotelid} from database")
for reservation, customer in reservation_customer_pairs:
_LOGGER.info(f"Reservation: {reservation.id}, Customer: {customer.given_name}")
# For demonstration, just echo back a simple XML response
response_xml = """<?xml version="1.0" encoding="UTF-8"?> response_xml = """<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelAvailNotifRS xmlns="http://www.opentravel.org/OTA/2003/05" Version="8.000"> <OTA_ReadRS xmlns="http://www.opentravel.org/OTA/2003/
<Success/> 05" Version="8.000">
</OTA_HotelAvailNotifRS>""" <Success/>
</OTA_ReadRS>"""
return AlpineBitsResponse(response_xml, HttpStatusCode.OK) return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
# class HotelAvailNotifAction(AlpineBitsAction):
# """Implementation for Hotel Availability Notification action with supports."""
# def __init__(self):
# self.name = AlpineBitsActionName.OTA_HOTEL_AVAIL_NOTIF
# self.version = Version.V2022_10
# self.supports = [
# "OTA_HotelAvailNotif_accept_rooms",
# "OTA_HotelAvailNotif_accept_categories",
# "OTA_HotelAvailNotif_accept_deltas",
# "OTA_HotelAvailNotif_accept_BookingThreshold",
# ]
# async def handle(
# self, action: str, request_xml: str, version: Version
# ) -> AlpineBitsResponse:
# """Handle hotel availability notifications."""
# response_xml = """<?xml version="1.0" encoding="UTF-8"?>
# <OTA_HotelAvailNotifRS xmlns="http://www.opentravel.org/OTA/2003/05" Version="8.000">
# <Success/>
# </OTA_HotelAvailNotifRS>"""
# return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
class GuestRequestsAction(AlpineBitsAction): class GuestRequestsAction(AlpineBitsAction):
"""Unimplemented action - will not appear in capabilities.""" """Unimplemented action - will not appear in capabilities."""
@@ -421,15 +552,17 @@ class AlpineBitsServer:
their capabilities, and can respond to handshake requests with its capabilities. their capabilities, and can respond to handshake requests with its capabilities.
""" """
def __init__(self): def __init__(self, config: Dict = None):
self.capabilities = ServerCapabilities() self.capabilities = ServerCapabilities()
self._action_instances = {} self._action_instances = {}
self.config = config
self._initialize_action_instances() self._initialize_action_instances()
def _initialize_action_instances(self): def _initialize_action_instances(self):
"""Initialize instances of all discovered action classes.""" """Initialize instances of all discovered action classes."""
for capability_name, action_class in self.capabilities.action_registry.items(): for capability_name, action_class in self.capabilities.action_registry.items():
self._action_instances[capability_name] = action_class() self._action_instances[capability_name] = action_class(config=self.config)
def get_capabilities(self) -> Dict: def get_capabilities(self) -> Dict:
"""Get server capabilities.""" """Get server capabilities."""
@@ -440,7 +573,7 @@ class AlpineBitsServer:
return self.capabilities.get_capabilities_json() return self.capabilities.get_capabilities_json()
async def handle_request( async def handle_request(
self, request_action_name: str, request_xml: str, version: str = "2024-10" self, request_action_name: str, request_xml: str, version: str = "2024-10", dbsession=None, username=None, password=None
) -> AlpineBitsResponse: ) -> AlpineBitsResponse:
""" """
Handle an incoming AlpineBits request by routing to appropriate action handler. Handle an incoming AlpineBits request by routing to appropriate action handler.
@@ -495,7 +628,7 @@ class AlpineBitsServer:
) )
else: else:
return await action_instance.handle( return await action_instance.handle(
request_action_name, request_xml, version_enum request_action_name, request_xml, version_enum, dbsession=dbsession, username=username, password=password
) )
except Exception as e: except Exception as e:
print(f"Error handling request {request_action_name}: {str(e)}") print(f"Error handling request {request_action_name}: {str(e)}")

View File

@@ -53,20 +53,25 @@ _LOGGER = logging.getLogger(__name__)
security_basic = HTTPBasic() security_basic = HTTPBasic()
# Load config at startup # Load config at startup
try:
config = load_config()
except Exception as e:
_LOGGER.error(f"Failed to load config: {str(e)}")
config = {}
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
# Setup DB # Setup DB
try:
config = load_config()
except Exception as e:
_LOGGER.error(f"Failed to load config: {str(e)}")
config = {}
DATABASE_URL = get_database_url(config) DATABASE_URL = get_database_url(config)
engine = create_async_engine(DATABASE_URL, echo=True) engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False) AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
app.state.engine = engine app.state.engine = engine
app.state.async_sessionmaker = AsyncSessionLocal app.state.async_sessionmaker = AsyncSessionLocal
app.state.config = config
app.state.alpine_bits_server = AlpineBitsServer(config)
# Create tables # Create tables
async with engine.begin() as conn: async with engine.begin() as conn:
@@ -424,6 +429,8 @@ async def validate_basic_auth(
headers={"WWW-Authenticate": "Basic"}, headers={"WWW-Authenticate": "Basic"},
) )
valid = False valid = False
config = app.state.config
for entry in config["alpine_bits_auth"]: for entry in config["alpine_bits_auth"]:
if ( if (
credentials.username == entry["username"] credentials.username == entry["username"]
@@ -440,7 +447,7 @@ async def validate_basic_auth(
_LOGGER.info( _LOGGER.info(
f"AlpineBits authentication successful for user: {credentials.username} (from config)" f"AlpineBits authentication successful for user: {credentials.username} (from config)"
) )
return credentials.username return credentials.username, credentials.password
def parse_multipart_data(content_type: str, body: bytes) -> Dict[str, Any]: def parse_multipart_data(content_type: str, body: bytes) -> Dict[str, Any]:
@@ -505,7 +512,7 @@ def parse_multipart_data(content_type: str, body: bytes) -> Dict[str, Any]:
@api_router.post("/alpinebits/server-2024-10") @api_router.post("/alpinebits/server-2024-10")
@limiter.limit("60/minute") @limiter.limit("60/minute")
async def alpinebits_server_handshake( async def alpinebits_server_handshake(
request: Request, username: str = Depends(validate_basic_auth) request: Request, credentials_tupel: tuple = Depends(validate_basic_auth), dbsession=Depends(get_async_session)
): ):
""" """
AlpineBits server endpoint implementing the handshake protocol. AlpineBits server endpoint implementing the handshake protocol.
@@ -608,12 +615,14 @@ async def alpinebits_server_handshake(
# Get optional request XML # Get optional request XML
request_xml = form_data.get("request") request_xml = form_data.get("request")
server = AlpineBitsServer() server = app.state.alpine_bits_server
version = Version.V2024_10 version = Version.V2024_10
username, password = credentials_tupel
# Create successful handshake response # Create successful handshake response
response = await server.handle_request(action, request_xml, version) response = await server.handle_request(action, request_xml, version, dbsession=dbsession, username=username, password=password)
response_xml = response.xml_content response_xml = response.xml_content

View File

@@ -71,6 +71,7 @@ class Reservation(Base):
customer = relationship("Customer", back_populates="reservations") customer = relationship("Customer", back_populates="reservations")
class HashedCustomer(Base): class HashedCustomer(Base):
__tablename__ = "hashed_customers" __tablename__ = "hashed_customers"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)

View File

@@ -6,8 +6,20 @@ import sys
from datetime import datetime, timezone, date from datetime import datetime, timezone, date
import re import re
from xsdata_pydantic.bindings import XmlSerializer from xsdata_pydantic.bindings import XmlSerializer
from .alpine_bits_helpers import (
CustomerData,
GuestCountsFactory,
HotelReservationIdData,
AlpineBitsFactory,
OtaMessageType,
CommentData,
CommentsData,
CommentListItemData,
)
from .generated import alpinebits as ab
from datetime import datetime, timezone
from .simplified_access import ( from .alpine_bits_helpers import (
CommentData, CommentData,
CommentsData, CommentsData,
CommentListItemData, CommentListItemData,
@@ -215,18 +227,7 @@ async def main():
def create_xml_from_db(customer: DBCustomer, reservation: DBReservation): def create_xml_from_db(customer: DBCustomer, reservation: DBReservation):
from .simplified_access import (
CustomerData,
GuestCountsFactory,
HotelReservationIdData,
AlpineBitsFactory,
OtaMessageType,
CommentData,
CommentsData,
CommentListItemData,
)
from .generated import alpinebits as ab
from datetime import datetime, timezone
# Prepare data for XML # Prepare data for XML
phone_numbers = [(customer.phone, PhoneTechType.MOBILE)] if customer.phone else [] phone_numbers = [(customer.phone, PhoneTechType.MOBILE)] if customer.phone else []