Better typing + moved some code to webhook_processor
This commit is contained in:
@@ -11,7 +11,7 @@ import urllib.parse
|
|||||||
import xml.dom.minidom
|
import xml.dom.minidom
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from datetime import UTC, date, datetime, timedelta
|
from datetime import UTC, datetime, timedelta
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
@@ -34,8 +34,7 @@ from sqlalchemy.exc import IntegrityError
|
|||||||
from sqlalchemy.ext.asyncio import async_sessionmaker
|
from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||||
from sqlalchemy.orm import selectinload
|
from sqlalchemy.orm import selectinload
|
||||||
|
|
||||||
from alpine_bits_python.schemas import ReservationData
|
from alpine_bits_python.webhook_processor import process_generic_webhook_submission, process_wix_form_submission
|
||||||
from alpine_bits_python.webhook_processor import process_generic_webhook_submission
|
|
||||||
|
|
||||||
from .alpinebits_server import (
|
from .alpinebits_server import (
|
||||||
AlpineBitsActionName,
|
AlpineBitsActionName,
|
||||||
@@ -43,12 +42,11 @@ from .alpinebits_server import (
|
|||||||
AlpineBitsServer,
|
AlpineBitsServer,
|
||||||
Version,
|
Version,
|
||||||
)
|
)
|
||||||
from .auth import generate_unique_id, validate_api_key
|
from .auth import validate_api_key
|
||||||
from .config_loader import load_config, get_username_for_hotel
|
from .config_loader import load_config, get_username_for_hotel
|
||||||
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode
|
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode, WebhookStatus
|
||||||
from .conversion_service import ConversionService
|
from .conversion_service import ConversionService
|
||||||
from .csv_import import CSVImporter
|
from .csv_import import CSVImporter
|
||||||
from .customer_service import CustomerService
|
|
||||||
from .db import Customer as DBCustomer
|
from .db import Customer as DBCustomer
|
||||||
from .db import Reservation as DBReservation
|
from .db import Reservation as DBReservation
|
||||||
from .db import Hotel, WebhookEndpoint, WebhookRequest
|
from .db import Hotel, WebhookEndpoint, WebhookRequest
|
||||||
@@ -66,7 +64,6 @@ from .rate_limit import (
|
|||||||
limiter,
|
limiter,
|
||||||
webhook_limiter,
|
webhook_limiter,
|
||||||
)
|
)
|
||||||
from .reservation_service import ReservationService
|
|
||||||
from .webhook_processor import webhook_registry
|
from .webhook_processor import webhook_registry
|
||||||
from .worker_coordination import is_primary_worker
|
from .worker_coordination import is_primary_worker
|
||||||
|
|
||||||
@@ -700,209 +697,6 @@ async def detect_language(
|
|||||||
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
|
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
|
||||||
|
|
||||||
|
|
||||||
# Extracted business logic for handling Wix form submissions
|
|
||||||
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
|
|
||||||
"""Shared business logic for handling Wix form submissions (test and production)."""
|
|
||||||
timestamp = datetime.now().isoformat()
|
|
||||||
|
|
||||||
_LOGGER.info("Received Wix form data at %s", timestamp)
|
|
||||||
|
|
||||||
data = data.get("data") # Handle nested "data" key if present
|
|
||||||
|
|
||||||
# save customer and reservation to DB
|
|
||||||
|
|
||||||
contact_info = data.get("contact", {})
|
|
||||||
first_name = contact_info.get("name", {}).get("first")
|
|
||||||
last_name = contact_info.get("name", {}).get("last")
|
|
||||||
email = contact_info.get("email")
|
|
||||||
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
|
|
||||||
contact_info.get("locale", "de-de")
|
|
||||||
contact_id = contact_info.get("contactId")
|
|
||||||
|
|
||||||
name_prefix = data.get("field:anrede")
|
|
||||||
|
|
||||||
email_newsletter = data.get("field:form_field_5a7b", False)
|
|
||||||
|
|
||||||
# if email_newsletter is a string, attempt to convert to boolean, else false
|
|
||||||
if isinstance(email_newsletter, str):
|
|
||||||
email_newsletter = email_newsletter.lower() in [
|
|
||||||
"yes",
|
|
||||||
"true",
|
|
||||||
"1",
|
|
||||||
"on",
|
|
||||||
"selezionato",
|
|
||||||
"angekreuzt",
|
|
||||||
]
|
|
||||||
|
|
||||||
address_line = None
|
|
||||||
city_name = None
|
|
||||||
postal_code = None
|
|
||||||
country_code = None
|
|
||||||
gender = None
|
|
||||||
birth_date = None
|
|
||||||
language = data.get("contact", {}).get("locale", "en")[:2]
|
|
||||||
|
|
||||||
# Dates
|
|
||||||
start_date = (
|
|
||||||
data.get("field:date_picker_a7c8")
|
|
||||||
or data.get("Anreisedatum")
|
|
||||||
or data.get("submissions", [{}])[1].get("value")
|
|
||||||
)
|
|
||||||
end_date = (
|
|
||||||
data.get("field:date_picker_7e65")
|
|
||||||
or data.get("Abreisedatum")
|
|
||||||
or data.get("submissions", [{}])[2].get("value")
|
|
||||||
)
|
|
||||||
|
|
||||||
# Room/guest info
|
|
||||||
num_adults = int(data.get("field:number_7cf5") or 1)
|
|
||||||
num_children = int(data.get("field:anzahl_kinder") or 0)
|
|
||||||
children_ages = []
|
|
||||||
if num_children > 0:
|
|
||||||
# Collect all child age fields, then take only the first num_children
|
|
||||||
# This handles form updates that may send extra padded/zero fields
|
|
||||||
temp_ages = []
|
|
||||||
for k in data:
|
|
||||||
if k.startswith("field:alter_kind_"):
|
|
||||||
if data[k] is None or data[k] == "":
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
age = int(data[k])
|
|
||||||
temp_ages.append(age)
|
|
||||||
except ValueError:
|
|
||||||
_LOGGER.warning("Invalid age value for %s: %s", k, data[k])
|
|
||||||
|
|
||||||
# Only keep the first num_children ages, regardless of their values
|
|
||||||
children_ages = temp_ages[:num_children]
|
|
||||||
|
|
||||||
offer = data.get("field:angebot_auswaehlen")
|
|
||||||
|
|
||||||
# get submissionId and ensure max length 35. Generate one if not present
|
|
||||||
|
|
||||||
unique_id = data.get("submissionId", generate_unique_id())
|
|
||||||
|
|
||||||
# Use CustomerService to handle customer creation/update with hashing
|
|
||||||
customer_service = CustomerService(db)
|
|
||||||
|
|
||||||
customer_data = {
|
|
||||||
"given_name": first_name,
|
|
||||||
"surname": last_name,
|
|
||||||
"contact_id": contact_id,
|
|
||||||
"name_prefix": name_prefix,
|
|
||||||
"email_address": email,
|
|
||||||
"phone": phone_number,
|
|
||||||
"email_newsletter": email_newsletter,
|
|
||||||
"address_line": address_line,
|
|
||||||
"city_name": city_name,
|
|
||||||
"postal_code": postal_code,
|
|
||||||
"country_code": country_code,
|
|
||||||
"gender": gender,
|
|
||||||
"birth_date": birth_date,
|
|
||||||
"language": language,
|
|
||||||
"address_catalog": False,
|
|
||||||
"name_title": None,
|
|
||||||
}
|
|
||||||
|
|
||||||
# This automatically creates/updates both Customer and HashedCustomer
|
|
||||||
db_customer = await customer_service.get_or_create_customer(customer_data)
|
|
||||||
|
|
||||||
# Determine hotel_code and hotel_name
|
|
||||||
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
|
|
||||||
hotel_code = data.get("field:hotelid", None)
|
|
||||||
|
|
||||||
if hotel_code is None:
|
|
||||||
_LOGGER.warning("No hotel_code provided in form data, using default")
|
|
||||||
|
|
||||||
hotel_code = request.app.state.config.get("default_hotel_code", "123")
|
|
||||||
|
|
||||||
hotel_name = (
|
|
||||||
data.get("field:hotelname")
|
|
||||||
or data.get("hotelname")
|
|
||||||
or request.app.state.config.get("default_hotel_name")
|
|
||||||
or "Frangart Inn" # fallback
|
|
||||||
)
|
|
||||||
|
|
||||||
submissionTime = data.get("submissionTime") # 2025-10-07T05:48:41.855Z
|
|
||||||
try:
|
|
||||||
if submissionTime:
|
|
||||||
submissionTime = datetime.fromisoformat(
|
|
||||||
submissionTime[:-1]
|
|
||||||
) # Remove Z and convert
|
|
||||||
except Exception as e:
|
|
||||||
_LOGGER.exception("Error parsing submissionTime: %s", e)
|
|
||||||
submissionTime = None
|
|
||||||
|
|
||||||
# Extract fbclid and gclid for conditional account ID lookup
|
|
||||||
fbclid = data.get("field:fbclid")
|
|
||||||
gclid = data.get("field:gclid")
|
|
||||||
|
|
||||||
# Get advertising account IDs conditionally based on fbclid/gclid presence
|
|
||||||
meta_account_id, google_account_id = get_advertising_account_ids(
|
|
||||||
request.app.state.config, hotel_code, fbclid, gclid
|
|
||||||
)
|
|
||||||
|
|
||||||
reservation = ReservationData(
|
|
||||||
unique_id=unique_id,
|
|
||||||
start_date=date.fromisoformat(start_date),
|
|
||||||
end_date=date.fromisoformat(end_date),
|
|
||||||
num_adults=num_adults,
|
|
||||||
num_children=num_children,
|
|
||||||
children_ages=children_ages,
|
|
||||||
hotel_code=hotel_code,
|
|
||||||
hotel_name=hotel_name,
|
|
||||||
offer=offer,
|
|
||||||
created_at=submissionTime,
|
|
||||||
utm_source=data.get("field:utm_source"),
|
|
||||||
utm_medium=data.get("field:utm_medium"),
|
|
||||||
utm_campaign=data.get("field:utm_campaign"),
|
|
||||||
utm_term=data.get("field:utm_term"),
|
|
||||||
utm_content=data.get("field:utm_content"),
|
|
||||||
user_comment=data.get("field:long_answer_3524", ""),
|
|
||||||
fbclid=fbclid,
|
|
||||||
gclid=gclid,
|
|
||||||
meta_account_id=meta_account_id,
|
|
||||||
google_account_id=google_account_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
if reservation.md5_unique_id is None:
|
|
||||||
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
|
|
||||||
|
|
||||||
# Use ReservationService to create reservation
|
|
||||||
reservation_service = ReservationService(db)
|
|
||||||
db_reservation = await reservation_service.create_reservation(
|
|
||||||
reservation, db_customer.id
|
|
||||||
)
|
|
||||||
|
|
||||||
async def push_event():
|
|
||||||
# Fire event for listeners (push, etc.) - hotel-specific dispatch
|
|
||||||
dispatcher = getattr(request.app.state, "event_dispatcher", None)
|
|
||||||
if dispatcher:
|
|
||||||
# Get hotel_code from reservation to target the right listeners
|
|
||||||
hotel_code = getattr(db_reservation, "hotel_code", None)
|
|
||||||
if hotel_code and hotel_code.strip():
|
|
||||||
await dispatcher.dispatch_for_hotel(
|
|
||||||
"form_processed", hotel_code, db_customer, db_reservation
|
|
||||||
)
|
|
||||||
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
|
|
||||||
else:
|
|
||||||
_LOGGER.warning(
|
|
||||||
"No hotel_code in reservation, skipping push notifications"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create task and store reference to prevent it from being garbage collected
|
|
||||||
# The task runs independently and we don't need to await it here
|
|
||||||
task = asyncio.create_task(push_event())
|
|
||||||
# Add done callback to log any exceptions that occur in the background task
|
|
||||||
task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"status": "success",
|
|
||||||
"message": "Wix form data received successfully",
|
|
||||||
"received_keys": list(data.keys()),
|
|
||||||
"timestamp": timestamp,
|
|
||||||
"note": "No authentication required for this endpoint",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
async def validate_basic_auth(
|
async def validate_basic_auth(
|
||||||
credentials: HTTPBasicCredentials = Depends(security_basic),
|
credentials: HTTPBasicCredentials = Depends(security_basic),
|
||||||
@@ -974,7 +768,7 @@ async def handle_webhook_unified(
|
|||||||
)
|
)
|
||||||
.options(selectinload(WebhookEndpoint.hotel))
|
.options(selectinload(WebhookEndpoint.hotel))
|
||||||
)
|
)
|
||||||
webhook_endpoint = result.scalar_one_or_none()
|
webhook_endpoint: WebhookEndpoint | None = result.scalar_one_or_none()
|
||||||
|
|
||||||
if not webhook_endpoint or not webhook_endpoint.hotel.is_active:
|
if not webhook_endpoint or not webhook_endpoint.hotel.is_active:
|
||||||
raise HTTPException(status_code=404, detail="Webhook not found")
|
raise HTTPException(status_code=404, detail="Webhook not found")
|
||||||
@@ -1015,7 +809,7 @@ async def handle_webhook_unified(
|
|||||||
existing = duplicate.scalar_one_or_none()
|
existing = duplicate.scalar_one_or_none()
|
||||||
|
|
||||||
if existing:
|
if existing:
|
||||||
if existing.status == 'completed':
|
if existing.status == WebhookStatus.COMPLETED:
|
||||||
# Already processed successfully
|
# Already processed successfully
|
||||||
_LOGGER.info(
|
_LOGGER.info(
|
||||||
"Webhook already processed (webhook_id=%d, hotel=%s)",
|
"Webhook already processed (webhook_id=%d, hotel=%s)",
|
||||||
@@ -1028,7 +822,7 @@ async def handle_webhook_unified(
|
|||||||
"webhook_id": existing.id,
|
"webhook_id": existing.id,
|
||||||
"duplicate": True,
|
"duplicate": True,
|
||||||
}
|
}
|
||||||
elif existing.status == 'processing':
|
elif existing.status == WebhookStatus.PROCESSING:
|
||||||
# Another worker is processing right now
|
# Another worker is processing right now
|
||||||
_LOGGER.info(
|
_LOGGER.info(
|
||||||
"Webhook is being processed by another worker (webhook_id=%d)",
|
"Webhook is being processed by another worker (webhook_id=%d)",
|
||||||
@@ -1040,7 +834,7 @@ async def handle_webhook_unified(
|
|||||||
"webhook_id": existing.id,
|
"webhook_id": existing.id,
|
||||||
"duplicate": True,
|
"duplicate": True,
|
||||||
}
|
}
|
||||||
elif existing.status == 'failed':
|
elif existing.status == WebhookStatus.FAILED:
|
||||||
# Retry failed webhook
|
# Retry failed webhook
|
||||||
_LOGGER.info(
|
_LOGGER.info(
|
||||||
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
|
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
|
||||||
@@ -1049,7 +843,7 @@ async def handle_webhook_unified(
|
|||||||
)
|
)
|
||||||
webhook_request = existing
|
webhook_request = existing
|
||||||
webhook_request.retry_count += 1
|
webhook_request.retry_count += 1
|
||||||
webhook_request.status = 'processing'
|
webhook_request.status = WebhookStatus.PROCESSING
|
||||||
webhook_request.processing_started_at = timestamp
|
webhook_request.processing_started_at = timestamp
|
||||||
else:
|
else:
|
||||||
# 5. Create new webhook_request
|
# 5. Create new webhook_request
|
||||||
@@ -1057,9 +851,8 @@ async def handle_webhook_unified(
|
|||||||
payload_hash=payload_hash,
|
payload_hash=payload_hash,
|
||||||
webhook_endpoint_id=webhook_endpoint.id,
|
webhook_endpoint_id=webhook_endpoint.id,
|
||||||
hotel_id=webhook_endpoint.hotel_id,
|
hotel_id=webhook_endpoint.hotel_id,
|
||||||
status='processing',
|
status=WebhookStatus.PROCESSING,
|
||||||
payload_json=payload,
|
payload_json=payload,
|
||||||
payload_size_bytes=payload_size,
|
|
||||||
processing_started_at=timestamp,
|
processing_started_at=timestamp,
|
||||||
created_at=timestamp,
|
created_at=timestamp,
|
||||||
source_ip=request.client.host if request.client else None,
|
source_ip=request.client.host if request.client else None,
|
||||||
@@ -1084,7 +877,7 @@ async def handle_webhook_unified(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# 8. Update status
|
# 8. Update status
|
||||||
webhook_request.status = 'completed'
|
webhook_request.status = WebhookStatus.COMPLETED
|
||||||
webhook_request.processing_completed_at = datetime.now(UTC)
|
webhook_request.processing_completed_at = datetime.now(UTC)
|
||||||
|
|
||||||
await db_session.commit()
|
await db_session.commit()
|
||||||
|
|||||||
Reference in New Issue
Block a user