diff --git a/99Tales_Testexport.xml b/99Tales_Testexport.xml
new file mode 100644
index 0000000..de8e43d
--- /dev/null
+++ b/99Tales_Testexport.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/logs/push_requests/alpinebits_push_12345_c52702c9-55b9-44e1-b158-ec9544c7_20251007_113838.xml b/logs/push_requests/alpinebits_push_12345_c52702c9-55b9-44e1-b158-ec9544c7_20251007_113838.xml
new file mode 100644
index 0000000..35e5211
--- /dev/null
+++ b/logs/push_requests/alpinebits_push_12345_c52702c9-55b9-44e1-b158-ec9544c7_20251007_113838.xml
@@ -0,0 +1,51 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Frau
+ Genesia
+ Supino
+
+
+ supinogenesia@gmail.com
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 99tales GmbH
+
+
+
+
+
+
+
+
+
diff --git a/src/alpine_bits_python/alpine_bits_helpers.py b/src/alpine_bits_python/alpine_bits_helpers.py
index 0f3c641..1f865f7 100644
--- a/src/alpine_bits_python/alpine_bits_helpers.py
+++ b/src/alpine_bits_python/alpine_bits_helpers.py
@@ -599,9 +599,7 @@ class AlpineBitsFactory:
if isinstance(data, HotelReservationIdData):
if message_type == OtaMessageType.NOTIF:
return HotelReservationIdFactory.create_notif_hotel_reservation_id(data)
- return HotelReservationIdFactory.create_retrieve_hotel_reservation_id(
- data
- )
+ return HotelReservationIdFactory.create_retrieve_hotel_reservation_id(data)
if isinstance(data, CommentsData):
if message_type == OtaMessageType.NOTIF:
@@ -668,6 +666,7 @@ class AlpineBitsFactory:
else:
raise ValueError(f"Unsupported object type: {type(obj)}")
+ return None
def create_res_retrieve_response(list: list[tuple[Reservation, Customer]]):
@@ -726,7 +725,7 @@ def _process_single_reservation(
HotelReservation = RetrieveHotelReservation
Profile = OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo.Profiles.ProfileInfo.Profile
else:
- raise ValueError(f"Unsupported message type: {message_type}")
+ raise ValueError("Unsupported message type: %s", message_type.value)
# UniqueID
unique_id = UniqueId(type_value=UniqueIdType2.VALUE_14, id=unique_id_string)
@@ -779,6 +778,7 @@ def _process_single_reservation(
)
# shorten klick_id if longer than 64 characters
+ # TODO MAGIC SHORTENING
if klick_id is not None and len(klick_id) > 64:
klick_id = klick_id[:64]
diff --git a/src/alpine_bits_python/api.py b/src/alpine_bits_python/api.py
index 7865c25..f767e89 100644
--- a/src/alpine_bits_python/api.py
+++ b/src/alpine_bits_python/api.py
@@ -24,10 +24,9 @@ from .alpinebits_server import (
)
from .auth import generate_api_key, generate_unique_id, validate_api_key
from .config_loader import load_config
-from .db import Base
+from .db import Base, get_database_url
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
-from .db import get_database_url
from .rate_limit import (
BURST_RATE_LIMIT,
DEFAULT_RATE_LIMIT,
@@ -406,6 +405,7 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
unique_id = data.get("submissionId", generate_unique_id())
+ # TODO MAGIC shortening
if len(unique_id) > 32:
# strip to first 35 chars
unique_id = unique_id[:32]
@@ -486,7 +486,7 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
- _LOGGER.info(f"Dispatched form_processed event for hotel {hotel_code}")
+ _LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
@@ -539,6 +539,7 @@ async def handle_wix_form_test(
raise HTTPException(status_code=500, detail="Error processing test data")
+# UNUSED
@api_router.post("/admin/generate-api-key")
@limiter.limit("5/hour") # Very restrictive for admin operations
async def generate_new_api_key(
@@ -566,6 +567,7 @@ async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
) -> str:
"""Validate basic authentication for AlpineBits protocol.
+
Returns username if valid, raises HTTPException if not.
"""
# Accept any username/password pair present in config['alpine_bits_auth']
@@ -592,11 +594,13 @@ async def validate_basic_auth(
headers={"WWW-Authenticate": "Basic"},
)
_LOGGER.info(
- f"AlpineBits authentication successful for user: {credentials.username} (from config)"
+ "AlpineBits authentication successful for user: %s (from config)",
+ credentials.username,
)
return credentials.username, credentials.password
+# TODO Bit sketchy. May need requests-toolkit in the future
def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
"""Parse multipart/form-data from raw request body.
This is a simplified parser for the AlpineBits use case.
@@ -688,12 +692,12 @@ async def alpinebits_server_handshake(
"No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04"
)
else:
- _LOGGER.info(f"Client protocol version: {client_protocol_version}")
+ _LOGGER.info("Client protocol version: %s", client_protocol_version)
# Optional client ID
client_id = request.headers.get("X-AlpineBits-ClientID")
if client_id:
- _LOGGER.info(f"Client ID: {client_id}")
+ _LOGGER.info("Client ID: %s", client_id)
# Check content encoding
content_encoding = request.headers.get("Content-Encoding")
@@ -705,50 +709,14 @@ async def alpinebits_server_handshake(
# Get content type before processing
content_type = request.headers.get("Content-Type", "")
- _LOGGER.info(f"Content-Type: {content_type}")
- _LOGGER.info(f"Content-Encoding: {content_encoding}")
+ _LOGGER.info("Content-Type: %s", content_type)
+ _LOGGER.info("Content-Encoding: %s", content_encoding)
# Get request body
body = await request.body()
# Decompress if needed
- if is_compressed:
- try:
- body = gzip.decompress(body)
-
- except Exception:
- raise HTTPException(
- status_code=400,
- detail="ERROR: Failed to decompress gzip content",
- )
-
- # Check content type (after decompression)
- if (
- "multipart/form-data" not in content_type
- and "application/x-www-form-urlencoded" not in content_type
- ):
- raise HTTPException(
- status_code=400,
- detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
- )
-
- # Parse multipart data
- if "multipart/form-data" in content_type:
- try:
- form_data = parse_multipart_data(content_type, body)
- except Exception:
- raise HTTPException(
- status_code=400,
- detail="ERROR: Failed to parse multipart/form-data",
- )
- elif "application/x-www-form-urlencoded" in content_type:
- # Parse as urlencoded
- form_data = dict(urllib.parse.parse_qsl(body.decode("utf-8")))
- else:
- raise HTTPException(
- status_code=400,
- detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
- )
+ form_data = validate_alpinebits_body(is_compressed, content_type, body)
# Check for required action parameter
action = form_data.get("action")
@@ -807,6 +775,49 @@ async def alpinebits_server_handshake(
raise HTTPException(status_code=500, detail="Internal server error")
+def validate_alpinebits_body(is_compressed, content_type, body):
+ """Check if the body conforms to AlpineBits expectations."""
+ if is_compressed:
+ try:
+ body = gzip.decompress(body)
+
+ except Exception:
+ raise HTTPException(
+ status_code=400,
+ detail="ERROR: Failed to decompress gzip content",
+ )
+
+ # Check content type (after decompression)
+ if (
+ "multipart/form-data" not in content_type
+ and "application/x-www-form-urlencoded" not in content_type
+ ):
+ raise HTTPException(
+ status_code=400,
+ detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
+ )
+
+ # Parse multipart data
+ if "multipart/form-data" in content_type:
+ try:
+ form_data = parse_multipart_data(content_type, body)
+ except Exception:
+ raise HTTPException(
+ status_code=400,
+ detail="ERROR: Failed to parse multipart/form-data",
+ )
+ elif "application/x-www-form-urlencoded" in content_type:
+ # Parse as urlencoded
+ form_data = dict(urllib.parse.parse_qsl(body.decode("utf-8")))
+ else:
+ raise HTTPException(
+ status_code=400,
+ detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
+ )
+
+ return form_data
+
+
@api_router.get("/admin/stats")
@limiter.limit("10/minute")
async def get_api_stats(request: Request, admin_key: str = Depends(validate_api_key)):