feat: Add hotel and webhook endpoint management

- Introduced Hotel and WebhookEndpoint models to manage hotel configurations and webhook settings.
- Implemented sync_config_to_database function to synchronize hotel data from configuration to the database.
- Added HotelService for accessing hotel configurations and managing customer data.
- Created a WebhookProcessor interface with concrete processors for the Wix form and generic webhook types (a sketch of the registry pattern follows below).
- Enhanced webhook processing logic to handle incoming requests and create/update reservations and customers.
- Added logging for better traceability of operations related to hotels and webhooks.
Jonas Linter
2025-11-25 12:05:48 +01:00
parent c339a9cace
commit b77a0be80f
8 changed files with 1706 additions and 262 deletions
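Note: the WebhookProcessor interface and webhook_registry referenced throughout this diff live in webhook_processor.py, which is not among the files shown. The following is a minimal sketch of how those pieces might fit together, grounded only in the calls visible below (webhook_registry.get_processor(webhook_type), initialize_webhook_processors(), and processor.process(payload=..., webhook_request=..., hotel=..., db_session=..., request=...)); the class names and registry internals are assumptions, not the actual module.

# Sketch only -- not the actual webhook_processor.py.
from typing import Any, Protocol


class WebhookProcessor(Protocol):
    async def process(
        self, *, payload: dict[str, Any], webhook_request, hotel, db_session, request
    ) -> dict[str, Any]: ...


class _WebhookRegistry:
    """Hypothetical dict-backed registry keyed by webhook_type."""

    def __init__(self) -> None:
        self._processors: dict[str, WebhookProcessor] = {}

    def register(self, webhook_type: str, processor: WebhookProcessor) -> None:
        self._processors[webhook_type] = processor

    def get_processor(self, webhook_type: str) -> WebhookProcessor | None:
        return self._processors.get(webhook_type)


webhook_registry = _WebhookRegistry()


class GenericWebhookProcessor:
    async def process(self, *, payload, webhook_request, hotel, db_session, request):
        # The real processor creates/updates customers and reservations.
        return {"status": "success", "message": "processed"}


def initialize_webhook_processors() -> None:
    webhook_registry.register("generic", GenericWebhookProcessor())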

@@ -2,6 +2,7 @@
import asyncio
import gzip
import hashlib
import json
import multiprocessing
import os
@@ -9,7 +10,8 @@ import traceback
import urllib.parse
import xml.dom.minidom
from collections import defaultdict
from datetime import date, datetime
from contextlib import asynccontextmanager
from datetime import UTC, date, datetime, timedelta
from functools import partial
from pathlib import Path
from typing import Any
@@ -27,10 +29,13 @@ from fastapi.security import (
)
from pydantic import BaseModel
from slowapi.errors import RateLimitExceeded
from sqlalchemy import and_, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload
from alpine_bits_python.schemas import ReservationData
from alpine_bits_python.webhook_processor import process_generic_webhook_submission
from .alpinebits_server import (
AlpineBitsActionName,
@@ -46,6 +51,7 @@ from .csv_import import CSVImporter
from .customer_service import CustomerService
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db import Hotel, WebhookEndpoint, WebhookRequest
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
@@ -61,6 +67,7 @@ from .rate_limit import (
webhook_limiter,
)
from .reservation_service import ReservationService
from .webhook_processor import webhook_registry
from .worker_coordination import is_primary_worker
# Configure logging - will be reconfigured during lifespan with actual config
@@ -249,9 +256,113 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
except Exception as e:
_LOGGER.exception("Push event failed for hotel %s: %s", hotel["hotel_id"], e)
# Optionally implement retry logic here
async def cleanup_stale_webhooks(
async_sessionmaker: async_sessionmaker,
timeout_minutes: int = 10
) -> int:
"""Reset webhooks stuck in 'processing' (worker crashed).
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
timeout_minutes: Timeout threshold in minutes
Returns:
Number of stale webhooks reset
"""
timeout_threshold = datetime.now(UTC) - timedelta(minutes=timeout_minutes)
async with async_sessionmaker() as session:
result = await session.execute(
update(WebhookRequest)
.where(
and_(
WebhookRequest.status == 'processing',
WebhookRequest.processing_started_at < timeout_threshold
)
)
.values(
status='failed',
last_error='Processing timeout - worker may have crashed'
)
)
await session.commit()
count = result.rowcount
if count > 0:
_LOGGER.warning("Reset %d stale webhooks to 'failed'", count)
return count
async def purge_old_webhook_payloads(
async_sessionmaker: async_sessionmaker,
retention_days: int = 7
) -> int:
"""Purge payload_json from old completed webhooks.
Keeps metadata for history but removes the large JSON payload.
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
retention_days: Days to retain payloads before purging
Returns:
Number of payloads purged
"""
cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
async with async_sessionmaker() as session:
result = await session.execute(
update(WebhookRequest)
.where(
and_(
WebhookRequest.status == 'completed',
WebhookRequest.created_at < cutoff_date,
WebhookRequest.purged_at.is_(None) # Not already purged
)
)
.values(
payload_json=None,
purged_at=datetime.now(UTC)
)
)
await session.commit()
count = result.rowcount
if count > 0:
_LOGGER.info("Purged payloads from %d old webhook requests", count)
return count
async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
"""Run periodic cleanup tasks for webhooks.
This should be scheduled to run every 5-10 minutes.
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
"""
try:
# Clean up stale webhooks (stuck in 'processing')
stale_count = await cleanup_stale_webhooks(async_sessionmaker)
# Purge old webhook payloads (older than 7 days)
purged_count = await purge_old_webhook_payloads(async_sessionmaker)
_LOGGER.debug(
"Webhook cleanup: %d stale reset, %d payloads purged",
stale_count,
purged_count
)
except Exception as e:
_LOGGER.exception("Error during periodic webhook cleanup: %s", e)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Setup DB
@@ -311,6 +422,11 @@ async def lifespan(app: FastAPI):
app.state.alert_handler = alert_handler
app.state.report_scheduler = report_scheduler
# Initialize webhook processors
from .webhook_processor import initialize_webhook_processors
initialize_webhook_processors()
_LOGGER.info("Webhook processors initialized")
# Register push listeners for hotels with push_endpoint
for hotel in config.get("alpine_bits_auth", []):
push_endpoint = hotel.get("push_endpoint")
@@ -356,6 +472,24 @@ async def lifespan(app: FastAPI):
report_scheduler.start()
_LOGGER.info("Daily report scheduler started")
# Start periodic webhook cleanup (only on primary worker)
cleanup_task = None
if is_primary:
async def run_periodic_cleanup():
"""Run cleanup tasks every 5 minutes."""
while True:
try:
await asyncio.sleep(300) # 5 minutes
await periodic_webhook_cleanup(AsyncSessionLocal)
except asyncio.CancelledError:
_LOGGER.info("Webhook cleanup task cancelled")
break
except Exception as e:
_LOGGER.exception("Error in periodic webhook cleanup: %s", e)
cleanup_task = asyncio.create_task(run_periodic_cleanup())
_LOGGER.info("Webhook periodic cleanup task started")
_LOGGER.info("Application startup complete")
yield
@@ -363,6 +497,15 @@ async def lifespan(app: FastAPI):
# Cleanup on shutdown
_LOGGER.info("Application shutdown initiated")
# Stop webhook cleanup task
if cleanup_task:
cleanup_task.cancel()
try:
await cleanup_task
except asyncio.CancelledError:
pass
_LOGGER.info("Webhook cleanup task stopped")
# Stop daily report scheduler
if report_scheduler:
report_scheduler.stop()
@@ -761,266 +904,6 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
}
async def process_generic_webhook_submission(
request: Request, data: dict[str, Any], db
):
"""Process generic webhook submissions with nested structure.
Expected structure:
{
"hotel_data": {"hotelname": "...", "hotelcode": "..."},
"form_data": {
"sprache": "de/it/en",
"anreise": "DD.MM.YYYY",
"abreise": "DD.MM.YYYY",
"erwachsene": "N",
"kinder": "N",
"alter": {"1": "age1", "2": "age2", ...},
"anrede": "...",
"name": "...",
"nachname": "...",
"mail": "...",
"tel": "...",
"nachricht": "..."
},
"tracking_data": {
"utm_source": "...",
"utm_medium": "...",
"utm_campaign": "...",
"utm_content": "...",
"utm_term": "...",
"fbclid": "...",
"gclid": "..."
},
"timestamp": "ISO8601"
}
"""
timestamp = datetime.now().isoformat()
_LOGGER.info("Processing generic webhook submission at %s", timestamp)
# Extract nested data
hotel_data = data.get("hotel_data", {})
form_data = data.get("form_data", {})
tracking_data = data.get("tracking_data", {})
offer_data = data.get("unterkunftTyp", {})
selected_offers = []
if offer_data:
# Keep the keys whose value is "on"; those keys are the selected offers.
offer_data: dict[str, str]
for key, value in offer_data.items():
if value == "on":
selected_offers.append(key)
selected_offers_str = ", ".join(selected_offers) if selected_offers else None
# Extract hotel information
hotel_code = hotel_data.get("hotelcode")
hotel_name = hotel_data.get("hotelname")
if not hotel_code:
_LOGGER.warning("No hotel_code provided in webhook data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
if not hotel_name:
hotel_name = (
request.app.state.config.get("default_hotel_name") or "Frangart Inn"
)
# Extract customer information
first_name = form_data.get("name")
last_name = form_data.get("nachname")
email = form_data.get("mail")
phone_number = form_data.get("tel")
name_prefix = form_data.get("anrede")
language = form_data.get("sprache", "de")[:2]
user_comment = form_data.get("nachricht", "")
plz = form_data.get("plz", "")
city = form_data.get("stadt", "")
country = form_data.get("land", "")
# Parse dates - handle DD.MM.YYYY format
start_date_str = form_data.get("anreise")
end_date_str = form_data.get("abreise")
if not start_date_str or not end_date_str:
raise HTTPException(
status_code=400, detail="Missing required dates (anreise/abreise)"
)
try:
# Parse DD.MM.YYYY format using strptime
start_date = datetime.strptime(start_date_str, "%d.%m.%Y").date()
end_date = datetime.strptime(end_date_str, "%d.%m.%Y").date()
except ValueError as e:
_LOGGER.error(
"Error parsing dates: start=%s, end=%s, error=%s",
start_date_str,
end_date_str,
e,
)
raise HTTPException(status_code=400, detail=f"Invalid date format: {e}") from e
# Extract room/guest info
num_adults = int(form_data.get("erwachsene", 2))
num_children = int(form_data.get("kinder", 0))
# Extract children ages from nested structure
children_ages = []
if num_children > 0:
alter_data = form_data.get("alter", {})
for i in range(1, num_children + 1):
age_str = alter_data.get(str(i))
if age_str:
try:
children_ages.append(int(age_str))
except ValueError:
_LOGGER.warning("Invalid age value for child %d: %s", i, age_str)
# Extract tracking information
utm_source = None
utm_medium = None
utm_campaign = None
utm_term = None
utm_content = None
fbclid = None
gclid = None
if tracking_data:
utm_source = tracking_data.get("utm_source")
utm_medium = tracking_data.get("utm_medium")
utm_campaign = tracking_data.get("utm_campaign")
utm_term = tracking_data.get("utm_term")
utm_content = tracking_data.get("utm_content")
fbclid = tracking_data.get("fbclid")
gclid = tracking_data.get("gclid")
# Parse submission timestamp
submission_time = data.get("timestamp")
try:
if submission_time:
# Handle ISO8601 format with timezone
if submission_time.endswith("Z"):
submission_time = datetime.fromisoformat(submission_time[:-1])
elif "+" in submission_time:
# Remove timezone info (e.g., +02:00)
submission_time = datetime.fromisoformat(submission_time.split("+")[0])
else:
submission_time = datetime.fromisoformat(submission_time)
except Exception as e:
_LOGGER.exception("Error parsing submission timestamp: %s", e)
submission_time = None
# Generate unique ID
unique_id = generate_unique_id()
# Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db)
customer_data = {
"given_name": first_name,
"surname": last_name,
"contact_id": None,
"name_prefix": name_prefix if name_prefix != "--" else None,
"email_address": email,
"phone": phone_number if phone_number else None,
"email_newsletter": False,
"address_line": None,
"city_name": city if city else None,
"postal_code": plz if plz else None,
"country_code": country if country else None,
"gender": None,
"birth_date": None,
"language": language,
"address_catalog": False,
"name_title": None,
}
# Create/update customer
db_customer = await customer_service.get_or_create_customer(customer_data)
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
)
# Create reservation
reservation_kwargs = {
"unique_id": unique_id,
"start_date": start_date,
"end_date": end_date,
"num_adults": num_adults,
"num_children": num_children,
"children_ages": children_ages,
"hotel_code": hotel_code,
"hotel_name": hotel_name,
"offer": selected_offers_str,
"utm_source": utm_source,
"utm_medium": utm_medium,
"utm_campaign": utm_campaign,
"utm_term": utm_term,
"utm_content": utm_content,
"user_comment": user_comment,
"fbclid": fbclid,
"gclid": gclid,
"meta_account_id": meta_account_id,
"google_account_id": google_account_id,
}
# Only include created_at if we have a valid submission_time
if submission_time:
reservation_kwargs["created_at"] = submission_time
reservation = ReservationData(**reservation_kwargs)
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
)
# Create task and store reference to prevent garbage collection
task = asyncio.create_task(push_event())
# Add done callback to log any exceptions
task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
_LOGGER.info(
"Successfully processed generic webhook: customer_id=%s, reservation_id=%s",
db_customer.id,
db_reservation.id,
)
return {
"status": "success",
"message": "Generic webhook data processed successfully",
"customer_id": db_customer.id,
"reservation_id": db_reservation.id,
"timestamp": timestamp,
}
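For reference, a payload that exercises the generic-webhook parsing above (which this commit relocates to webhook_processor.py) might look like the following: DD.MM.YYYY dates, child ages keyed by index, and top-level unterkunftTyp flags set to "on". All values are illustrative and the room-type keys are made up; only the key names come from the code above.

example_payload = {
    "hotel_data": {"hotelname": "Frangart Inn", "hotelcode": "123"},
    "form_data": {
        "sprache": "de",
        "anreise": "24.12.2025",
        "abreise": "31.12.2025",
        "erwachsene": "2",
        "kinder": "2",
        "alter": {"1": "6", "2": "9"},
        "anrede": "Herr",
        "name": "Max",
        "nachname": "Mustermann",
        "mail": "max@example.com",
        "tel": "+39 0471 000000",
        "nachricht": "Anfrage",
    },
    "unterkunftTyp": {"doppelzimmer": "on", "suite": "on"},  # keys hypothetical
    "tracking_data": {"utm_source": "google", "gclid": "abc123"},
    "timestamp": "2025-11-25T12:05:48+01:00",
}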
async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
) -> tuple[str, str]:
@@ -1058,6 +941,171 @@ async def validate_basic_auth(
return credentials.username, credentials.password
@api_router.post("/webhook/{webhook_secret}")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_webhook_unified(
request: Request,
webhook_secret: str,
db_session=Depends(get_async_session),
):
"""Unified webhook handler with deduplication and routing.
Flow:
1. Look up webhook_endpoint by webhook_secret
2. Parse and hash payload (SHA256)
3. Check for duplicate using SELECT FOR UPDATE SKIP LOCKED
4. If duplicate and completed: return success (idempotent)
5. If duplicate and processing: return success (concurrent request)
6. Create or update webhook_request with status='processing'
7. Route to appropriate processor based on webhook_endpoint.webhook_type
8. Update status to 'completed' or 'failed'
9. Return response
"""
timestamp = datetime.now(UTC)
# 1. Look up webhook_endpoint
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
if not webhook_endpoint or not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Webhook not found")
# 2. Parse payload
body = await request.body()
# Handle gzip compression
if request.headers.get("content-encoding", "").lower() == "gzip":
try:
body = gzip.decompress(body)
except Exception as e:
_LOGGER.error("Failed to decompress gzip payload: %s", e)
raise HTTPException(status_code=400, detail="Invalid gzip compression")
try:
payload = json.loads(body.decode("utf-8"))
except Exception as e:
_LOGGER.error("Failed to parse JSON payload: %s", e)
raise HTTPException(status_code=400, detail="Invalid JSON payload")
# 3. Hash payload (canonical JSON for consistent hashing)
payload_json_str = json.dumps(payload, sort_keys=True)
payload_hash = hashlib.sha256(payload_json_str.encode("utf-8")).hexdigest()
payload_size = len(payload_json_str.encode("utf-8"))
# Check payload size limit (10MB)
if payload_size > 10 * 1024 * 1024:
_LOGGER.error("Payload too large: %d bytes", payload_size)
raise HTTPException(status_code=413, detail="Payload too large (max 10MB)")
# 4. Check for duplicate with row-level locking
duplicate = await db_session.execute(
select(WebhookRequest)
.where(WebhookRequest.payload_hash == payload_hash)
.with_for_update(skip_locked=True)
)
existing = duplicate.scalar_one_or_none()
if existing:
if existing.status == 'completed':
# Already processed successfully
_LOGGER.info(
"Webhook already processed (webhook_id=%d, hotel=%s)",
existing.id,
webhook_endpoint.hotel_id
)
return {
"status": "success",
"message": "Webhook already processed",
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == 'processing':
# Another worker is processing right now
_LOGGER.info(
"Webhook is being processed by another worker (webhook_id=%d)",
existing.id
)
return {
"status": "success",
"message": "Webhook is being processed",
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == 'failed':
# Retry failed webhook
_LOGGER.info(
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
existing.id,
existing.retry_count
)
webhook_request = existing
webhook_request.retry_count += 1
webhook_request.status = 'processing'
webhook_request.processing_started_at = timestamp
else:
# 5. Create new webhook_request
webhook_request = WebhookRequest(
payload_hash=payload_hash,
webhook_endpoint_id=webhook_endpoint.id,
hotel_id=webhook_endpoint.hotel_id,
status='processing',
payload_json=payload,
payload_size_bytes=payload_size,
processing_started_at=timestamp,
created_at=timestamp,
source_ip=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
db_session.add(webhook_request)
await db_session.flush()
try:
# 6. Get processor for webhook_type
processor = webhook_registry.get_processor(webhook_endpoint.webhook_type)
if not processor:
raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}")
# 7. Process webhook
result = await processor.process(
payload=payload,
webhook_request=webhook_request,
hotel=webhook_endpoint.hotel,
db_session=db_session,
request=request,
)
# 8. Update status
webhook_request.status = 'completed'
webhook_request.processing_completed_at = datetime.now(UTC)
await db_session.commit()
return {
**result,
"webhook_id": webhook_request.id,
"hotel_id": webhook_endpoint.hotel_id,
}
except Exception as e:
_LOGGER.exception("Error processing webhook: %s", e)
webhook_request.status = 'failed'
webhook_request.last_error = str(e)[:2000]
webhook_request.processing_completed_at = datetime.now(UTC)
await db_session.commit()
raise HTTPException(status_code=500, detail="Error processing webhook")
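The dedup key above is the SHA-256 of the canonically serialized payload (json.dumps with sort_keys=True), so resubmissions that differ only in key order or body whitespace map to the same webhook_request row. A quick self-contained check with illustrative values:

import hashlib
import json


def canonical_hash(payload: dict) -> str:
    return hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode("utf-8")
    ).hexdigest()


# Same logical payload, different key order: identical hash, so the second
# delivery is treated as a duplicate rather than a new webhook_request.
a = {"form_data": {"name": "Max"}, "hotel_data": {"hotelcode": "123"}}
b = {"hotel_data": {"hotelcode": "123"}, "form_data": {"name": "Max"}}
assert canonical_hash(a) == canonical_hash(b)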
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(