7 Commits

Author SHA1 Message Date
Jonas Linter
454b524077 Updated test_api.
Had to change hotel_ids used in test_requests. Previously any hotel_id was valid now only registered ones are. Doesn't make a difference in prod
2025-11-25 21:04:18 +01:00
Jonas Linter
aec8a99b71 Handling legacy endpoints directly in unified endpoints 2025-11-25 20:40:51 +01:00
Jonas Linter
4ab0888508 Moved some stuff around and fixed circular import 2025-11-25 20:30:07 +01:00
Jonas Linter
2941519899 Added an enum for Webhook Status 2025-11-25 20:20:51 +01:00
Jonas Linter
97994c5a38 Better typing + moved some code to webhook_processor 2025-11-25 20:20:40 +01:00
Jonas Linter
556eef529a Removed redundant size_field in webhook_requests 2025-11-25 20:20:06 +01:00
Jonas Linter
7b010ce65e Moved existing processing functions to webhook_processor 2025-11-25 20:19:48 +01:00
7 changed files with 377 additions and 300 deletions

View File

@@ -10,6 +10,8 @@ from typing import Sequence, Union
from alembic import op from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
from alpine_bits_python.const import WebhookStatus
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision: str = 'e7ee03d8f430' revision: str = 'e7ee03d8f430'
@@ -66,13 +68,12 @@ def upgrade() -> None:
sa.Column('payload_hash', sa.String(length=64), nullable=False), sa.Column('payload_hash', sa.String(length=64), nullable=False),
sa.Column('webhook_endpoint_id', sa.Integer(), nullable=True), sa.Column('webhook_endpoint_id', sa.Integer(), nullable=True),
sa.Column('hotel_id', sa.String(length=50), nullable=True), sa.Column('hotel_id', sa.String(length=50), nullable=True),
sa.Column('status', sa.String(length=20), nullable=False, default='pending'), sa.Column('status', sa.String(length=20), nullable=False, default=WebhookStatus.PENDING.value),
sa.Column('processing_started_at', sa.DateTime(timezone=True), nullable=True), sa.Column('processing_started_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('processing_completed_at', sa.DateTime(timezone=True), nullable=True), sa.Column('processing_completed_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('retry_count', sa.Integer(), nullable=True, default=0), sa.Column('retry_count', sa.Integer(), nullable=True, default=0),
sa.Column('last_error', sa.String(length=2000), nullable=True), sa.Column('last_error', sa.String(length=2000), nullable=True),
sa.Column('payload_json', sa.JSON(), nullable=True), sa.Column('payload_json', sa.JSON(), nullable=True),
sa.Column('payload_size_bytes', sa.Integer(), nullable=True),
sa.Column('purged_at', sa.DateTime(timezone=True), nullable=True), sa.Column('purged_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('source_ip', sa.String(length=45), nullable=True), sa.Column('source_ip', sa.String(length=45), nullable=True),

View File

@@ -11,7 +11,7 @@ import urllib.parse
import xml.dom.minidom import xml.dom.minidom
from collections import defaultdict from collections import defaultdict
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from datetime import UTC, date, datetime, timedelta from datetime import UTC, datetime, timedelta
from functools import partial from functools import partial
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -34,8 +34,7 @@ from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from alpine_bits_python.schemas import ReservationData from alpine_bits_python.webhook_processor import process_generic_webhook_submission, process_wix_form_submission
from alpine_bits_python.webhook_processor import process_generic_webhook_submission
from .alpinebits_server import ( from .alpinebits_server import (
AlpineBitsActionName, AlpineBitsActionName,
@@ -43,12 +42,11 @@ from .alpinebits_server import (
AlpineBitsServer, AlpineBitsServer,
Version, Version,
) )
from .auth import generate_unique_id, validate_api_key from .auth import validate_api_key
from .config_loader import load_config, get_username_for_hotel from .config_loader import load_config, get_username_for_hotel
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode from .const import HttpStatusCode, WebhookStatus
from .conversion_service import ConversionService from .conversion_service import ConversionService
from .csv_import import CSVImporter from .csv_import import CSVImporter
from .customer_service import CustomerService
from .db import Customer as DBCustomer from .db import Customer as DBCustomer
from .db import Reservation as DBReservation from .db import Reservation as DBReservation
from .db import Hotel, WebhookEndpoint, WebhookRequest from .db import Hotel, WebhookEndpoint, WebhookRequest
@@ -66,7 +64,6 @@ from .rate_limit import (
limiter, limiter,
webhook_limiter, webhook_limiter,
) )
from .reservation_service import ReservationService
from .webhook_processor import webhook_registry from .webhook_processor import webhook_registry
from .worker_coordination import is_primary_worker from .worker_coordination import is_primary_worker
@@ -82,45 +79,6 @@ security_bearer = HTTPBearer()
TOKEN_LOG_LENGTH = 10 TOKEN_LOG_LENGTH = 10
def get_advertising_account_ids(
config: dict[str, Any], hotel_code: str, fbclid: str | None, gclid: str | None
) -> tuple[str | None, str | None]:
"""Get advertising account IDs based on hotel config and click IDs.
Args:
config: Application configuration dict
hotel_code: Hotel identifier to look up in config
fbclid: Facebook click ID (if present, meta_account_id will be returned)
gclid: Google click ID (if present, google_account_id will be returned)
Returns:
Tuple of (meta_account_id, google_account_id) based on conditional logic:
- meta_account_id is set only if fbclid is present AND hotel has
meta_account configured
- google_account_id is set only if gclid is present AND hotel has
google_account configured
"""
meta_account_id = None
google_account_id = None
# Look up hotel in config
alpine_bits_auth = config.get("alpine_bits_auth", [])
for hotel in alpine_bits_auth:
if hotel.get(CONF_HOTEL_ID) == hotel_code:
# Conditionally set meta_account_id if fbclid is present
if fbclid:
meta_account_id = hotel.get(CONF_META_ACCOUNT)
# Conditionally set google_account_id if gclid is present
if gclid:
google_account_id = hotel.get(CONF_GOOGLE_ACCOUNT)
break
return meta_account_id, google_account_id
# Pydantic models for language detection # Pydantic models for language detection
class LanguageDetectionRequest(BaseModel): class LanguageDetectionRequest(BaseModel):
text: str text: str
@@ -279,12 +237,12 @@ async def cleanup_stale_webhooks(
update(WebhookRequest) update(WebhookRequest)
.where( .where(
and_( and_(
WebhookRequest.status == 'processing', WebhookRequest.status == WebhookStatus.PROCESSING,
WebhookRequest.processing_started_at < timeout_threshold WebhookRequest.processing_started_at < timeout_threshold
) )
) )
.values( .values(
status='failed', status=WebhookStatus.FAILED,
last_error='Processing timeout - worker may have crashed' last_error='Processing timeout - worker may have crashed'
) )
) )
@@ -319,7 +277,7 @@ async def purge_old_webhook_payloads(
update(WebhookRequest) update(WebhookRequest)
.where( .where(
and_( and_(
WebhookRequest.status == 'completed', WebhookRequest.status == WebhookStatus.COMPLETED,
WebhookRequest.created_at < cutoff_date, WebhookRequest.created_at < cutoff_date,
WebhookRequest.purged_at.is_(None) # Not already purged WebhookRequest.purged_at.is_(None) # Not already purged
) )
@@ -700,209 +658,6 @@ async def detect_language(
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}") raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
# Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production)."""
timestamp = datetime.now().isoformat()
_LOGGER.info("Received Wix form data at %s", timestamp)
data = data.get("data") # Handle nested "data" key if present
# save customer and reservation to DB
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
email_newsletter = data.get("field:form_field_5a7b", False)
# if email_newsletter is a string, attempt to convert to boolean, else false
if isinstance(email_newsletter, str):
email_newsletter = email_newsletter.lower() in [
"yes",
"true",
"1",
"on",
"selezionato",
"angekreuzt",
]
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
# Dates
start_date = (
data.get("field:date_picker_a7c8")
or data.get("Anreisedatum")
or data.get("submissions", [{}])[1].get("value")
)
end_date = (
data.get("field:date_picker_7e65")
or data.get("Abreisedatum")
or data.get("submissions", [{}])[2].get("value")
)
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 1)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
# Collect all child age fields, then take only the first num_children
# This handles form updates that may send extra padded/zero fields
temp_ages = []
for k in data:
if k.startswith("field:alter_kind_"):
if data[k] is None or data[k] == "":
continue
try:
age = int(data[k])
temp_ages.append(age)
except ValueError:
_LOGGER.warning("Invalid age value for %s: %s", k, data[k])
# Only keep the first num_children ages, regardless of their values
children_ages = temp_ages[:num_children]
offer = data.get("field:angebot_auswaehlen")
# get submissionId and ensure max length 35. Generate one if not present
unique_id = data.get("submissionId", generate_unique_id())
# Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db)
customer_data = {
"given_name": first_name,
"surname": last_name,
"contact_id": contact_id,
"name_prefix": name_prefix,
"email_address": email,
"phone": phone_number,
"email_newsletter": email_newsletter,
"address_line": address_line,
"city_name": city_name,
"postal_code": postal_code,
"country_code": country_code,
"gender": gender,
"birth_date": birth_date,
"language": language,
"address_catalog": False,
"name_title": None,
}
# This automatically creates/updates both Customer and HashedCustomer
db_customer = await customer_service.get_or_create_customer(customer_data)
# Determine hotel_code and hotel_name
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
hotel_code = data.get("field:hotelid", None)
if hotel_code is None:
_LOGGER.warning("No hotel_code provided in form data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
hotel_name = (
data.get("field:hotelname")
or data.get("hotelname")
or request.app.state.config.get("default_hotel_name")
or "Frangart Inn" # fallback
)
submissionTime = data.get("submissionTime") # 2025-10-07T05:48:41.855Z
try:
if submissionTime:
submissionTime = datetime.fromisoformat(
submissionTime[:-1]
) # Remove Z and convert
except Exception as e:
_LOGGER.exception("Error parsing submissionTime: %s", e)
submissionTime = None
# Extract fbclid and gclid for conditional account ID lookup
fbclid = data.get("field:fbclid")
gclid = data.get("field:gclid")
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
)
reservation = ReservationData(
unique_id=unique_id,
start_date=date.fromisoformat(start_date),
end_date=date.fromisoformat(end_date),
num_adults=num_adults,
num_children=num_children,
children_ages=children_ages,
hotel_code=hotel_code,
hotel_name=hotel_name,
offer=offer,
created_at=submissionTime,
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=fbclid,
gclid=gclid,
meta_account_id=meta_account_id,
google_account_id=google_account_id,
)
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
)
# Create task and store reference to prevent it from being garbage collected
# The task runs independently and we don't need to await it here
task = asyncio.create_task(push_event())
# Add done callback to log any exceptions that occur in the background task
task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
return {
"status": "success",
"message": "Wix form data received successfully",
"received_keys": list(data.keys()),
"timestamp": timestamp,
"note": "No authentication required for this endpoint",
}
async def validate_basic_auth( async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic), credentials: HTTPBasicCredentials = Depends(security_basic),
@@ -950,8 +705,13 @@ async def handle_webhook_unified(
): ):
"""Unified webhook handler with deduplication and routing. """Unified webhook handler with deduplication and routing.
Supports both new secure webhook URLs and legacy endpoints:
- /webhook/{64-char-secret} - New secure endpoints
- /webhook/wix-form - Legacy Wix form endpoint (extracts hotel from payload)
- /webhook/generic - Legacy generic webhook endpoint (extracts hotel from payload)
Flow: Flow:
1. Look up webhook_endpoint by webhook_secret 1. Look up webhook_endpoint by webhook_secret (or detect legacy endpoint)
2. Parse and hash payload (SHA256) 2. Parse and hash payload (SHA256)
3. Check for duplicate using SELECT FOR UPDATE SKIP LOCKED 3. Check for duplicate using SELECT FOR UPDATE SKIP LOCKED
4. If duplicate and completed: return success (idempotent) 4. If duplicate and completed: return success (idempotent)
@@ -963,23 +723,7 @@ async def handle_webhook_unified(
""" """
timestamp = datetime.now(UTC) timestamp = datetime.now(UTC)
# 1. Look up webhook_endpoint # 2. Parse payload first (needed for legacy endpoint detection)
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
if not webhook_endpoint or not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Webhook not found")
# 2. Parse payload
body = await request.body() body = await request.body()
# Handle gzip compression # Handle gzip compression
@@ -996,6 +740,84 @@ async def handle_webhook_unified(
_LOGGER.error("Failed to parse JSON payload: %s", e) _LOGGER.error("Failed to parse JSON payload: %s", e)
raise HTTPException(status_code=400, detail="Invalid JSON payload") raise HTTPException(status_code=400, detail="Invalid JSON payload")
# 1. Detect if this is a legacy endpoint or look up webhook_endpoint
webhook_endpoint: WebhookEndpoint | None = None
is_legacy = False
webhook_type = None
hotel_id_from_payload = None
# Check if webhook_secret looks like a legacy endpoint name
if webhook_secret in ("wix-form", "generic"):
is_legacy = True
webhook_type = "wix_form" if webhook_secret == "wix-form" else "generic"
# Extract hotel_id from payload based on webhook type
if webhook_type == "wix_form":
# Wix forms: field:hotelid or use default
hotel_id_from_payload = payload.get("data", {}).get("field:hotelid") if isinstance(payload.get("data"), dict) else payload.get("field:hotelid")
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.info("Legacy wix-form endpoint: using default hotel_code=%s", hotel_id_from_payload)
elif webhook_type == "generic":
# Generic webhooks: hotel_data.hotelcode or use default
hotel_data = payload.get("hotel_data", {})
hotel_id_from_payload = hotel_data.get("hotelcode")
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.info("Legacy generic endpoint: using default hotel_code=%s", hotel_id_from_payload)
_LOGGER.info(
"Legacy endpoint detected: %s, webhook_type=%s, hotel_id=%s",
webhook_secret,
webhook_type,
hotel_id_from_payload
)
# Look up the webhook endpoint for this hotel and type
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.hotel_id == hotel_id_from_payload,
WebhookEndpoint.webhook_type == webhook_type,
WebhookEndpoint.is_enabled == True
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
if not webhook_endpoint:
_LOGGER.error(
"No webhook endpoint found for legacy endpoint: hotel_id=%s, type=%s",
hotel_id_from_payload,
webhook_type
)
raise HTTPException(
status_code=404,
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}"
)
else:
# New secure endpoint - look up by webhook_secret
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
if not webhook_endpoint:
raise HTTPException(status_code=404, detail="Webhook not found")
# Verify hotel is active
if not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Hotel is not active")
# 3. Hash payload (canonical JSON for consistent hashing) # 3. Hash payload (canonical JSON for consistent hashing)
payload_json_str = json.dumps(payload, sort_keys=True) payload_json_str = json.dumps(payload, sort_keys=True)
payload_hash = hashlib.sha256(payload_json_str.encode("utf-8")).hexdigest() payload_hash = hashlib.sha256(payload_json_str.encode("utf-8")).hexdigest()
@@ -1015,7 +837,7 @@ async def handle_webhook_unified(
existing = duplicate.scalar_one_or_none() existing = duplicate.scalar_one_or_none()
if existing: if existing:
if existing.status == 'completed': if existing.status == WebhookStatus.COMPLETED:
# Already processed successfully # Already processed successfully
_LOGGER.info( _LOGGER.info(
"Webhook already processed (webhook_id=%d, hotel=%s)", "Webhook already processed (webhook_id=%d, hotel=%s)",
@@ -1028,7 +850,7 @@ async def handle_webhook_unified(
"webhook_id": existing.id, "webhook_id": existing.id,
"duplicate": True, "duplicate": True,
} }
elif existing.status == 'processing': elif existing.status == WebhookStatus.PROCESSING:
# Another worker is processing right now # Another worker is processing right now
_LOGGER.info( _LOGGER.info(
"Webhook is being processed by another worker (webhook_id=%d)", "Webhook is being processed by another worker (webhook_id=%d)",
@@ -1040,7 +862,7 @@ async def handle_webhook_unified(
"webhook_id": existing.id, "webhook_id": existing.id,
"duplicate": True, "duplicate": True,
} }
elif existing.status == 'failed': elif existing.status == WebhookStatus.FAILED:
# Retry failed webhook # Retry failed webhook
_LOGGER.info( _LOGGER.info(
"Retrying failed webhook (webhook_id=%d, retry_count=%d)", "Retrying failed webhook (webhook_id=%d, retry_count=%d)",
@@ -1049,7 +871,7 @@ async def handle_webhook_unified(
) )
webhook_request = existing webhook_request = existing
webhook_request.retry_count += 1 webhook_request.retry_count += 1
webhook_request.status = 'processing' webhook_request.status = WebhookStatus.PROCESSING
webhook_request.processing_started_at = timestamp webhook_request.processing_started_at = timestamp
else: else:
# 5. Create new webhook_request # 5. Create new webhook_request
@@ -1057,9 +879,8 @@ async def handle_webhook_unified(
payload_hash=payload_hash, payload_hash=payload_hash,
webhook_endpoint_id=webhook_endpoint.id, webhook_endpoint_id=webhook_endpoint.id,
hotel_id=webhook_endpoint.hotel_id, hotel_id=webhook_endpoint.hotel_id,
status='processing', status=WebhookStatus.PROCESSING,
payload_json=payload, payload_json=payload,
payload_size_bytes=payload_size,
processing_started_at=timestamp, processing_started_at=timestamp,
created_at=timestamp, created_at=timestamp,
source_ip=request.client.host if request.client else None, source_ip=request.client.host if request.client else None,
@@ -1084,7 +905,7 @@ async def handle_webhook_unified(
) )
# 8. Update status # 8. Update status
webhook_request.status = 'completed' webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC) webhook_request.processing_completed_at = datetime.now(UTC)
await db_session.commit() await db_session.commit()
@@ -1098,7 +919,7 @@ async def handle_webhook_unified(
except Exception as e: except Exception as e:
_LOGGER.exception("Error processing webhook: %s", e) _LOGGER.exception("Error processing webhook: %s", e)
webhook_request.status = 'failed' webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = str(e)[:2000] webhook_request.last_error = str(e)[:2000]
webhook_request.processing_completed_at = datetime.now(UTC) webhook_request.processing_completed_at = datetime.now(UTC)
await db_session.commit() await db_session.commit()

View File

@@ -1,5 +1,6 @@
import os import os
from pathlib import Path from pathlib import Path
from typing import Any
from annotatedyaml.loader import Secrets from annotatedyaml.loader import Secrets
from annotatedyaml.loader import load_yaml as load_annotated_yaml from annotatedyaml.loader import load_yaml as load_annotated_yaml
@@ -334,3 +335,42 @@ def load_config():
def get_username_for_hotel(config: dict, hotel_code: str) -> str: def get_username_for_hotel(config: dict, hotel_code: str) -> str:
"""Get the username associated with a hotel_code from config.""" """Get the username associated with a hotel_code from config."""
return next(h.get("username") for h in config.get("alpine_bits_auth", []) if h.get("hotel_id") == hotel_code) return next(h.get("username") for h in config.get("alpine_bits_auth", []) if h.get("hotel_id") == hotel_code)
def get_advertising_account_ids(
config: dict[str, Any], hotel_code: str, fbclid: str | None, gclid: str | None
) -> tuple[str | None, str | None]:
"""Get advertising account IDs based on hotel config and click IDs.
Args:
config: Application configuration dict
hotel_code: Hotel identifier to look up in config
fbclid: Facebook click ID (if present, meta_account_id will be returned)
gclid: Google click ID (if present, google_account_id will be returned)
Returns:
Tuple of (meta_account_id, google_account_id) based on conditional logic:
- meta_account_id is set only if fbclid is present AND hotel has
meta_account configured
- google_account_id is set only if gclid is present AND hotel has
google_account configured
"""
meta_account_id = None
google_account_id = None
# Look up hotel in config
alpine_bits_auth = config.get("alpine_bits_auth", [])
for hotel in alpine_bits_auth:
if hotel.get(CONF_HOTEL_ID) == hotel_code:
# Conditionally set meta_account_id if fbclid is present
if fbclid:
meta_account_id = hotel.get(CONF_META_ACCOUNT)
# Conditionally set google_account_id if gclid is present
if gclid:
google_account_id = hotel.get(CONF_GOOGLE_ACCOUNT)
break
return meta_account_id, google_account_id

View File

@@ -1,7 +1,16 @@
from enum import IntEnum from enum import IntEnum, StrEnum
from typing import Final from typing import Final
class WebhookStatus(StrEnum):
"""Allowed webhook processing statuses for AlpineBits."""
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
PENDING = "pending"
class HttpStatusCode(IntEnum): class HttpStatusCode(IntEnum):
"""Allowed HTTP status codes for AlpineBits responses.""" """Allowed HTTP status codes for AlpineBits responses."""

View File

@@ -4,6 +4,8 @@ import os
from collections.abc import AsyncGenerator, Callable from collections.abc import AsyncGenerator, Callable
from typing import TypeVar from typing import TypeVar
from .const import WebhookStatus
from sqlalchemy import ( from sqlalchemy import (
JSON, JSON,
Boolean, Boolean,
@@ -751,8 +753,8 @@ class WebhookRequest(Base):
hotel_id = Column(String(50), ForeignKey("hotels.hotel_id"), nullable=True, index=True) hotel_id = Column(String(50), ForeignKey("hotels.hotel_id"), nullable=True, index=True)
# Processing tracking # Processing tracking
status = Column(String(20), nullable=False, default='pending', index=True) status = Column(String(20), nullable=False, default=WebhookStatus.PENDING.value, index=True)
# Status values: 'pending', 'processing', 'completed', 'failed' # Status values: 'pending', 'processing', 'completed', 'failed' set by Enum WebhookStatus
processing_started_at = Column(DateTime(timezone=True), nullable=True) processing_started_at = Column(DateTime(timezone=True), nullable=True)
processing_completed_at = Column(DateTime(timezone=True), nullable=True) processing_completed_at = Column(DateTime(timezone=True), nullable=True)
@@ -763,7 +765,6 @@ class WebhookRequest(Base):
# Payload storage # Payload storage
payload_json = Column(JSON, nullable=True) # NULL after purge, kept for retries payload_json = Column(JSON, nullable=True) # NULL after purge, kept for retries
payload_size_bytes = Column(Integer, nullable=True) # Track original size
purged_at = Column(DateTime(timezone=True), nullable=True) # When JSON was purged purged_at = Column(DateTime(timezone=True), nullable=True) # When JSON was purged
# Metadata # Metadata

View File

@@ -1,18 +1,21 @@
"""Webhook processor interface and implementations.""" """Webhook processor interface and implementations."""
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime import asyncio
from datetime import date, datetime
from typing import Any, Protocol from typing import Any, Protocol
from fastapi import HTTPException, Request from fastapi import HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from alpine_bits_python.api import _LOGGER, get_advertising_account_ids from alpine_bits_python.config_loader import get_advertising_account_ids
from alpine_bits_python.auth import generate_unique_id from alpine_bits_python.auth import generate_unique_id
from alpine_bits_python.customer_service import CustomerService from alpine_bits_python.customer_service import CustomerService
from alpine_bits_python.reservation_service import ReservationService from alpine_bits_python.reservation_service import ReservationService
from alpine_bits_python.schemas import ReservationData from alpine_bits_python.schemas import ReservationData
from .db import Hotel, WebhookRequest from .db import Hotel, WebhookRequest
from .logging_config import get_logger from .logging_config import get_logger
@@ -81,6 +84,209 @@ class WebhookProcessorRegistry:
return self._processors.get(webhook_type) return self._processors.get(webhook_type)
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production)."""
timestamp = datetime.now().isoformat()
_LOGGER.info("Received Wix form data at %s", timestamp)
data = data.get("data") # Handle nested "data" key if present
# save customer and reservation to DB
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
email_newsletter = data.get("field:form_field_5a7b", False)
# if email_newsletter is a string, attempt to convert to boolean, else false
if isinstance(email_newsletter, str):
email_newsletter = email_newsletter.lower() in [
"yes",
"true",
"1",
"on",
"selezionato",
"angekreuzt",
]
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
# Dates
start_date = (
data.get("field:date_picker_a7c8")
or data.get("Anreisedatum")
or data.get("submissions", [{}])[1].get("value")
)
end_date = (
data.get("field:date_picker_7e65")
or data.get("Abreisedatum")
or data.get("submissions", [{}])[2].get("value")
)
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 1)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
# Collect all child age fields, then take only the first num_children
# This handles form updates that may send extra padded/zero fields
temp_ages = []
for k in data:
if k.startswith("field:alter_kind_"):
if data[k] is None or data[k] == "":
continue
try:
age = int(data[k])
temp_ages.append(age)
except ValueError:
_LOGGER.warning("Invalid age value for %s: %s", k, data[k])
# Only keep the first num_children ages, regardless of their values
children_ages = temp_ages[:num_children]
offer = data.get("field:angebot_auswaehlen")
# get submissionId and ensure max length 35. Generate one if not present
unique_id = data.get("submissionId", generate_unique_id())
# Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db)
customer_data = {
"given_name": first_name,
"surname": last_name,
"contact_id": contact_id,
"name_prefix": name_prefix,
"email_address": email,
"phone": phone_number,
"email_newsletter": email_newsletter,
"address_line": address_line,
"city_name": city_name,
"postal_code": postal_code,
"country_code": country_code,
"gender": gender,
"birth_date": birth_date,
"language": language,
"address_catalog": False,
"name_title": None,
}
# This automatically creates/updates both Customer and HashedCustomer
db_customer = await customer_service.get_or_create_customer(customer_data)
# Determine hotel_code and hotel_name
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
hotel_code = data.get("field:hotelid", None)
if hotel_code is None:
_LOGGER.warning("No hotel_code provided in form data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
hotel_name = (
data.get("field:hotelname")
or data.get("hotelname")
or request.app.state.config.get("default_hotel_name")
or "Frangart Inn" # fallback
)
submissionTime = data.get("submissionTime") # 2025-10-07T05:48:41.855Z
try:
if submissionTime:
submissionTime = datetime.fromisoformat(
submissionTime[:-1]
) # Remove Z and convert
except Exception as e:
_LOGGER.exception("Error parsing submissionTime: %s", e)
submissionTime = None
# Extract fbclid and gclid for conditional account ID lookup
fbclid = data.get("field:fbclid")
gclid = data.get("field:gclid")
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
)
reservation = ReservationData(
unique_id=unique_id,
start_date=date.fromisoformat(start_date),
end_date=date.fromisoformat(end_date),
num_adults=num_adults,
num_children=num_children,
children_ages=children_ages,
hotel_code=hotel_code,
hotel_name=hotel_name,
offer=offer,
created_at=submissionTime,
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=fbclid,
gclid=gclid,
meta_account_id=meta_account_id,
google_account_id=google_account_id,
)
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
)
# Create task and store reference to prevent it from being garbage collected
# The task runs independently and we don't need to await it here
task = asyncio.create_task(push_event())
# Add done callback to log any exceptions that occur in the background task
task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
return {
"status": "success",
"message": "Wix form data received successfully",
"received_keys": list(data.keys()),
"timestamp": timestamp,
"note": "No authentication required for this endpoint",
}
class WixFormProcessor: class WixFormProcessor:
"""Processor for Wix form webhooks.""" """Processor for Wix form webhooks."""
@@ -110,7 +316,6 @@ class WixFormProcessor:
Response dict with status and details Response dict with status and details
""" """
# Import here to avoid circular dependency # Import here to avoid circular dependency
from .api import process_wix_form_submission
# Call existing processing function # Call existing processing function
result = await process_wix_form_submission(request, payload, db_session) result = await process_wix_form_submission(request, payload, db_session)

View File

@@ -417,7 +417,7 @@ class TestGenericWebhookEndpoint:
"""Test successful generic webhook submission with real form data.""" """Test successful generic webhook submission with real form data."""
unique_id = uuid.uuid4().hex[:8] unique_id = uuid.uuid4().hex[:8]
test_data = { test_data = {
"hotel_data": {"hotelname": "Bemelmans", "hotelcode": "39054_001"}, "hotel_data": {"hotelname": "Bemelmans", "hotelcode": "HOTEL123"},
"form_data": { "form_data": {
"sprache": "it", "sprache": "it",
"anreise": "14.10.2025", "anreise": "14.10.2025",
@@ -451,14 +451,14 @@ class TestGenericWebhookEndpoint:
assert "timestamp" in data assert "timestamp" in data
assert ( assert (
data["message"] data["message"]
== "Generic webhook data received and processed successfully" == "Generic webhook data processed successfully"
) )
def test_generic_webhook_creates_customer_and_reservation(self, client): def test_generic_webhook_creates_customer_and_reservation(self, client):
"""Test that webhook creates customer and reservation in database.""" """Test that webhook creates customer and reservation in database."""
unique_id = uuid.uuid4().hex[:8] unique_id = uuid.uuid4().hex[:8]
test_data = { test_data = {
"hotel_data": {"hotelname": "Test Hotel", "hotelcode": "TEST123"}, "hotel_data": {"hotelname": "Test Hotel", "hotelcode": "HOTEL123"},
"form_data": { "form_data": {
"sprache": "de", "sprache": "de",
"anreise": "25.12.2025", "anreise": "25.12.2025",
@@ -517,7 +517,7 @@ class TestGenericWebhookEndpoint:
(r for r in reservations if r.customer_id == customer.id), None (r for r in reservations if r.customer_id == customer.id), None
) )
assert reservation is not None, "Reservation should be created" assert reservation is not None, "Reservation should be created"
assert reservation.hotel_code == "TEST123" assert reservation.hotel_code == "HOTEL123"
assert reservation.hotel_name == "Test Hotel" assert reservation.hotel_name == "Test Hotel"
assert reservation.num_adults == 2 assert reservation.num_adults == 2
assert reservation.num_children == 1 assert reservation.num_children == 1
@@ -537,7 +537,7 @@ class TestGenericWebhookEndpoint:
def test_generic_webhook_missing_dates(self, client): def test_generic_webhook_missing_dates(self, client):
"""Test webhook with missing required dates.""" """Test webhook with missing required dates."""
test_data = { test_data = {
"hotel_data": {"hotelname": "Test", "hotelcode": "123"}, "hotel_data": {"hotelname": "Test", "hotelcode": "HOTEL123"},
"form_data": { "form_data": {
"sprache": "de", "sprache": "de",
"name": "John", "name": "John",
@@ -555,7 +555,7 @@ class TestGenericWebhookEndpoint:
def test_generic_webhook_invalid_date_format(self, client): def test_generic_webhook_invalid_date_format(self, client):
"""Test webhook with invalid date format.""" """Test webhook with invalid date format."""
test_data = { test_data = {
"hotel_data": {"hotelname": "Test", "hotelcode": "123"}, "hotel_data": {"hotelname": "Test", "hotelcode": "HOTEL123"},
"form_data": { "form_data": {
"sprache": "en", "sprache": "en",
"anreise": "2025-10-14", # Wrong format, should be DD.MM.YYYY "anreise": "2025-10-14", # Wrong format, should be DD.MM.YYYY
@@ -577,7 +577,7 @@ class TestGenericWebhookEndpoint:
"""Test webhook properly handles children ages.""" """Test webhook properly handles children ages."""
unique_id = uuid.uuid4().hex[:8] unique_id = uuid.uuid4().hex[:8]
test_data = { test_data = {
"hotel_data": {"hotelname": "Family Hotel", "hotelcode": "FAM001"}, "hotel_data": {"hotelname": "Family Hotel", "hotelcode": "HOTEL123"},
"form_data": { "form_data": {
"sprache": "it", "sprache": "it",
"anreise": "01.08.2025", "anreise": "01.08.2025",
@@ -608,9 +608,9 @@ class TestGenericWebhookEndpoint:
result = await session.execute(select(Reservation)) result = await session.execute(select(Reservation))
reservations = result.scalars().all() reservations = result.scalars().all()
reservation = next( reservation = next(
(r for r in reservations if r.hotel_code == "FAM001"), None (r for r in reservations if r.hotel_code == "HOTEL123"), None
) )
assert reservation is not None assert reservation is not None, "Reservation should be created"
assert reservation.num_children == 3 assert reservation.num_children == 3
# children_ages is stored as CSV string # children_ages is stored as CSV string
children_ages = [ children_ages = [
@@ -631,9 +631,9 @@ class TestGenericWebhookEndpoint:
), ),
None, None,
) )
assert customer is not None assert customer is not None, "Customer should be created"
assert customer.phone is None # Empty phone should be None assert customer.phone is None, "Empty phone should be None"
assert customer.name_prefix is None # -- should be filtered out assert customer.name_prefix is None, "Name prefix '--' should be filtered out"
import asyncio import asyncio
@@ -914,7 +914,7 @@ class TestErrorHandling:
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )
assert response.status_code == 422 assert response.status_code == 400
def test_wix_webhook_missing_required_fields(self, client): def test_wix_webhook_missing_required_fields(self, client):
"""Test webhook with missing required fields.""" """Test webhook with missing required fields."""