Compare commits
7 Commits
b77a0be80f
...
454b524077
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
454b524077 | ||
|
|
aec8a99b71 | ||
|
|
4ab0888508 | ||
|
|
2941519899 | ||
|
|
97994c5a38 | ||
|
|
556eef529a | ||
|
|
7b010ce65e |
@@ -10,6 +10,8 @@ from typing import Sequence, Union
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alpine_bits_python.const import WebhookStatus
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'e7ee03d8f430'
|
||||
@@ -66,13 +68,12 @@ def upgrade() -> None:
|
||||
sa.Column('payload_hash', sa.String(length=64), nullable=False),
|
||||
sa.Column('webhook_endpoint_id', sa.Integer(), nullable=True),
|
||||
sa.Column('hotel_id', sa.String(length=50), nullable=True),
|
||||
sa.Column('status', sa.String(length=20), nullable=False, default='pending'),
|
||||
sa.Column('status', sa.String(length=20), nullable=False, default=WebhookStatus.PENDING.value),
|
||||
sa.Column('processing_started_at', sa.DateTime(timezone=True), nullable=True),
|
||||
sa.Column('processing_completed_at', sa.DateTime(timezone=True), nullable=True),
|
||||
sa.Column('retry_count', sa.Integer(), nullable=True, default=0),
|
||||
sa.Column('last_error', sa.String(length=2000), nullable=True),
|
||||
sa.Column('payload_json', sa.JSON(), nullable=True),
|
||||
sa.Column('payload_size_bytes', sa.Integer(), nullable=True),
|
||||
sa.Column('purged_at', sa.DateTime(timezone=True), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
|
||||
sa.Column('source_ip', sa.String(length=45), nullable=True),
|
||||
|
||||
@@ -11,7 +11,7 @@ import urllib.parse
|
||||
import xml.dom.minidom
|
||||
from collections import defaultdict
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import UTC, date, datetime, timedelta
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
@@ -34,8 +34,7 @@ from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from alpine_bits_python.schemas import ReservationData
|
||||
from alpine_bits_python.webhook_processor import process_generic_webhook_submission
|
||||
from alpine_bits_python.webhook_processor import process_generic_webhook_submission, process_wix_form_submission
|
||||
|
||||
from .alpinebits_server import (
|
||||
AlpineBitsActionName,
|
||||
@@ -43,12 +42,11 @@ from .alpinebits_server import (
|
||||
AlpineBitsServer,
|
||||
Version,
|
||||
)
|
||||
from .auth import generate_unique_id, validate_api_key
|
||||
from .auth import validate_api_key
|
||||
from .config_loader import load_config, get_username_for_hotel
|
||||
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode
|
||||
from .const import HttpStatusCode, WebhookStatus
|
||||
from .conversion_service import ConversionService
|
||||
from .csv_import import CSVImporter
|
||||
from .customer_service import CustomerService
|
||||
from .db import Customer as DBCustomer
|
||||
from .db import Reservation as DBReservation
|
||||
from .db import Hotel, WebhookEndpoint, WebhookRequest
|
||||
@@ -66,7 +64,6 @@ from .rate_limit import (
|
||||
limiter,
|
||||
webhook_limiter,
|
||||
)
|
||||
from .reservation_service import ReservationService
|
||||
from .webhook_processor import webhook_registry
|
||||
from .worker_coordination import is_primary_worker
|
||||
|
||||
@@ -82,45 +79,6 @@ security_bearer = HTTPBearer()
|
||||
TOKEN_LOG_LENGTH = 10
|
||||
|
||||
|
||||
def get_advertising_account_ids(
|
||||
config: dict[str, Any], hotel_code: str, fbclid: str | None, gclid: str | None
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Get advertising account IDs based on hotel config and click IDs.
|
||||
|
||||
Args:
|
||||
config: Application configuration dict
|
||||
hotel_code: Hotel identifier to look up in config
|
||||
fbclid: Facebook click ID (if present, meta_account_id will be returned)
|
||||
gclid: Google click ID (if present, google_account_id will be returned)
|
||||
|
||||
Returns:
|
||||
Tuple of (meta_account_id, google_account_id) based on conditional logic:
|
||||
- meta_account_id is set only if fbclid is present AND hotel has
|
||||
meta_account configured
|
||||
- google_account_id is set only if gclid is present AND hotel has
|
||||
google_account configured
|
||||
|
||||
"""
|
||||
meta_account_id = None
|
||||
google_account_id = None
|
||||
|
||||
# Look up hotel in config
|
||||
alpine_bits_auth = config.get("alpine_bits_auth", [])
|
||||
for hotel in alpine_bits_auth:
|
||||
if hotel.get(CONF_HOTEL_ID) == hotel_code:
|
||||
# Conditionally set meta_account_id if fbclid is present
|
||||
if fbclid:
|
||||
meta_account_id = hotel.get(CONF_META_ACCOUNT)
|
||||
|
||||
# Conditionally set google_account_id if gclid is present
|
||||
if gclid:
|
||||
google_account_id = hotel.get(CONF_GOOGLE_ACCOUNT)
|
||||
|
||||
break
|
||||
|
||||
return meta_account_id, google_account_id
|
||||
|
||||
|
||||
# Pydantic models for language detection
|
||||
class LanguageDetectionRequest(BaseModel):
|
||||
text: str
|
||||
@@ -279,12 +237,12 @@ async def cleanup_stale_webhooks(
|
||||
update(WebhookRequest)
|
||||
.where(
|
||||
and_(
|
||||
WebhookRequest.status == 'processing',
|
||||
WebhookRequest.status == WebhookStatus.PROCESSING,
|
||||
WebhookRequest.processing_started_at < timeout_threshold
|
||||
)
|
||||
)
|
||||
.values(
|
||||
status='failed',
|
||||
status=WebhookStatus.FAILED,
|
||||
last_error='Processing timeout - worker may have crashed'
|
||||
)
|
||||
)
|
||||
@@ -319,7 +277,7 @@ async def purge_old_webhook_payloads(
|
||||
update(WebhookRequest)
|
||||
.where(
|
||||
and_(
|
||||
WebhookRequest.status == 'completed',
|
||||
WebhookRequest.status == WebhookStatus.COMPLETED,
|
||||
WebhookRequest.created_at < cutoff_date,
|
||||
WebhookRequest.purged_at.is_(None) # Not already purged
|
||||
)
|
||||
@@ -700,209 +658,6 @@ async def detect_language(
|
||||
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
|
||||
|
||||
|
||||
# Extracted business logic for handling Wix form submissions
|
||||
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
|
||||
"""Shared business logic for handling Wix form submissions (test and production)."""
|
||||
timestamp = datetime.now().isoformat()
|
||||
|
||||
_LOGGER.info("Received Wix form data at %s", timestamp)
|
||||
|
||||
data = data.get("data") # Handle nested "data" key if present
|
||||
|
||||
# save customer and reservation to DB
|
||||
|
||||
contact_info = data.get("contact", {})
|
||||
first_name = contact_info.get("name", {}).get("first")
|
||||
last_name = contact_info.get("name", {}).get("last")
|
||||
email = contact_info.get("email")
|
||||
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
|
||||
contact_info.get("locale", "de-de")
|
||||
contact_id = contact_info.get("contactId")
|
||||
|
||||
name_prefix = data.get("field:anrede")
|
||||
|
||||
email_newsletter = data.get("field:form_field_5a7b", False)
|
||||
|
||||
# if email_newsletter is a string, attempt to convert to boolean, else false
|
||||
if isinstance(email_newsletter, str):
|
||||
email_newsletter = email_newsletter.lower() in [
|
||||
"yes",
|
||||
"true",
|
||||
"1",
|
||||
"on",
|
||||
"selezionato",
|
||||
"angekreuzt",
|
||||
]
|
||||
|
||||
address_line = None
|
||||
city_name = None
|
||||
postal_code = None
|
||||
country_code = None
|
||||
gender = None
|
||||
birth_date = None
|
||||
language = data.get("contact", {}).get("locale", "en")[:2]
|
||||
|
||||
# Dates
|
||||
start_date = (
|
||||
data.get("field:date_picker_a7c8")
|
||||
or data.get("Anreisedatum")
|
||||
or data.get("submissions", [{}])[1].get("value")
|
||||
)
|
||||
end_date = (
|
||||
data.get("field:date_picker_7e65")
|
||||
or data.get("Abreisedatum")
|
||||
or data.get("submissions", [{}])[2].get("value")
|
||||
)
|
||||
|
||||
# Room/guest info
|
||||
num_adults = int(data.get("field:number_7cf5") or 1)
|
||||
num_children = int(data.get("field:anzahl_kinder") or 0)
|
||||
children_ages = []
|
||||
if num_children > 0:
|
||||
# Collect all child age fields, then take only the first num_children
|
||||
# This handles form updates that may send extra padded/zero fields
|
||||
temp_ages = []
|
||||
for k in data:
|
||||
if k.startswith("field:alter_kind_"):
|
||||
if data[k] is None or data[k] == "":
|
||||
continue
|
||||
try:
|
||||
age = int(data[k])
|
||||
temp_ages.append(age)
|
||||
except ValueError:
|
||||
_LOGGER.warning("Invalid age value for %s: %s", k, data[k])
|
||||
|
||||
# Only keep the first num_children ages, regardless of their values
|
||||
children_ages = temp_ages[:num_children]
|
||||
|
||||
offer = data.get("field:angebot_auswaehlen")
|
||||
|
||||
# get submissionId and ensure max length 35. Generate one if not present
|
||||
|
||||
unique_id = data.get("submissionId", generate_unique_id())
|
||||
|
||||
# Use CustomerService to handle customer creation/update with hashing
|
||||
customer_service = CustomerService(db)
|
||||
|
||||
customer_data = {
|
||||
"given_name": first_name,
|
||||
"surname": last_name,
|
||||
"contact_id": contact_id,
|
||||
"name_prefix": name_prefix,
|
||||
"email_address": email,
|
||||
"phone": phone_number,
|
||||
"email_newsletter": email_newsletter,
|
||||
"address_line": address_line,
|
||||
"city_name": city_name,
|
||||
"postal_code": postal_code,
|
||||
"country_code": country_code,
|
||||
"gender": gender,
|
||||
"birth_date": birth_date,
|
||||
"language": language,
|
||||
"address_catalog": False,
|
||||
"name_title": None,
|
||||
}
|
||||
|
||||
# This automatically creates/updates both Customer and HashedCustomer
|
||||
db_customer = await customer_service.get_or_create_customer(customer_data)
|
||||
|
||||
# Determine hotel_code and hotel_name
|
||||
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
|
||||
hotel_code = data.get("field:hotelid", None)
|
||||
|
||||
if hotel_code is None:
|
||||
_LOGGER.warning("No hotel_code provided in form data, using default")
|
||||
|
||||
hotel_code = request.app.state.config.get("default_hotel_code", "123")
|
||||
|
||||
hotel_name = (
|
||||
data.get("field:hotelname")
|
||||
or data.get("hotelname")
|
||||
or request.app.state.config.get("default_hotel_name")
|
||||
or "Frangart Inn" # fallback
|
||||
)
|
||||
|
||||
submissionTime = data.get("submissionTime") # 2025-10-07T05:48:41.855Z
|
||||
try:
|
||||
if submissionTime:
|
||||
submissionTime = datetime.fromisoformat(
|
||||
submissionTime[:-1]
|
||||
) # Remove Z and convert
|
||||
except Exception as e:
|
||||
_LOGGER.exception("Error parsing submissionTime: %s", e)
|
||||
submissionTime = None
|
||||
|
||||
# Extract fbclid and gclid for conditional account ID lookup
|
||||
fbclid = data.get("field:fbclid")
|
||||
gclid = data.get("field:gclid")
|
||||
|
||||
# Get advertising account IDs conditionally based on fbclid/gclid presence
|
||||
meta_account_id, google_account_id = get_advertising_account_ids(
|
||||
request.app.state.config, hotel_code, fbclid, gclid
|
||||
)
|
||||
|
||||
reservation = ReservationData(
|
||||
unique_id=unique_id,
|
||||
start_date=date.fromisoformat(start_date),
|
||||
end_date=date.fromisoformat(end_date),
|
||||
num_adults=num_adults,
|
||||
num_children=num_children,
|
||||
children_ages=children_ages,
|
||||
hotel_code=hotel_code,
|
||||
hotel_name=hotel_name,
|
||||
offer=offer,
|
||||
created_at=submissionTime,
|
||||
utm_source=data.get("field:utm_source"),
|
||||
utm_medium=data.get("field:utm_medium"),
|
||||
utm_campaign=data.get("field:utm_campaign"),
|
||||
utm_term=data.get("field:utm_term"),
|
||||
utm_content=data.get("field:utm_content"),
|
||||
user_comment=data.get("field:long_answer_3524", ""),
|
||||
fbclid=fbclid,
|
||||
gclid=gclid,
|
||||
meta_account_id=meta_account_id,
|
||||
google_account_id=google_account_id,
|
||||
)
|
||||
|
||||
if reservation.md5_unique_id is None:
|
||||
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
|
||||
|
||||
# Use ReservationService to create reservation
|
||||
reservation_service = ReservationService(db)
|
||||
db_reservation = await reservation_service.create_reservation(
|
||||
reservation, db_customer.id
|
||||
)
|
||||
|
||||
async def push_event():
|
||||
# Fire event for listeners (push, etc.) - hotel-specific dispatch
|
||||
dispatcher = getattr(request.app.state, "event_dispatcher", None)
|
||||
if dispatcher:
|
||||
# Get hotel_code from reservation to target the right listeners
|
||||
hotel_code = getattr(db_reservation, "hotel_code", None)
|
||||
if hotel_code and hotel_code.strip():
|
||||
await dispatcher.dispatch_for_hotel(
|
||||
"form_processed", hotel_code, db_customer, db_reservation
|
||||
)
|
||||
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
"No hotel_code in reservation, skipping push notifications"
|
||||
)
|
||||
|
||||
# Create task and store reference to prevent it from being garbage collected
|
||||
# The task runs independently and we don't need to await it here
|
||||
task = asyncio.create_task(push_event())
|
||||
# Add done callback to log any exceptions that occur in the background task
|
||||
task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Wix form data received successfully",
|
||||
"received_keys": list(data.keys()),
|
||||
"timestamp": timestamp,
|
||||
"note": "No authentication required for this endpoint",
|
||||
}
|
||||
|
||||
|
||||
async def validate_basic_auth(
|
||||
credentials: HTTPBasicCredentials = Depends(security_basic),
|
||||
@@ -950,8 +705,13 @@ async def handle_webhook_unified(
|
||||
):
|
||||
"""Unified webhook handler with deduplication and routing.
|
||||
|
||||
Supports both new secure webhook URLs and legacy endpoints:
|
||||
- /webhook/{64-char-secret} - New secure endpoints
|
||||
- /webhook/wix-form - Legacy Wix form endpoint (extracts hotel from payload)
|
||||
- /webhook/generic - Legacy generic webhook endpoint (extracts hotel from payload)
|
||||
|
||||
Flow:
|
||||
1. Look up webhook_endpoint by webhook_secret
|
||||
1. Look up webhook_endpoint by webhook_secret (or detect legacy endpoint)
|
||||
2. Parse and hash payload (SHA256)
|
||||
3. Check for duplicate using SELECT FOR UPDATE SKIP LOCKED
|
||||
4. If duplicate and completed: return success (idempotent)
|
||||
@@ -963,23 +723,7 @@ async def handle_webhook_unified(
|
||||
"""
|
||||
timestamp = datetime.now(UTC)
|
||||
|
||||
# 1. Look up webhook_endpoint
|
||||
result = await db_session.execute(
|
||||
select(WebhookEndpoint)
|
||||
.where(
|
||||
and_(
|
||||
WebhookEndpoint.webhook_secret == webhook_secret,
|
||||
WebhookEndpoint.is_enabled == True
|
||||
)
|
||||
)
|
||||
.options(selectinload(WebhookEndpoint.hotel))
|
||||
)
|
||||
webhook_endpoint = result.scalar_one_or_none()
|
||||
|
||||
if not webhook_endpoint or not webhook_endpoint.hotel.is_active:
|
||||
raise HTTPException(status_code=404, detail="Webhook not found")
|
||||
|
||||
# 2. Parse payload
|
||||
# 2. Parse payload first (needed for legacy endpoint detection)
|
||||
body = await request.body()
|
||||
|
||||
# Handle gzip compression
|
||||
@@ -996,6 +740,84 @@ async def handle_webhook_unified(
|
||||
_LOGGER.error("Failed to parse JSON payload: %s", e)
|
||||
raise HTTPException(status_code=400, detail="Invalid JSON payload")
|
||||
|
||||
# 1. Detect if this is a legacy endpoint or look up webhook_endpoint
|
||||
webhook_endpoint: WebhookEndpoint | None = None
|
||||
is_legacy = False
|
||||
webhook_type = None
|
||||
hotel_id_from_payload = None
|
||||
|
||||
# Check if webhook_secret looks like a legacy endpoint name
|
||||
if webhook_secret in ("wix-form", "generic"):
|
||||
is_legacy = True
|
||||
webhook_type = "wix_form" if webhook_secret == "wix-form" else "generic"
|
||||
|
||||
# Extract hotel_id from payload based on webhook type
|
||||
if webhook_type == "wix_form":
|
||||
# Wix forms: field:hotelid or use default
|
||||
hotel_id_from_payload = payload.get("data", {}).get("field:hotelid") if isinstance(payload.get("data"), dict) else payload.get("field:hotelid")
|
||||
if not hotel_id_from_payload:
|
||||
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
|
||||
_LOGGER.info("Legacy wix-form endpoint: using default hotel_code=%s", hotel_id_from_payload)
|
||||
elif webhook_type == "generic":
|
||||
# Generic webhooks: hotel_data.hotelcode or use default
|
||||
hotel_data = payload.get("hotel_data", {})
|
||||
hotel_id_from_payload = hotel_data.get("hotelcode")
|
||||
if not hotel_id_from_payload:
|
||||
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
|
||||
_LOGGER.info("Legacy generic endpoint: using default hotel_code=%s", hotel_id_from_payload)
|
||||
|
||||
_LOGGER.info(
|
||||
"Legacy endpoint detected: %s, webhook_type=%s, hotel_id=%s",
|
||||
webhook_secret,
|
||||
webhook_type,
|
||||
hotel_id_from_payload
|
||||
)
|
||||
|
||||
# Look up the webhook endpoint for this hotel and type
|
||||
result = await db_session.execute(
|
||||
select(WebhookEndpoint)
|
||||
.where(
|
||||
and_(
|
||||
WebhookEndpoint.hotel_id == hotel_id_from_payload,
|
||||
WebhookEndpoint.webhook_type == webhook_type,
|
||||
WebhookEndpoint.is_enabled == True
|
||||
)
|
||||
)
|
||||
.options(selectinload(WebhookEndpoint.hotel))
|
||||
)
|
||||
webhook_endpoint = result.scalar_one_or_none()
|
||||
|
||||
if not webhook_endpoint:
|
||||
_LOGGER.error(
|
||||
"No webhook endpoint found for legacy endpoint: hotel_id=%s, type=%s",
|
||||
hotel_id_from_payload,
|
||||
webhook_type
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}"
|
||||
)
|
||||
else:
|
||||
# New secure endpoint - look up by webhook_secret
|
||||
result = await db_session.execute(
|
||||
select(WebhookEndpoint)
|
||||
.where(
|
||||
and_(
|
||||
WebhookEndpoint.webhook_secret == webhook_secret,
|
||||
WebhookEndpoint.is_enabled == True
|
||||
)
|
||||
)
|
||||
.options(selectinload(WebhookEndpoint.hotel))
|
||||
)
|
||||
webhook_endpoint = result.scalar_one_or_none()
|
||||
|
||||
if not webhook_endpoint:
|
||||
raise HTTPException(status_code=404, detail="Webhook not found")
|
||||
|
||||
# Verify hotel is active
|
||||
if not webhook_endpoint.hotel.is_active:
|
||||
raise HTTPException(status_code=404, detail="Hotel is not active")
|
||||
|
||||
# 3. Hash payload (canonical JSON for consistent hashing)
|
||||
payload_json_str = json.dumps(payload, sort_keys=True)
|
||||
payload_hash = hashlib.sha256(payload_json_str.encode("utf-8")).hexdigest()
|
||||
@@ -1015,7 +837,7 @@ async def handle_webhook_unified(
|
||||
existing = duplicate.scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
if existing.status == 'completed':
|
||||
if existing.status == WebhookStatus.COMPLETED:
|
||||
# Already processed successfully
|
||||
_LOGGER.info(
|
||||
"Webhook already processed (webhook_id=%d, hotel=%s)",
|
||||
@@ -1028,7 +850,7 @@ async def handle_webhook_unified(
|
||||
"webhook_id": existing.id,
|
||||
"duplicate": True,
|
||||
}
|
||||
elif existing.status == 'processing':
|
||||
elif existing.status == WebhookStatus.PROCESSING:
|
||||
# Another worker is processing right now
|
||||
_LOGGER.info(
|
||||
"Webhook is being processed by another worker (webhook_id=%d)",
|
||||
@@ -1040,7 +862,7 @@ async def handle_webhook_unified(
|
||||
"webhook_id": existing.id,
|
||||
"duplicate": True,
|
||||
}
|
||||
elif existing.status == 'failed':
|
||||
elif existing.status == WebhookStatus.FAILED:
|
||||
# Retry failed webhook
|
||||
_LOGGER.info(
|
||||
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
|
||||
@@ -1049,7 +871,7 @@ async def handle_webhook_unified(
|
||||
)
|
||||
webhook_request = existing
|
||||
webhook_request.retry_count += 1
|
||||
webhook_request.status = 'processing'
|
||||
webhook_request.status = WebhookStatus.PROCESSING
|
||||
webhook_request.processing_started_at = timestamp
|
||||
else:
|
||||
# 5. Create new webhook_request
|
||||
@@ -1057,9 +879,8 @@ async def handle_webhook_unified(
|
||||
payload_hash=payload_hash,
|
||||
webhook_endpoint_id=webhook_endpoint.id,
|
||||
hotel_id=webhook_endpoint.hotel_id,
|
||||
status='processing',
|
||||
status=WebhookStatus.PROCESSING,
|
||||
payload_json=payload,
|
||||
payload_size_bytes=payload_size,
|
||||
processing_started_at=timestamp,
|
||||
created_at=timestamp,
|
||||
source_ip=request.client.host if request.client else None,
|
||||
@@ -1084,7 +905,7 @@ async def handle_webhook_unified(
|
||||
)
|
||||
|
||||
# 8. Update status
|
||||
webhook_request.status = 'completed'
|
||||
webhook_request.status = WebhookStatus.COMPLETED
|
||||
webhook_request.processing_completed_at = datetime.now(UTC)
|
||||
|
||||
await db_session.commit()
|
||||
@@ -1098,7 +919,7 @@ async def handle_webhook_unified(
|
||||
except Exception as e:
|
||||
_LOGGER.exception("Error processing webhook: %s", e)
|
||||
|
||||
webhook_request.status = 'failed'
|
||||
webhook_request.status = WebhookStatus.FAILED
|
||||
webhook_request.last_error = str(e)[:2000]
|
||||
webhook_request.processing_completed_at = datetime.now(UTC)
|
||||
await db_session.commit()
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from annotatedyaml.loader import Secrets
|
||||
from annotatedyaml.loader import load_yaml as load_annotated_yaml
|
||||
@@ -334,3 +335,42 @@ def load_config():
|
||||
def get_username_for_hotel(config: dict, hotel_code: str) -> str:
|
||||
"""Get the username associated with a hotel_code from config."""
|
||||
return next(h.get("username") for h in config.get("alpine_bits_auth", []) if h.get("hotel_id") == hotel_code)
|
||||
|
||||
|
||||
def get_advertising_account_ids(
|
||||
config: dict[str, Any], hotel_code: str, fbclid: str | None, gclid: str | None
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Get advertising account IDs based on hotel config and click IDs.
|
||||
|
||||
Args:
|
||||
config: Application configuration dict
|
||||
hotel_code: Hotel identifier to look up in config
|
||||
fbclid: Facebook click ID (if present, meta_account_id will be returned)
|
||||
gclid: Google click ID (if present, google_account_id will be returned)
|
||||
|
||||
Returns:
|
||||
Tuple of (meta_account_id, google_account_id) based on conditional logic:
|
||||
- meta_account_id is set only if fbclid is present AND hotel has
|
||||
meta_account configured
|
||||
- google_account_id is set only if gclid is present AND hotel has
|
||||
google_account configured
|
||||
|
||||
"""
|
||||
meta_account_id = None
|
||||
google_account_id = None
|
||||
|
||||
# Look up hotel in config
|
||||
alpine_bits_auth = config.get("alpine_bits_auth", [])
|
||||
for hotel in alpine_bits_auth:
|
||||
if hotel.get(CONF_HOTEL_ID) == hotel_code:
|
||||
# Conditionally set meta_account_id if fbclid is present
|
||||
if fbclid:
|
||||
meta_account_id = hotel.get(CONF_META_ACCOUNT)
|
||||
|
||||
# Conditionally set google_account_id if gclid is present
|
||||
if gclid:
|
||||
google_account_id = hotel.get(CONF_GOOGLE_ACCOUNT)
|
||||
|
||||
break
|
||||
|
||||
return meta_account_id, google_account_id
|
||||
|
||||
@@ -1,7 +1,16 @@
|
||||
from enum import IntEnum
|
||||
from enum import IntEnum, StrEnum
|
||||
from typing import Final
|
||||
|
||||
|
||||
class WebhookStatus(StrEnum):
|
||||
"""Allowed webhook processing statuses for AlpineBits."""
|
||||
|
||||
PROCESSING = "processing"
|
||||
COMPLETED = "completed"
|
||||
FAILED = "failed"
|
||||
PENDING = "pending"
|
||||
|
||||
|
||||
class HttpStatusCode(IntEnum):
|
||||
"""Allowed HTTP status codes for AlpineBits responses."""
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@ import os
|
||||
from collections.abc import AsyncGenerator, Callable
|
||||
from typing import TypeVar
|
||||
|
||||
from .const import WebhookStatus
|
||||
|
||||
from sqlalchemy import (
|
||||
JSON,
|
||||
Boolean,
|
||||
@@ -751,8 +753,8 @@ class WebhookRequest(Base):
|
||||
hotel_id = Column(String(50), ForeignKey("hotels.hotel_id"), nullable=True, index=True)
|
||||
|
||||
# Processing tracking
|
||||
status = Column(String(20), nullable=False, default='pending', index=True)
|
||||
# Status values: 'pending', 'processing', 'completed', 'failed'
|
||||
status = Column(String(20), nullable=False, default=WebhookStatus.PENDING.value, index=True)
|
||||
# Status values: 'pending', 'processing', 'completed', 'failed' set by Enum WebhookStatus
|
||||
|
||||
processing_started_at = Column(DateTime(timezone=True), nullable=True)
|
||||
processing_completed_at = Column(DateTime(timezone=True), nullable=True)
|
||||
@@ -763,7 +765,6 @@ class WebhookRequest(Base):
|
||||
|
||||
# Payload storage
|
||||
payload_json = Column(JSON, nullable=True) # NULL after purge, kept for retries
|
||||
payload_size_bytes = Column(Integer, nullable=True) # Track original size
|
||||
purged_at = Column(DateTime(timezone=True), nullable=True) # When JSON was purged
|
||||
|
||||
# Metadata
|
||||
|
||||
@@ -1,18 +1,21 @@
|
||||
"""Webhook processor interface and implementations."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
import asyncio
|
||||
from datetime import date, datetime
|
||||
from typing import Any, Protocol
|
||||
|
||||
from fastapi import HTTPException, Request
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from alpine_bits_python.api import _LOGGER, get_advertising_account_ids
|
||||
from alpine_bits_python.config_loader import get_advertising_account_ids
|
||||
from alpine_bits_python.auth import generate_unique_id
|
||||
from alpine_bits_python.customer_service import CustomerService
|
||||
from alpine_bits_python.reservation_service import ReservationService
|
||||
from alpine_bits_python.schemas import ReservationData
|
||||
|
||||
|
||||
|
||||
from .db import Hotel, WebhookRequest
|
||||
from .logging_config import get_logger
|
||||
|
||||
@@ -81,6 +84,209 @@ class WebhookProcessorRegistry:
|
||||
return self._processors.get(webhook_type)
|
||||
|
||||
|
||||
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
|
||||
"""Shared business logic for handling Wix form submissions (test and production)."""
|
||||
timestamp = datetime.now().isoformat()
|
||||
|
||||
_LOGGER.info("Received Wix form data at %s", timestamp)
|
||||
|
||||
data = data.get("data") # Handle nested "data" key if present
|
||||
|
||||
# save customer and reservation to DB
|
||||
|
||||
contact_info = data.get("contact", {})
|
||||
first_name = contact_info.get("name", {}).get("first")
|
||||
last_name = contact_info.get("name", {}).get("last")
|
||||
email = contact_info.get("email")
|
||||
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
|
||||
contact_info.get("locale", "de-de")
|
||||
contact_id = contact_info.get("contactId")
|
||||
|
||||
name_prefix = data.get("field:anrede")
|
||||
|
||||
email_newsletter = data.get("field:form_field_5a7b", False)
|
||||
|
||||
# if email_newsletter is a string, attempt to convert to boolean, else false
|
||||
if isinstance(email_newsletter, str):
|
||||
email_newsletter = email_newsletter.lower() in [
|
||||
"yes",
|
||||
"true",
|
||||
"1",
|
||||
"on",
|
||||
"selezionato",
|
||||
"angekreuzt",
|
||||
]
|
||||
|
||||
address_line = None
|
||||
city_name = None
|
||||
postal_code = None
|
||||
country_code = None
|
||||
gender = None
|
||||
birth_date = None
|
||||
language = data.get("contact", {}).get("locale", "en")[:2]
|
||||
|
||||
# Dates
|
||||
start_date = (
|
||||
data.get("field:date_picker_a7c8")
|
||||
or data.get("Anreisedatum")
|
||||
or data.get("submissions", [{}])[1].get("value")
|
||||
)
|
||||
end_date = (
|
||||
data.get("field:date_picker_7e65")
|
||||
or data.get("Abreisedatum")
|
||||
or data.get("submissions", [{}])[2].get("value")
|
||||
)
|
||||
|
||||
# Room/guest info
|
||||
num_adults = int(data.get("field:number_7cf5") or 1)
|
||||
num_children = int(data.get("field:anzahl_kinder") or 0)
|
||||
children_ages = []
|
||||
if num_children > 0:
|
||||
# Collect all child age fields, then take only the first num_children
|
||||
# This handles form updates that may send extra padded/zero fields
|
||||
temp_ages = []
|
||||
for k in data:
|
||||
if k.startswith("field:alter_kind_"):
|
||||
if data[k] is None or data[k] == "":
|
||||
continue
|
||||
try:
|
||||
age = int(data[k])
|
||||
temp_ages.append(age)
|
||||
except ValueError:
|
||||
_LOGGER.warning("Invalid age value for %s: %s", k, data[k])
|
||||
|
||||
# Only keep the first num_children ages, regardless of their values
|
||||
children_ages = temp_ages[:num_children]
|
||||
|
||||
offer = data.get("field:angebot_auswaehlen")
|
||||
|
||||
# get submissionId and ensure max length 35. Generate one if not present
|
||||
|
||||
unique_id = data.get("submissionId", generate_unique_id())
|
||||
|
||||
# Use CustomerService to handle customer creation/update with hashing
|
||||
customer_service = CustomerService(db)
|
||||
|
||||
customer_data = {
|
||||
"given_name": first_name,
|
||||
"surname": last_name,
|
||||
"contact_id": contact_id,
|
||||
"name_prefix": name_prefix,
|
||||
"email_address": email,
|
||||
"phone": phone_number,
|
||||
"email_newsletter": email_newsletter,
|
||||
"address_line": address_line,
|
||||
"city_name": city_name,
|
||||
"postal_code": postal_code,
|
||||
"country_code": country_code,
|
||||
"gender": gender,
|
||||
"birth_date": birth_date,
|
||||
"language": language,
|
||||
"address_catalog": False,
|
||||
"name_title": None,
|
||||
}
|
||||
|
||||
# This automatically creates/updates both Customer and HashedCustomer
|
||||
db_customer = await customer_service.get_or_create_customer(customer_data)
|
||||
|
||||
# Determine hotel_code and hotel_name
|
||||
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
|
||||
hotel_code = data.get("field:hotelid", None)
|
||||
|
||||
if hotel_code is None:
|
||||
_LOGGER.warning("No hotel_code provided in form data, using default")
|
||||
|
||||
hotel_code = request.app.state.config.get("default_hotel_code", "123")
|
||||
|
||||
hotel_name = (
|
||||
data.get("field:hotelname")
|
||||
or data.get("hotelname")
|
||||
or request.app.state.config.get("default_hotel_name")
|
||||
or "Frangart Inn" # fallback
|
||||
)
|
||||
|
||||
submissionTime = data.get("submissionTime") # 2025-10-07T05:48:41.855Z
|
||||
try:
|
||||
if submissionTime:
|
||||
submissionTime = datetime.fromisoformat(
|
||||
submissionTime[:-1]
|
||||
) # Remove Z and convert
|
||||
except Exception as e:
|
||||
_LOGGER.exception("Error parsing submissionTime: %s", e)
|
||||
submissionTime = None
|
||||
|
||||
# Extract fbclid and gclid for conditional account ID lookup
|
||||
fbclid = data.get("field:fbclid")
|
||||
gclid = data.get("field:gclid")
|
||||
|
||||
# Get advertising account IDs conditionally based on fbclid/gclid presence
|
||||
meta_account_id, google_account_id = get_advertising_account_ids(
|
||||
request.app.state.config, hotel_code, fbclid, gclid
|
||||
)
|
||||
|
||||
reservation = ReservationData(
|
||||
unique_id=unique_id,
|
||||
start_date=date.fromisoformat(start_date),
|
||||
end_date=date.fromisoformat(end_date),
|
||||
num_adults=num_adults,
|
||||
num_children=num_children,
|
||||
children_ages=children_ages,
|
||||
hotel_code=hotel_code,
|
||||
hotel_name=hotel_name,
|
||||
offer=offer,
|
||||
created_at=submissionTime,
|
||||
utm_source=data.get("field:utm_source"),
|
||||
utm_medium=data.get("field:utm_medium"),
|
||||
utm_campaign=data.get("field:utm_campaign"),
|
||||
utm_term=data.get("field:utm_term"),
|
||||
utm_content=data.get("field:utm_content"),
|
||||
user_comment=data.get("field:long_answer_3524", ""),
|
||||
fbclid=fbclid,
|
||||
gclid=gclid,
|
||||
meta_account_id=meta_account_id,
|
||||
google_account_id=google_account_id,
|
||||
)
|
||||
|
||||
if reservation.md5_unique_id is None:
|
||||
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
|
||||
|
||||
# Use ReservationService to create reservation
|
||||
reservation_service = ReservationService(db)
|
||||
db_reservation = await reservation_service.create_reservation(
|
||||
reservation, db_customer.id
|
||||
)
|
||||
|
||||
async def push_event():
|
||||
# Fire event for listeners (push, etc.) - hotel-specific dispatch
|
||||
dispatcher = getattr(request.app.state, "event_dispatcher", None)
|
||||
if dispatcher:
|
||||
# Get hotel_code from reservation to target the right listeners
|
||||
hotel_code = getattr(db_reservation, "hotel_code", None)
|
||||
if hotel_code and hotel_code.strip():
|
||||
await dispatcher.dispatch_for_hotel(
|
||||
"form_processed", hotel_code, db_customer, db_reservation
|
||||
)
|
||||
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
"No hotel_code in reservation, skipping push notifications"
|
||||
)
|
||||
|
||||
# Create task and store reference to prevent it from being garbage collected
|
||||
# The task runs independently and we don't need to await it here
|
||||
task = asyncio.create_task(push_event())
|
||||
# Add done callback to log any exceptions that occur in the background task
|
||||
task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Wix form data received successfully",
|
||||
"received_keys": list(data.keys()),
|
||||
"timestamp": timestamp,
|
||||
"note": "No authentication required for this endpoint",
|
||||
}
|
||||
|
||||
|
||||
class WixFormProcessor:
|
||||
"""Processor for Wix form webhooks."""
|
||||
|
||||
@@ -110,7 +316,6 @@ class WixFormProcessor:
|
||||
Response dict with status and details
|
||||
"""
|
||||
# Import here to avoid circular dependency
|
||||
from .api import process_wix_form_submission
|
||||
|
||||
# Call existing processing function
|
||||
result = await process_wix_form_submission(request, payload, db_session)
|
||||
|
||||
@@ -417,7 +417,7 @@ class TestGenericWebhookEndpoint:
|
||||
"""Test successful generic webhook submission with real form data."""
|
||||
unique_id = uuid.uuid4().hex[:8]
|
||||
test_data = {
|
||||
"hotel_data": {"hotelname": "Bemelmans", "hotelcode": "39054_001"},
|
||||
"hotel_data": {"hotelname": "Bemelmans", "hotelcode": "HOTEL123"},
|
||||
"form_data": {
|
||||
"sprache": "it",
|
||||
"anreise": "14.10.2025",
|
||||
@@ -451,14 +451,14 @@ class TestGenericWebhookEndpoint:
|
||||
assert "timestamp" in data
|
||||
assert (
|
||||
data["message"]
|
||||
== "Generic webhook data received and processed successfully"
|
||||
== "Generic webhook data processed successfully"
|
||||
)
|
||||
|
||||
def test_generic_webhook_creates_customer_and_reservation(self, client):
|
||||
"""Test that webhook creates customer and reservation in database."""
|
||||
unique_id = uuid.uuid4().hex[:8]
|
||||
test_data = {
|
||||
"hotel_data": {"hotelname": "Test Hotel", "hotelcode": "TEST123"},
|
||||
"hotel_data": {"hotelname": "Test Hotel", "hotelcode": "HOTEL123"},
|
||||
"form_data": {
|
||||
"sprache": "de",
|
||||
"anreise": "25.12.2025",
|
||||
@@ -517,7 +517,7 @@ class TestGenericWebhookEndpoint:
|
||||
(r for r in reservations if r.customer_id == customer.id), None
|
||||
)
|
||||
assert reservation is not None, "Reservation should be created"
|
||||
assert reservation.hotel_code == "TEST123"
|
||||
assert reservation.hotel_code == "HOTEL123"
|
||||
assert reservation.hotel_name == "Test Hotel"
|
||||
assert reservation.num_adults == 2
|
||||
assert reservation.num_children == 1
|
||||
@@ -537,7 +537,7 @@ class TestGenericWebhookEndpoint:
|
||||
def test_generic_webhook_missing_dates(self, client):
|
||||
"""Test webhook with missing required dates."""
|
||||
test_data = {
|
||||
"hotel_data": {"hotelname": "Test", "hotelcode": "123"},
|
||||
"hotel_data": {"hotelname": "Test", "hotelcode": "HOTEL123"},
|
||||
"form_data": {
|
||||
"sprache": "de",
|
||||
"name": "John",
|
||||
@@ -555,7 +555,7 @@ class TestGenericWebhookEndpoint:
|
||||
def test_generic_webhook_invalid_date_format(self, client):
|
||||
"""Test webhook with invalid date format."""
|
||||
test_data = {
|
||||
"hotel_data": {"hotelname": "Test", "hotelcode": "123"},
|
||||
"hotel_data": {"hotelname": "Test", "hotelcode": "HOTEL123"},
|
||||
"form_data": {
|
||||
"sprache": "en",
|
||||
"anreise": "2025-10-14", # Wrong format, should be DD.MM.YYYY
|
||||
@@ -577,7 +577,7 @@ class TestGenericWebhookEndpoint:
|
||||
"""Test webhook properly handles children ages."""
|
||||
unique_id = uuid.uuid4().hex[:8]
|
||||
test_data = {
|
||||
"hotel_data": {"hotelname": "Family Hotel", "hotelcode": "FAM001"},
|
||||
"hotel_data": {"hotelname": "Family Hotel", "hotelcode": "HOTEL123"},
|
||||
"form_data": {
|
||||
"sprache": "it",
|
||||
"anreise": "01.08.2025",
|
||||
@@ -608,9 +608,9 @@ class TestGenericWebhookEndpoint:
|
||||
result = await session.execute(select(Reservation))
|
||||
reservations = result.scalars().all()
|
||||
reservation = next(
|
||||
(r for r in reservations if r.hotel_code == "FAM001"), None
|
||||
(r for r in reservations if r.hotel_code == "HOTEL123"), None
|
||||
)
|
||||
assert reservation is not None
|
||||
assert reservation is not None, "Reservation should be created"
|
||||
assert reservation.num_children == 3
|
||||
# children_ages is stored as CSV string
|
||||
children_ages = [
|
||||
@@ -631,9 +631,9 @@ class TestGenericWebhookEndpoint:
|
||||
),
|
||||
None,
|
||||
)
|
||||
assert customer is not None
|
||||
assert customer.phone is None # Empty phone should be None
|
||||
assert customer.name_prefix is None # -- should be filtered out
|
||||
assert customer is not None, "Customer should be created"
|
||||
assert customer.phone is None, "Empty phone should be None"
|
||||
assert customer.name_prefix is None, "Name prefix '--' should be filtered out"
|
||||
|
||||
import asyncio
|
||||
|
||||
@@ -914,7 +914,7 @@ class TestErrorHandling:
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
assert response.status_code == 422
|
||||
assert response.status_code == 400
|
||||
|
||||
def test_wix_webhook_missing_required_fields(self, client):
|
||||
"""Test webhook with missing required fields."""
|
||||
|
||||
Reference in New Issue
Block a user