2 Commits

Author SHA1 Message Date
Jonas Linter
e2d0ef8e53 Some more refactoring. Push_events don't work at the moment 2025-11-27 15:33:15 +01:00
Jonas Linter
f89236228d Catch integrity errors gracefully instead of dumping a giant stacktrace 2025-11-27 14:47:05 +01:00
4 changed files with 1330209 additions and 323 deletions

1329910
meta_insights_dump2025_11_27.sql Normal file

File diff suppressed because one or more lines are too long

View File

@@ -6,7 +6,6 @@ import hashlib
import json
import multiprocessing
import os
import traceback
import urllib.parse
import xml.dom.minidom
from collections import defaultdict
@@ -30,12 +29,9 @@ from fastapi.security import (
from pydantic import BaseModel
from slowapi.errors import RateLimitExceeded
from sqlalchemy import and_, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload
from alpine_bits_python.webhook_processor import process_generic_webhook_submission, process_wix_form_submission
from .alpinebits_server import (
AlpineBitsActionName,
AlpineBitsClientInfo,
@@ -43,14 +39,19 @@ from .alpinebits_server import (
Version,
)
from .auth import validate_api_key
from .config_loader import load_config, get_username_for_hotel
from .config_loader import get_username_for_hotel, load_config
from .const import HttpStatusCode, WebhookStatus
from .conversion_service import ConversionService
from .csv_import import CSVImporter
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db import Hotel, WebhookEndpoint, WebhookRequest
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
from .db import (
ResilientAsyncSession,
SessionMaker,
WebhookEndpoint,
WebhookRequest,
create_database_engine,
)
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
@@ -218,8 +219,7 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
async def cleanup_stale_webhooks(
async_sessionmaker: async_sessionmaker,
timeout_minutes: int = 10
async_sessionmaker: async_sessionmaker, timeout_minutes: int = 10
) -> int:
"""Reset webhooks stuck in 'processing' (worker crashed).
@@ -229,6 +229,7 @@ async def cleanup_stale_webhooks(
Returns:
Number of stale webhooks reset
"""
timeout_threshold = datetime.now(UTC) - timedelta(minutes=timeout_minutes)
@@ -238,12 +239,12 @@ async def cleanup_stale_webhooks(
.where(
and_(
WebhookRequest.status == WebhookStatus.PROCESSING,
WebhookRequest.processing_started_at < timeout_threshold
WebhookRequest.processing_started_at < timeout_threshold,
)
)
.values(
status=WebhookStatus.FAILED,
last_error='Processing timeout - worker may have crashed'
last_error="Processing timeout - worker may have crashed",
)
)
await session.commit()
@@ -256,8 +257,7 @@ async def cleanup_stale_webhooks(
async def purge_old_webhook_payloads(
async_sessionmaker: async_sessionmaker,
retention_days: int = 7
async_sessionmaker: async_sessionmaker, retention_days: int = 7
) -> int:
"""Purge payload_json from old completed webhooks.
@@ -269,6 +269,7 @@ async def purge_old_webhook_payloads(
Returns:
Number of payloads purged
"""
cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
@@ -279,13 +280,10 @@ async def purge_old_webhook_payloads(
and_(
WebhookRequest.status == WebhookStatus.COMPLETED,
WebhookRequest.created_at < cutoff_date,
WebhookRequest.purged_at.is_(None) # Not already purged
WebhookRequest.purged_at.is_(None), # Not already purged
)
)
.values(
payload_json=None,
purged_at=datetime.now(UTC)
)
.values(payload_json=None, purged_at=datetime.now(UTC))
)
await session.commit()
count = result.rowcount
@@ -303,6 +301,7 @@ async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
"""
try:
# Clean up stale webhooks (stuck in 'processing')
@@ -314,7 +313,7 @@ async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
_LOGGER.debug(
"Webhook cleanup: %d stale reset, %d payloads purged",
stale_count,
purged_count
purged_count,
)
except Exception as e:
_LOGGER.exception("Error during periodic webhook cleanup: %s", e)
@@ -382,6 +381,7 @@ async def lifespan(app: FastAPI):
# Initialize webhook processors
from .webhook_processor import initialize_webhook_processors
initialize_webhook_processors()
_LOGGER.info("Webhook processors initialized")
@@ -433,6 +433,7 @@ async def lifespan(app: FastAPI):
# Start periodic webhook cleanup (only on primary worker)
cleanup_task = None
if is_primary:
async def run_periodic_cleanup():
"""Run cleanup tasks every 5 minutes."""
while True:
@@ -658,7 +659,6 @@ async def detect_language(
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
) -> str:
@@ -754,23 +754,37 @@ async def handle_webhook_unified(
# Extract hotel_id from payload based on webhook type
if webhook_type == "wix_form":
# Wix forms: field:hotelid or use default
hotel_id_from_payload = payload.get("data", {}).get("field:hotelid") if isinstance(payload.get("data"), dict) else payload.get("field:hotelid")
hotel_id_from_payload = (
payload.get("data", {}).get("field:hotelid")
if isinstance(payload.get("data"), dict)
else payload.get("field:hotelid")
)
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.info("Legacy wix-form endpoint: using default hotel_code=%s", hotel_id_from_payload)
hotel_id_from_payload = request.app.state.config.get(
"default_hotel_code", "123"
)
_LOGGER.info(
"Legacy wix-form endpoint: using default hotel_code=%s",
hotel_id_from_payload,
)
elif webhook_type == "generic":
# Generic webhooks: hotel_data.hotelcode or use default
hotel_data = payload.get("hotel_data", {})
hotel_id_from_payload = hotel_data.get("hotelcode")
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.info("Legacy generic endpoint: using default hotel_code=%s", hotel_id_from_payload)
hotel_id_from_payload = request.app.state.config.get(
"default_hotel_code", "123"
)
_LOGGER.info(
"Legacy generic endpoint: using default hotel_code=%s",
hotel_id_from_payload,
)
_LOGGER.info(
"Legacy endpoint detected: %s, webhook_type=%s, hotel_id=%s",
webhook_secret,
webhook_type,
hotel_id_from_payload
hotel_id_from_payload,
)
# Look up the webhook endpoint for this hotel and type
@@ -780,7 +794,7 @@ async def handle_webhook_unified(
and_(
WebhookEndpoint.hotel_id == hotel_id_from_payload,
WebhookEndpoint.webhook_type == webhook_type,
WebhookEndpoint.is_enabled == True
WebhookEndpoint.is_enabled == True,
)
)
.options(selectinload(WebhookEndpoint.hotel))
@@ -791,11 +805,11 @@ async def handle_webhook_unified(
_LOGGER.error(
"No webhook endpoint found for legacy endpoint: hotel_id=%s, type=%s",
hotel_id_from_payload,
webhook_type
webhook_type,
)
raise HTTPException(
status_code=404,
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}"
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}",
)
else:
# New secure endpoint - look up by webhook_secret
@@ -804,7 +818,7 @@ async def handle_webhook_unified(
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True
WebhookEndpoint.is_enabled == True,
)
)
.options(selectinload(WebhookEndpoint.hotel))
@@ -842,7 +856,7 @@ async def handle_webhook_unified(
_LOGGER.info(
"Webhook already processed (webhook_id=%d, hotel=%s)",
existing.id,
webhook_endpoint.hotel_id
webhook_endpoint.hotel_id,
)
return {
"status": "success",
@@ -850,11 +864,11 @@ async def handle_webhook_unified(
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == WebhookStatus.PROCESSING:
if existing.status == WebhookStatus.PROCESSING:
# Another worker is processing right now
_LOGGER.info(
"Webhook is being processed by another worker (webhook_id=%d)",
existing.id
existing.id,
)
return {
"status": "success",
@@ -862,12 +876,12 @@ async def handle_webhook_unified(
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == WebhookStatus.FAILED:
if existing.status == WebhookStatus.FAILED:
# Retry failed webhook
_LOGGER.info(
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
existing.id,
existing.retry_count
existing.retry_count,
)
webhook_request = existing
webhook_request.retry_count += 1
@@ -895,13 +909,11 @@ async def handle_webhook_unified(
if not processor:
raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}")
# 7. Process webhook
# 7. Process webhook with simplified interface
result = await processor.process(
payload=payload,
webhook_request=webhook_request,
hotel=webhook_endpoint.hotel,
db_session=db_session,
request=request,
config=request.app.state.config,
)
# 8. Update status
@@ -927,101 +939,6 @@ async def handle_webhook_unified(
raise HTTPException(status_code=500, detail="Error processing webhook")
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Unified endpoint to handle Wix form submissions (test and production).
No authentication required for this endpoint.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except IntegrityError as e:
# Handle duplicate submissions gracefully - likely same form sent twice
# or race condition between workers
if "unique constraint" in str(e).lower() and "unique_id" in str(e).lower():
_LOGGER.warning(
"Duplicate submission detected (unique_id already exists). "
"Returning success to prevent retry. Error: %s",
str(e),
)
# Return success since the reservation already exists
return {"status": "success", "message": "Reservation already processed"}
# Re-raise if it's a different integrity error
raise
except Exception as e:
_LOGGER.exception("Error in handle_wix_form")
log_entry = {
"timestamp": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"error": str(e),
}
# Use asyncio to run file I/O in thread pool to avoid blocking
logs_dir = Path("logs/errors")
await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True)
log_filename = (
logs_dir / f"wix_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
log_filename.write_text,
json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
_LOGGER.error("Error data logged to: %s", log_filename)
raise HTTPException(status_code=500, detail="Error processing Wix form data")
@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_wix_form_test(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Test endpoint to verify the API is working with raw JSON data.
No authentication required for testing purposes.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.exception("Error in handle_wix_form_test: %s", e)
# Log error data to file asynchronously
import traceback
log_entry = {
"timestamp": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"error": str(e),
"traceback": traceback.format_exc(),
}
# Use asyncio to run file I/O in thread pool to avoid blocking
logs_dir = Path("logs/errors")
await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True)
log_filename = (
logs_dir / f"wix_test_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
log_filename.write_text,
json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
_LOGGER.error("Error data logged to: %s", log_filename)
raise HTTPException(status_code=500, detail="Error processing test data")
async def _process_csv_import_background(
csv_content: str,
filename: str,
@@ -1044,6 +961,7 @@ async def _process_csv_import_background(
session_maker: SessionMaker for creating database sessions
config: Application configuration
log_filename: Path to save the CSV file
"""
try:
# First, save the CSV file (in background)
@@ -1067,18 +985,14 @@ async def _process_csv_import_background(
dryrun=False,
pre_acknowledge=True,
client_id=hotel_code,
username=username
username=username,
)
_LOGGER.info(
"CSV import complete for %s: %s", filename, stats
)
_LOGGER.info("CSV import complete for %s: %s", filename, stats)
finally:
await db_session.close()
except Exception:
_LOGGER.exception(
"Error processing CSV import in background for %s", filename
)
_LOGGER.exception("Error processing CSV import in background for %s", filename)
@api_router.put("/admin/import-csv/{hotel_code}/{filename:path}")
@@ -1150,9 +1064,7 @@ async def import_csv_endpoint(
# Basic validation that it looks like CSV
if not csv_content.strip():
raise HTTPException(
status_code=400, detail="ERROR: CSV content is empty"
)
raise HTTPException(status_code=400, detail="ERROR: CSV content is empty")
# Create logs directory for CSV imports (blocking, but fast)
logs_dir = Path("logs/csv_imports")
@@ -1191,12 +1103,14 @@ async def import_csv_endpoint(
# Return immediate acknowledgment
return Response(
content=json.dumps({
"status": "accepted",
"message": "CSV file received and queued for processing",
"filename": filename,
"timestamp": datetime.now().isoformat(),
}),
content=json.dumps(
{
"status": "accepted",
"message": "CSV file received and queued for processing",
"filename": filename,
"timestamp": datetime.now().isoformat(),
}
),
headers=response_headers,
status_code=202,
)
@@ -1208,116 +1122,6 @@ async def import_csv_endpoint(
raise HTTPException(status_code=500, detail="Error processing CSV upload")
@api_router.post("/webhook/generic")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_generic_webhook(
request: Request, db_session=Depends(get_async_session)
):
"""Handle generic webhook endpoint for receiving JSON payloads.
Supports gzip compression, extracts customer and reservation data,
saves to database, and triggers push notifications.
No authentication required for this endpoint.
"""
# Store data for error logging if needed
data = None
try:
timestamp = datetime.now().isoformat()
_LOGGER.info("Received generic webhook data at %s", timestamp)
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(status_code=400, detail="ERROR: No content provided")
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
_LOGGER.info("Successfully decompressed gzip content")
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Parse JSON
try:
data = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Invalid JSON content: {e}",
) from e
if True:
# log to file for now
logs_dir = Path("logs/generic_webhooks")
await asyncio.to_thread(
logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
)
log_filename = (
logs_dir
/ f"generic_webhook_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
log_filename.write_text,
json.dumps(data, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
# Process the webhook data and save to database
await process_generic_webhook_submission(request, data, db_session)
return {
"status": "success",
"message": "Generic webhook data received and processed successfully",
"timestamp": timestamp,
}
except HTTPException:
raise
except Exception as e:
_LOGGER.exception("Error in handle_generic_webhook")
# Log error data to file asynchronously (only on error)
error_log_entry = {
"timestamp": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data, # Include the parsed data if available
"error": str(e),
"traceback": traceback.format_exc(),
}
# Use asyncio to run file I/O in thread pool to avoid blocking
error_logs_dir = Path("logs/errors")
await asyncio.to_thread(
error_logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
)
error_log_filename = error_logs_dir / (
f"generic_webhook_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
error_log_filename.write_text,
json.dumps(error_log_entry, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
_LOGGER.error("Error data logged to: %s", error_log_filename)
raise HTTPException(
status_code=500, detail="Error processing generic webhook data"
) from e
async def _process_conversion_xml_background(
xml_content: str,
filename: str,

View File

@@ -7,15 +7,18 @@ before the application starts accepting requests. It includes:
"""
import asyncio
from datetime import UTC, datetime
from typing import Any
from sqlalchemy import text
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker
from sqlalchemy.orm import selectinload
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, WebhookStatus
from .customer_service import CustomerService
from .db import create_database_engine
from .db import WebhookEndpoint, WebhookRequest, create_database_engine
from .logging_config import get_logger
from .webhook_processor import webhook_registry
_LOGGER = get_logger(__name__)
@@ -236,6 +239,123 @@ async def backfill_acked_requests_username(
)
async def reprocess_stuck_webhooks(
sessionmaker: async_sessionmaker,
config: dict[str, Any] | None = None,
) -> None:
"""Reprocess webhooks that were stuck in 'processing' state.
Finds webhooks with status='processing' and reprocesses them.
These are webhooks that were not fully processed in the previous run,
likely due to a crash or unexpected shutdown.
Args:
sessionmaker: SQLAlchemy async sessionmaker
config: Application configuration dictionary
"""
_LOGGER.info("Checking for stuck webhooks to reprocess...")
async with sessionmaker() as session:
# Find all webhooks stuck in 'processing' state
result = await session.execute(
select(WebhookRequest)
.where(WebhookRequest.status == WebhookStatus.PROCESSING)
.options(
selectinload(WebhookRequest.webhook_endpoint).selectinload(
WebhookEndpoint.hotel
)
)
)
stuck_webhooks = result.scalars().all()
if not stuck_webhooks:
_LOGGER.info("No stuck webhooks found")
return
_LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))
reprocessed_count = 0
failed_count = 0
for webhook_request in stuck_webhooks:
webhook_id = webhook_request.id
webhook_endpoint = webhook_request.webhook_endpoint
if not webhook_endpoint:
_LOGGER.error(
"Webhook request %d has no webhook_endpoint, skipping", webhook_id
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No webhook endpoint found during startup reprocessing"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
if not webhook_request.payload_json:
_LOGGER.error(
"Webhook request %d has no payload (purged?), marking as failed",
webhook_id,
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No payload available for reprocessing (purged)"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
try:
_LOGGER.info(
"Reprocessing webhook %d (hotel=%s, type=%s)",
webhook_id,
webhook_endpoint.hotel_id,
webhook_endpoint.webhook_type,
)
# Get processor for webhook_type
processor = webhook_registry.get_processor(
webhook_endpoint.webhook_type
)
if not processor:
raise ValueError(
f"No processor for type: {webhook_endpoint.webhook_type}"
)
# Reprocess webhook with simplified interface
await processor.process(
webhook_request=webhook_request,
db_session=session,
config=config,
)
# Update status to completed
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
except Exception as e:
_LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
f"Reprocessing failed during startup: {str(e)[:1950]}"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
# Commit all changes
await session.commit()
_LOGGER.info(
"Webhook reprocessing complete: %d successful, %d failed",
reprocessed_count,
failed_count,
)
async def run_startup_tasks(
sessionmaker: async_sessionmaker,
config: dict[str, Any] | None = None,
@@ -284,3 +404,6 @@ async def run_startup_tasks(
"No engine provided to run_startup_tasks, "
"skipping config-based backfill tasks"
)
# Reprocess stuck webhooks (those stuck in 'processing' state)
await reprocess_stuck_webhooks(sessionmaker, config)

View File

@@ -1,21 +1,19 @@
"""Webhook processor interface and implementations."""
from abc import ABC, abstractmethod
import asyncio
from datetime import date, datetime
from typing import Any, Protocol
from fastapi import HTTPException, Request
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from alpine_bits_python.config_loader import get_advertising_account_ids
from alpine_bits_python.auth import generate_unique_id
from alpine_bits_python.config_loader import get_advertising_account_ids
from alpine_bits_python.customer_service import CustomerService
from alpine_bits_python.reservation_service import ReservationService
from alpine_bits_python.schemas import ReservationData
from .db import Hotel, WebhookRequest
from .logging_config import get_logger
@@ -32,26 +30,23 @@ class WebhookProcessorProtocol(Protocol):
async def process(
self,
payload: dict[str, Any],
webhook_request: WebhookRequest,
hotel: Hotel,
db_session: AsyncSession,
request: Request,
config: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Process webhook payload.
Args:
payload: Parsed webhook payload
webhook_request: WebhookRequest database record
hotel: Hotel associated with this webhook
webhook_request: WebhookRequest database record (contains payload_json and hotel_id)
db_session: Database session
request: FastAPI Request object
config: Application configuration (optional)
Returns:
Response dict with status, message, customer_id, reservation_id
Raises:
HTTPException on processing errors
"""
...
@@ -68,6 +63,7 @@ class WebhookProcessorRegistry:
Args:
processor: Processor instance to register
"""
self._processors[processor.webhook_type] = processor
_LOGGER.info("Registered webhook processor: %s", processor.webhook_type)
@@ -80,16 +76,43 @@ class WebhookProcessorRegistry:
Returns:
Processor instance or None if not found
"""
return self._processors.get(webhook_type)
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production)."""
async def process_wix_form_submission(
request: Request | None,
data: dict[str, Any],
db,
config: dict[str, Any] | None = None,
hotel_id: str | None = None,
event_dispatcher=None,
):
"""Shared business logic for handling Wix form submissions (test and production).
Args:
request: FastAPI Request object (can be None during startup reprocessing)
data: Webhook payload data
db: Database session
config: Application config (optional, extracted from request if not provided)
hotel_id: Hotel ID (optional, will use from data or config default if not provided)
event_dispatcher: Event dispatcher for push notifications (optional)
"""
timestamp = datetime.now().isoformat()
_LOGGER.info("Received Wix form data at %s", timestamp)
# Extract config and event_dispatcher from request if not provided
if config is None and request is not None:
config = request.app.state.config
if event_dispatcher is None and request is not None:
event_dispatcher = getattr(request.app.state, "event_dispatcher", None)
# Provide fallback config if still None
if config is None:
config = {}
data = data.get("data") # Handle nested "data" key if present
# save customer and reservation to DB
@@ -190,18 +213,17 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
db_customer = await customer_service.get_or_create_customer(customer_data)
# Determine hotel_code and hotel_name
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
hotel_code = data.get("field:hotelid", None)
# Priority: 1) Passed hotel_id, 2) Form field, 3) Config default, 4) Fallback
hotel_code = hotel_id or data.get("field:hotelid", None)
if hotel_code is None:
_LOGGER.warning("No hotel_code provided in form data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.warning("No hotel_code provided, using default from config")
hotel_code = config.get("default_hotel_code", "123")
hotel_name = (
data.get("field:hotelname")
or data.get("hotelname")
or request.app.state.config.get("default_hotel_name")
or config.get("default_hotel_name")
or "Frangart Inn" # fallback
)
@@ -221,7 +243,7 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
config, hotel_code, fbclid, gclid
)
reservation = ReservationData(
@@ -252,18 +274,23 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
try:
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
except IntegrityError as e:
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
if event_dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
await event_dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
@@ -297,40 +324,52 @@ class WixFormProcessor:
async def process(
self,
payload: dict[str, Any],
webhook_request: WebhookRequest,
hotel: Hotel,
db_session: AsyncSession,
request: Request,
config: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Process Wix form webhook payload.
Args:
payload: Parsed webhook payload
webhook_request: WebhookRequest database record
hotel: Hotel associated with this webhook
db_session: Database session
request: FastAPI Request object
config: Application configuration (optional)
Returns:
Response dict with status and details
"""
# Import here to avoid circular dependency
# Call processing function with data from webhook_request
result = await process_wix_form_submission(
request=None, # No request context needed
data=webhook_request.payload_json,
db=db_session,
config=config,
hotel_id=webhook_request.hotel_id,
event_dispatcher=None, # No push events during reprocessing
)
# Call existing processing function
result = await process_wix_form_submission(request, payload, db_session)
# The existing function doesn't return customer/reservation IDs directly,
# but they would be in the database session. We'll need to extract them
# from the result or query after the fact. For now, return the result as-is.
return result
async def process_generic_webhook_submission(
request: Request, data: dict[str, Any], db
request: Request | None,
data: dict[str, Any],
db,
config: dict[str, Any] | None = None,
hotel_id: str | None = None,
event_dispatcher=None,
):
"""Process generic webhook submissions with nested structure.
Args:
request: FastAPI Request object (can be None during startup reprocessing)
data: Webhook payload data
db: Database session
config: Application config (optional, extracted from request if not provided)
hotel_id: Hotel ID (optional, will use from data or config default)
event_dispatcher: Event dispatcher for push notifications (optional)
Expected structure:
{
"hotel_data": {"hotelname": "...", "hotelcode": "..."},
@@ -363,6 +402,16 @@ async def process_generic_webhook_submission(
timestamp = datetime.now().isoformat()
_LOGGER.info("Processing generic webhook submission at %s", timestamp)
# Extract config and event_dispatcher from request if not provided
if config is None and request is not None:
config = request.app.state.config
if event_dispatcher is None and request is not None:
event_dispatcher = getattr(request.app.state, "event_dispatcher", None)
# Provide fallback config if still None
if config is None:
config = {}
# Extract nested data
hotel_data = data.get("hotel_data", {})
form_data = data.get("form_data", {})
@@ -383,17 +432,16 @@ async def process_generic_webhook_submission(
selected_offers_str = ", ".join(selected_offers) if selected_offers else None
# Extract hotel information
hotel_code = hotel_data.get("hotelcode")
# Priority: 1) Passed hotel_id, 2) Webhook data, 3) Config default, 4) Fallback
hotel_code = hotel_id or hotel_data.get("hotelcode")
hotel_name = hotel_data.get("hotelname")
if not hotel_code:
_LOGGER.warning("No hotel_code provided in webhook data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.warning("No hotel_code provided, using default from config")
hotel_code = config.get("default_hotel_code", "123")
if not hotel_name:
hotel_name = (
request.app.state.config.get("default_hotel_name") or "Frangart Inn"
)
hotel_name = config.get("default_hotel_name") or "Frangart Inn"
# Extract customer information
first_name = form_data.get("name")
@@ -509,7 +557,7 @@ async def process_generic_webhook_submission(
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
config, hotel_code, fbclid, gclid
)
# Create reservation
@@ -552,12 +600,11 @@ async def process_generic_webhook_submission(
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
if event_dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
await event_dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
@@ -596,28 +643,30 @@ class GenericWebhookProcessor:
async def process(
self,
payload: dict[str, Any],
webhook_request: WebhookRequest,
hotel: Hotel,
db_session: AsyncSession,
request: Request,
config: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Process generic webhook payload.
Args:
payload: Parsed webhook payload
webhook_request: WebhookRequest database record
hotel: Hotel associated with this webhook
db_session: Database session
request: FastAPI Request object
config: Application configuration (optional)
Returns:
Response dict with status and details
"""
# Call existing processing function
result = await process_generic_webhook_submission(request, payload, db_session)
# Call processing function with data from webhook_request
result = await process_generic_webhook_submission(
request=None, # No request context needed
data=webhook_request.payload_json,
db=db_session,
config=config,
hotel_id=webhook_request.hotel_id,
event_dispatcher=None, # No push events during reprocessing
)
return result