Better typing + moved some code to webhook_processor

This commit is contained in:
Jonas Linter
2025-11-25 20:20:40 +01:00
parent 556eef529a
commit 97994c5a38

View File

@@ -11,7 +11,7 @@ import urllib.parse
import xml.dom.minidom
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import UTC, date, datetime, timedelta
from datetime import UTC, datetime, timedelta
from functools import partial
from pathlib import Path
from typing import Any
@@ -34,8 +34,7 @@ from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload
from alpine_bits_python.schemas import ReservationData
from alpine_bits_python.webhook_processor import process_generic_webhook_submission
from alpine_bits_python.webhook_processor import process_generic_webhook_submission, process_wix_form_submission
from .alpinebits_server import (
AlpineBitsActionName,
@@ -43,12 +42,11 @@ from .alpinebits_server import (
AlpineBitsServer,
Version,
)
from .auth import generate_unique_id, validate_api_key
from .auth import validate_api_key
from .config_loader import load_config, get_username_for_hotel
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode, WebhookStatus
from .conversion_service import ConversionService
from .csv_import import CSVImporter
from .customer_service import CustomerService
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db import Hotel, WebhookEndpoint, WebhookRequest
@@ -66,7 +64,6 @@ from .rate_limit import (
limiter,
webhook_limiter,
)
from .reservation_service import ReservationService
from .webhook_processor import webhook_registry
from .worker_coordination import is_primary_worker
@@ -700,209 +697,6 @@ async def detect_language(
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
# Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production)."""
timestamp = datetime.now().isoformat()
_LOGGER.info("Received Wix form data at %s", timestamp)
data = data.get("data") # Handle nested "data" key if present
# save customer and reservation to DB
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
email_newsletter = data.get("field:form_field_5a7b", False)
# if email_newsletter is a string, attempt to convert to boolean, else false
if isinstance(email_newsletter, str):
email_newsletter = email_newsletter.lower() in [
"yes",
"true",
"1",
"on",
"selezionato",
"angekreuzt",
]
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
# Dates
start_date = (
data.get("field:date_picker_a7c8")
or data.get("Anreisedatum")
or data.get("submissions", [{}])[1].get("value")
)
end_date = (
data.get("field:date_picker_7e65")
or data.get("Abreisedatum")
or data.get("submissions", [{}])[2].get("value")
)
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 1)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
# Collect all child age fields, then take only the first num_children
# This handles form updates that may send extra padded/zero fields
temp_ages = []
for k in data:
if k.startswith("field:alter_kind_"):
if data[k] is None or data[k] == "":
continue
try:
age = int(data[k])
temp_ages.append(age)
except ValueError:
_LOGGER.warning("Invalid age value for %s: %s", k, data[k])
# Only keep the first num_children ages, regardless of their values
children_ages = temp_ages[:num_children]
offer = data.get("field:angebot_auswaehlen")
# get submissionId and ensure max length 35. Generate one if not present
unique_id = data.get("submissionId", generate_unique_id())
# Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db)
customer_data = {
"given_name": first_name,
"surname": last_name,
"contact_id": contact_id,
"name_prefix": name_prefix,
"email_address": email,
"phone": phone_number,
"email_newsletter": email_newsletter,
"address_line": address_line,
"city_name": city_name,
"postal_code": postal_code,
"country_code": country_code,
"gender": gender,
"birth_date": birth_date,
"language": language,
"address_catalog": False,
"name_title": None,
}
# This automatically creates/updates both Customer and HashedCustomer
db_customer = await customer_service.get_or_create_customer(customer_data)
# Determine hotel_code and hotel_name
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
hotel_code = data.get("field:hotelid", None)
if hotel_code is None:
_LOGGER.warning("No hotel_code provided in form data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
hotel_name = (
data.get("field:hotelname")
or data.get("hotelname")
or request.app.state.config.get("default_hotel_name")
or "Frangart Inn" # fallback
)
submissionTime = data.get("submissionTime") # 2025-10-07T05:48:41.855Z
try:
if submissionTime:
submissionTime = datetime.fromisoformat(
submissionTime[:-1]
) # Remove Z and convert
except Exception as e:
_LOGGER.exception("Error parsing submissionTime: %s", e)
submissionTime = None
# Extract fbclid and gclid for conditional account ID lookup
fbclid = data.get("field:fbclid")
gclid = data.get("field:gclid")
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
)
reservation = ReservationData(
unique_id=unique_id,
start_date=date.fromisoformat(start_date),
end_date=date.fromisoformat(end_date),
num_adults=num_adults,
num_children=num_children,
children_ages=children_ages,
hotel_code=hotel_code,
hotel_name=hotel_name,
offer=offer,
created_at=submissionTime,
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=fbclid,
gclid=gclid,
meta_account_id=meta_account_id,
google_account_id=google_account_id,
)
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
)
# Create task and store reference to prevent it from being garbage collected
# The task runs independently and we don't need to await it here
task = asyncio.create_task(push_event())
# Add done callback to log any exceptions that occur in the background task
task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
return {
"status": "success",
"message": "Wix form data received successfully",
"received_keys": list(data.keys()),
"timestamp": timestamp,
"note": "No authentication required for this endpoint",
}
async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
@@ -974,7 +768,7 @@ async def handle_webhook_unified(
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
webhook_endpoint: WebhookEndpoint | None = result.scalar_one_or_none()
if not webhook_endpoint or not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Webhook not found")
@@ -1015,7 +809,7 @@ async def handle_webhook_unified(
existing = duplicate.scalar_one_or_none()
if existing:
if existing.status == 'completed':
if existing.status == WebhookStatus.COMPLETED:
# Already processed successfully
_LOGGER.info(
"Webhook already processed (webhook_id=%d, hotel=%s)",
@@ -1028,7 +822,7 @@ async def handle_webhook_unified(
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == 'processing':
elif existing.status == WebhookStatus.PROCESSING:
# Another worker is processing right now
_LOGGER.info(
"Webhook is being processed by another worker (webhook_id=%d)",
@@ -1040,7 +834,7 @@ async def handle_webhook_unified(
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == 'failed':
elif existing.status == WebhookStatus.FAILED:
# Retry failed webhook
_LOGGER.info(
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
@@ -1049,7 +843,7 @@ async def handle_webhook_unified(
)
webhook_request = existing
webhook_request.retry_count += 1
webhook_request.status = 'processing'
webhook_request.status = WebhookStatus.PROCESSING
webhook_request.processing_started_at = timestamp
else:
# 5. Create new webhook_request
@@ -1057,9 +851,8 @@ async def handle_webhook_unified(
payload_hash=payload_hash,
webhook_endpoint_id=webhook_endpoint.id,
hotel_id=webhook_endpoint.hotel_id,
status='processing',
status=WebhookStatus.PROCESSING,
payload_json=payload,
payload_size_bytes=payload_size,
processing_started_at=timestamp,
created_at=timestamp,
source_ip=request.client.host if request.client else None,
@@ -1084,7 +877,7 @@ async def handle_webhook_unified(
)
# 8. Update status
webhook_request.status = 'completed'
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
await db_session.commit()