feat: Add hotel and webhook endpoint management

- Introduced Hotel and WebhookEndpoint models to manage hotel configurations and webhook settings.
- Implemented sync_config_to_database function to synchronize hotel data from configuration to the database.
- Added HotelService for accessing hotel configurations and managing customer data.
- Created WebhookProcessor interface and specific processors for handling different webhook types (Wix form and generic).
- Enhanced webhook processing logic to handle incoming requests and create/update reservations and customers.
- Added logging for better traceability of operations related to hotels and webhooks.
This commit is contained in:
Jonas Linter
2025-11-25 12:05:48 +01:00
parent da85098d8d
commit 8d144a761c
8 changed files with 1706 additions and 262 deletions

View File

@@ -2,6 +2,7 @@
import asyncio
import gzip
import hashlib
import json
import multiprocessing
import os
@@ -9,7 +10,8 @@ import traceback
import urllib.parse
import xml.dom.minidom
from collections import defaultdict
from datetime import date, datetime
from contextlib import asynccontextmanager
from datetime import UTC, date, datetime, timedelta
from functools import partial
from pathlib import Path
from typing import Any
@@ -27,10 +29,13 @@ from fastapi.security import (
)
from pydantic import BaseModel
from slowapi.errors import RateLimitExceeded
from sqlalchemy import and_, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload
from alpine_bits_python.schemas import ReservationData
from alpine_bits_python.webhook_processor import process_generic_webhook_submission
from .alpinebits_server import (
AlpineBitsActionName,
@@ -46,6 +51,7 @@ from .csv_import import CSVImporter
from .customer_service import CustomerService
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db import Hotel, WebhookEndpoint, WebhookRequest
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
@@ -61,6 +67,7 @@ from .rate_limit import (
webhook_limiter,
)
from .reservation_service import ReservationService
from .webhook_processor import webhook_registry
from .worker_coordination import is_primary_worker
# Configure logging - will be reconfigured during lifespan with actual config
@@ -249,9 +256,113 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
except Exception as e:
_LOGGER.exception("Push event failed for hotel %s: %s", hotel["hotel_id"], e)
# Optionally implement retry logic here@asynccontextmanager
# Optionally implement retry logic here
async def cleanup_stale_webhooks(
async_sessionmaker: async_sessionmaker,
timeout_minutes: int = 10
) -> int:
"""Reset webhooks stuck in 'processing' (worker crashed).
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
timeout_minutes: Timeout threshold in minutes
Returns:
Number of stale webhooks reset
"""
timeout_threshold = datetime.now(UTC) - timedelta(minutes=timeout_minutes)
async with async_sessionmaker() as session:
result = await session.execute(
update(WebhookRequest)
.where(
and_(
WebhookRequest.status == 'processing',
WebhookRequest.processing_started_at < timeout_threshold
)
)
.values(
status='failed',
last_error='Processing timeout - worker may have crashed'
)
)
await session.commit()
count = result.rowcount
if count > 0:
_LOGGER.warning("Reset %d stale webhooks to 'failed'", count)
return count
async def purge_old_webhook_payloads(
async_sessionmaker: async_sessionmaker,
retention_days: int = 7
) -> int:
"""Purge payload_json from old completed webhooks.
Keeps metadata for history but removes large JSON payload.
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
retention_days: Days to retain payloads before purging
Returns:
Number of payloads purged
"""
cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
async with async_sessionmaker() as session:
result = await session.execute(
update(WebhookRequest)
.where(
and_(
WebhookRequest.status == 'completed',
WebhookRequest.created_at < cutoff_date,
WebhookRequest.purged_at.is_(None) # Not already purged
)
)
.values(
payload_json=None,
purged_at=datetime.now(UTC)
)
)
await session.commit()
count = result.rowcount
if count > 0:
_LOGGER.info("Purged payloads from %d old webhook requests", count)
return count
async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
"""Run periodic cleanup tasks for webhooks.
This should be scheduled to run every 5-10 minutes.
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
"""
try:
# Clean up stale webhooks (stuck in 'processing')
stale_count = await cleanup_stale_webhooks(async_sessionmaker)
# Purge old webhook payloads (older than 7 days)
purged_count = await purge_old_webhook_payloads(async_sessionmaker)
_LOGGER.debug(
"Webhook cleanup: %d stale reset, %d payloads purged",
stale_count,
purged_count
)
except Exception as e:
_LOGGER.exception("Error during periodic webhook cleanup: %s", e)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Setup DB
@@ -311,6 +422,11 @@ async def lifespan(app: FastAPI):
app.state.alert_handler = alert_handler
app.state.report_scheduler = report_scheduler
# Initialize webhook processors
from .webhook_processor import initialize_webhook_processors
initialize_webhook_processors()
_LOGGER.info("Webhook processors initialized")
# Register push listeners for hotels with push_endpoint
for hotel in config.get("alpine_bits_auth", []):
push_endpoint = hotel.get("push_endpoint")
@@ -356,6 +472,24 @@ async def lifespan(app: FastAPI):
report_scheduler.start()
_LOGGER.info("Daily report scheduler started")
# Start periodic webhook cleanup (only on primary worker)
cleanup_task = None
if is_primary:
async def run_periodic_cleanup():
"""Run cleanup tasks every 5 minutes."""
while True:
try:
await asyncio.sleep(300) # 5 minutes
await periodic_webhook_cleanup(AsyncSessionLocal)
except asyncio.CancelledError:
_LOGGER.info("Webhook cleanup task cancelled")
break
except Exception as e:
_LOGGER.exception("Error in periodic webhook cleanup: %s", e)
cleanup_task = asyncio.create_task(run_periodic_cleanup())
_LOGGER.info("Webhook periodic cleanup task started")
_LOGGER.info("Application startup complete")
yield
@@ -363,6 +497,15 @@ async def lifespan(app: FastAPI):
# Cleanup on shutdown
_LOGGER.info("Application shutdown initiated")
# Stop webhook cleanup task
if cleanup_task:
cleanup_task.cancel()
try:
await cleanup_task
except asyncio.CancelledError:
pass
_LOGGER.info("Webhook cleanup task stopped")
# Stop daily report scheduler
if report_scheduler:
report_scheduler.stop()
@@ -761,266 +904,6 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
}
async def process_generic_webhook_submission(
request: Request, data: dict[str, Any], db
):
"""Process generic webhook submissions with nested structure.
Expected structure:
{
"hotel_data": {"hotelname": "...", "hotelcode": "..."},
"form_data": {
"sprache": "de/it/en",
"anreise": "DD.MM.YYYY",
"abreise": "DD.MM.YYYY",
"erwachsene": "N",
"kinder": "N",
"alter": {"1": "age1", "2": "age2", ...},
"anrede": "...",
"name": "...",
"nachname": "...",
"mail": "...",
"tel": "...",
"nachricht": "..."
},
"tracking_data": {
"utm_source": "...",
"utm_medium": "...",
"utm_campaign": "...",
"utm_content": "...",
"utm_term": "...",
"fbclid": "...",
"gclid": "..."
},
"timestamp": "ISO8601"
}
"""
timestamp = datetime.now().isoformat()
_LOGGER.info("Processing generic webhook submission at %s", timestamp)
# Extract nested data
hotel_data = data.get("hotel_data", {})
form_data = data.get("form_data", {})
tracking_data = data.get("tracking_data", {})
offer_data = data.get("unterkunftTyp", {})
selected_offers = []
if offer_data:
# grab keys and values. If value is "on" add the key not the value to a list of selected offers
offer_data: dict[str, str]
for key, value in offer_data.items():
if value == "on":
selected_offers.append(key)
selected_offers_str = ", ".join(selected_offers) if selected_offers else None
# Extract hotel information
hotel_code = hotel_data.get("hotelcode")
hotel_name = hotel_data.get("hotelname")
if not hotel_code:
_LOGGER.warning("No hotel_code provided in webhook data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
if not hotel_name:
hotel_name = (
request.app.state.config.get("default_hotel_name") or "Frangart Inn"
)
# Extract customer information
first_name = form_data.get("name")
last_name = form_data.get("nachname")
email = form_data.get("mail")
phone_number = form_data.get("tel")
name_prefix = form_data.get("anrede")
language = form_data.get("sprache", "de")[:2]
user_comment = form_data.get("nachricht", "")
plz = form_data.get("plz", "")
city = form_data.get("stadt", "")
country = form_data.get("land", "")
# Parse dates - handle DD.MM.YYYY format
start_date_str = form_data.get("anreise")
end_date_str = form_data.get("abreise")
if not start_date_str or not end_date_str:
raise HTTPException(
status_code=400, detail="Missing required dates (anreise/abreise)"
)
try:
# Parse DD.MM.YYYY format using strptime
start_date = datetime.strptime(start_date_str, "%d.%m.%Y").date()
end_date = datetime.strptime(end_date_str, "%d.%m.%Y").date()
except ValueError as e:
_LOGGER.error(
"Error parsing dates: start=%s, end=%s, error=%s",
start_date_str,
end_date_str,
e,
)
raise HTTPException(status_code=400, detail=f"Invalid date format: {e}") from e
# Extract room/guest info
num_adults = int(form_data.get("erwachsene", 2))
num_children = int(form_data.get("kinder", 0))
# Extract children ages from nested structure
children_ages = []
if num_children > 0:
alter_data = form_data.get("alter", {})
for i in range(1, num_children + 1):
age_str = alter_data.get(str(i))
if age_str:
try:
children_ages.append(int(age_str))
except ValueError:
_LOGGER.warning("Invalid age value for child %d: %s", i, age_str)
# Extract tracking information
utm_source = None
utm_medium = None
utm_campaign = None
utm_term = None
utm_content = None
fbclid = None
gclid = None
if tracking_data:
utm_source = tracking_data.get("utm_source")
utm_medium = tracking_data.get("utm_medium")
utm_campaign = tracking_data.get("utm_campaign")
utm_term = tracking_data.get("utm_term")
utm_content = tracking_data.get("utm_content")
fbclid = tracking_data.get("fbclid")
gclid = tracking_data.get("gclid")
# Parse submission timestamp
submission_time = data.get("timestamp")
try:
if submission_time:
# Handle ISO8601 format with timezone
if submission_time.endswith("Z"):
submission_time = datetime.fromisoformat(submission_time[:-1])
elif "+" in submission_time:
# Remove timezone info (e.g., +02:00)
submission_time = datetime.fromisoformat(submission_time.split("+")[0])
else:
submission_time = datetime.fromisoformat(submission_time)
except Exception as e:
_LOGGER.exception("Error parsing submission timestamp: %s", e)
submission_time = None
# Generate unique ID
unique_id = generate_unique_id()
# Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db)
customer_data = {
"given_name": first_name,
"surname": last_name,
"contact_id": None,
"name_prefix": name_prefix if name_prefix != "--" else None,
"email_address": email,
"phone": phone_number if phone_number else None,
"email_newsletter": False,
"address_line": None,
"city_name": city if city else None,
"postal_code": plz if plz else None,
"country_code": country if country else None,
"gender": None,
"birth_date": None,
"language": language,
"address_catalog": False,
"name_title": None,
}
# Create/update customer
db_customer = await customer_service.get_or_create_customer(customer_data)
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
)
# Create reservation
reservation_kwargs = {
"unique_id": unique_id,
"start_date": start_date,
"end_date": end_date,
"num_adults": num_adults,
"num_children": num_children,
"children_ages": children_ages,
"hotel_code": hotel_code,
"hotel_name": hotel_name,
"offer": selected_offers_str,
"utm_source": utm_source,
"utm_medium": utm_medium,
"utm_campaign": utm_campaign,
"utm_term": utm_term,
"utm_content": utm_content,
"user_comment": user_comment,
"fbclid": fbclid,
"gclid": gclid,
"meta_account_id": meta_account_id,
"google_account_id": google_account_id,
}
# Only include created_at if we have a valid submission_time
if submission_time:
reservation_kwargs["created_at"] = submission_time
reservation = ReservationData(**reservation_kwargs)
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
)
# Create task and store reference to prevent garbage collection
task = asyncio.create_task(push_event())
# Add done callback to log any exceptions
task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
_LOGGER.info(
"Successfully processed generic webhook: customer_id=%s, reservation_id=%s",
db_customer.id,
db_reservation.id,
)
return {
"status": "success",
"message": "Generic webhook data processed successfully",
"customer_id": db_customer.id,
"reservation_id": db_reservation.id,
"timestamp": timestamp,
}
async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
) -> str:
@@ -1058,6 +941,171 @@ async def validate_basic_auth(
return credentials.username, credentials.password
@api_router.post("/webhook/{webhook_secret}")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_webhook_unified(
request: Request,
webhook_secret: str,
db_session=Depends(get_async_session),
):
"""Unified webhook handler with deduplication and routing.
Flow:
1. Look up webhook_endpoint by webhook_secret
2. Parse and hash payload (SHA256)
3. Check for duplicate using SELECT FOR UPDATE SKIP LOCKED
4. If duplicate and completed: return success (idempotent)
5. If duplicate and processing: return success (concurrent request)
6. Create or update webhook_request with status='processing'
7. Route to appropriate processor based on webhook_endpoint.webhook_type
8. Update status to 'completed' or 'failed'
9. Return response
"""
timestamp = datetime.now(UTC)
# 1. Look up webhook_endpoint
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
if not webhook_endpoint or not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Webhook not found")
# 2. Parse payload
body = await request.body()
# Handle gzip compression
if request.headers.get("content-encoding", "").lower() == "gzip":
try:
body = gzip.decompress(body)
except Exception as e:
_LOGGER.error("Failed to decompress gzip payload: %s", e)
raise HTTPException(status_code=400, detail="Invalid gzip compression")
try:
payload = json.loads(body.decode("utf-8"))
except Exception as e:
_LOGGER.error("Failed to parse JSON payload: %s", e)
raise HTTPException(status_code=400, detail="Invalid JSON payload")
# 3. Hash payload (canonical JSON for consistent hashing)
payload_json_str = json.dumps(payload, sort_keys=True)
payload_hash = hashlib.sha256(payload_json_str.encode("utf-8")).hexdigest()
payload_size = len(payload_json_str.encode("utf-8"))
# Check payload size limit (10MB)
if payload_size > 10 * 1024 * 1024:
_LOGGER.error("Payload too large: %d bytes", payload_size)
raise HTTPException(status_code=413, detail="Payload too large (max 10MB)")
# 4. Check for duplicate with row-level locking
duplicate = await db_session.execute(
select(WebhookRequest)
.where(WebhookRequest.payload_hash == payload_hash)
.with_for_update(skip_locked=True)
)
existing = duplicate.scalar_one_or_none()
if existing:
if existing.status == 'completed':
# Already processed successfully
_LOGGER.info(
"Webhook already processed (webhook_id=%d, hotel=%s)",
existing.id,
webhook_endpoint.hotel_id
)
return {
"status": "success",
"message": "Webhook already processed",
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == 'processing':
# Another worker is processing right now
_LOGGER.info(
"Webhook is being processed by another worker (webhook_id=%d)",
existing.id
)
return {
"status": "success",
"message": "Webhook is being processed",
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == 'failed':
# Retry failed webhook
_LOGGER.info(
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
existing.id,
existing.retry_count
)
webhook_request = existing
webhook_request.retry_count += 1
webhook_request.status = 'processing'
webhook_request.processing_started_at = timestamp
else:
# 5. Create new webhook_request
webhook_request = WebhookRequest(
payload_hash=payload_hash,
webhook_endpoint_id=webhook_endpoint.id,
hotel_id=webhook_endpoint.hotel_id,
status='processing',
payload_json=payload,
payload_size_bytes=payload_size,
processing_started_at=timestamp,
created_at=timestamp,
source_ip=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
db_session.add(webhook_request)
await db_session.flush()
try:
# 6. Get processor for webhook_type
processor = webhook_registry.get_processor(webhook_endpoint.webhook_type)
if not processor:
raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}")
# 7. Process webhook
result = await processor.process(
payload=payload,
webhook_request=webhook_request,
hotel=webhook_endpoint.hotel,
db_session=db_session,
request=request,
)
# 8. Update status
webhook_request.status = 'completed'
webhook_request.processing_completed_at = datetime.now(UTC)
await db_session.commit()
return {
**result,
"webhook_id": webhook_request.id,
"hotel_id": webhook_endpoint.hotel_id,
}
except Exception as e:
_LOGGER.exception("Error processing webhook: %s", e)
webhook_request.status = 'failed'
webhook_request.last_error = str(e)[:2000]
webhook_request.processing_completed_at = datetime.now(UTC)
await db_session.commit()
raise HTTPException(status_code=500, detail="Error processing webhook")
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(

View File

@@ -13,6 +13,7 @@ from sqlalchemy import (
Double,
ForeignKey,
ForeignKeyConstraint,
Index,
Integer,
String,
)
@@ -674,3 +675,114 @@ class ConversionRoom(Base):
# Relationships
conversion = relationship("Conversion", back_populates="conversion_rooms")
class Hotel(Base):
"""Hotel configuration (migrated from alpine_bits_auth in config.yaml)."""
__tablename__ = "hotels"
id = Column(Integer, primary_key=True)
# Core identification
hotel_id = Column(String(50), unique=True, nullable=False, index=True)
hotel_name = Column(String(200), nullable=False)
# AlpineBits authentication
username = Column(String(100), unique=True, nullable=False, index=True)
password_hash = Column(String(200), nullable=False) # bcrypt
# Advertising accounts
meta_account_id = Column(String(50), nullable=True)
google_account_id = Column(String(50), nullable=True)
# Push endpoint (optional)
push_endpoint_url = Column(String(500), nullable=True)
push_endpoint_token = Column(String(200), nullable=True)
push_endpoint_username = Column(String(100), nullable=True)
# Metadata
created_at = Column(DateTime(timezone=True), nullable=False)
updated_at = Column(DateTime(timezone=True), nullable=False)
is_active = Column(Boolean, default=True, nullable=False, index=True)
# Relationships
webhook_endpoints = relationship("WebhookEndpoint", back_populates="hotel")
class WebhookEndpoint(Base):
"""Webhook configurations per hotel (supports multiple webhook types per hotel)."""
__tablename__ = "webhook_endpoints"
id = Column(Integer, primary_key=True)
# Hotel association
hotel_id = Column(String(50), ForeignKey("hotels.hotel_id"), nullable=False, index=True)
# Webhook configuration
webhook_secret = Column(String(64), unique=True, nullable=False, index=True)
webhook_type = Column(String(50), nullable=False) # 'wix_form', 'generic', etc.
# Metadata
description = Column(String(200), nullable=True) # Human-readable label
is_enabled = Column(Boolean, default=True, nullable=False)
created_at = Column(DateTime(timezone=True), nullable=False)
# Relationships
hotel = relationship("Hotel", back_populates="webhook_endpoints")
webhook_requests = relationship("WebhookRequest", back_populates="webhook_endpoint")
__table_args__ = (
Index('idx_webhook_endpoint_hotel_type', 'hotel_id', 'webhook_type'),
)
class WebhookRequest(Base):
"""Tracks incoming webhooks for deduplication and retry handling."""
__tablename__ = "webhook_requests"
id = Column(Integer, primary_key=True)
# Request identification
payload_hash = Column(String(64), unique=True, nullable=False, index=True) # SHA256
webhook_endpoint_id = Column(Integer, ForeignKey("webhook_endpoints.id"), nullable=True, index=True)
hotel_id = Column(String(50), ForeignKey("hotels.hotel_id"), nullable=True, index=True)
# Processing tracking
status = Column(String(20), nullable=False, default='pending', index=True)
# Status values: 'pending', 'processing', 'completed', 'failed'
processing_started_at = Column(DateTime(timezone=True), nullable=True)
processing_completed_at = Column(DateTime(timezone=True), nullable=True)
# Retry handling
retry_count = Column(Integer, default=0)
last_error = Column(String(2000), nullable=True)
# Payload storage
payload_json = Column(JSON, nullable=True) # NULL after purge, kept for retries
payload_size_bytes = Column(Integer, nullable=True) # Track original size
purged_at = Column(DateTime(timezone=True), nullable=True) # When JSON was purged
# Metadata
created_at = Column(DateTime(timezone=True), nullable=False, index=True)
source_ip = Column(String(45), nullable=True)
user_agent = Column(String(500), nullable=True)
# Result tracking
created_customer_id = Column(Integer, ForeignKey("customers.id"), nullable=True)
created_reservation_id = Column(Integer, ForeignKey("reservations.id"), nullable=True)
# Relationships
webhook_endpoint = relationship("WebhookEndpoint", back_populates="webhook_requests")
hotel = relationship("Hotel")
customer = relationship("Customer")
reservation = relationship("Reservation")
__table_args__ = (
Index('idx_webhook_status_created', 'status', 'created_at'),
Index('idx_webhook_hotel_created', 'hotel_id', 'created_at'),
Index('idx_webhook_purge_candidate', 'status', 'purged_at', 'created_at'),
)

View File

@@ -251,6 +251,18 @@ async def run_startup_tasks(
config: Application configuration dictionary
engine: SQLAlchemy async engine (optional, for backfill tasks)
"""
# Sync config to database (hotels and webhook endpoints)
if config:
from .hotel_service import sync_config_to_database
async with sessionmaker() as session:
stats = await sync_config_to_database(session, config)
_LOGGER.info(
"Config sync: %d hotels created, %d updated, %d endpoints created",
stats["hotels_created"],
stats["hotels_updated"],
stats["endpoints_created"]
)
# Hash any existing customers that don't have hashed data
async with sessionmaker() as session:
customer_service = CustomerService(session)

View File

@@ -0,0 +1,246 @@
"""Hotel service for managing hotel configuration."""
import secrets
from datetime import UTC, datetime
from typing import Any
import bcrypt
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from .db import Hotel, WebhookEndpoint
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
def hash_password(password: str) -> str:
"""Hash password using bcrypt.
Args:
password: Plain text password
Returns:
Bcrypt hashed password
"""
salt = bcrypt.gensalt(rounds=12)
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def verify_password(password: str, password_hash: str) -> bool:
"""Verify password against bcrypt hash.
Args:
password: Plain text password
password_hash: Bcrypt hash to verify against
Returns:
True if password matches, False otherwise
"""
return bcrypt.checkpw(
password.encode('utf-8'),
password_hash.encode('utf-8')
)
def generate_webhook_secret() -> str:
"""Generate cryptographically secure webhook secret.
Returns:
64-character URL-safe random string
"""
return secrets.token_urlsafe(48) # 48 bytes = 64 URL-safe chars
async def sync_config_to_database(
db_session: AsyncSession,
config: dict[str, Any]
) -> dict[str, int]:
"""Sync alpine_bits_auth from config.yaml to database.
Creates/updates hotels and generates webhook_endpoints if missing.
Idempotent - safe to run on every startup.
Args:
db_session: Database session
config: Application configuration dict
Returns:
Statistics dict with counts of created/updated records
"""
stats = {"hotels_created": 0, "hotels_updated": 0, "endpoints_created": 0}
alpine_bits_auth = config.get("alpine_bits_auth", [])
if not alpine_bits_auth:
_LOGGER.info("No hotels found in alpine_bits_auth config")
return stats
for hotel_config in alpine_bits_auth:
hotel_id = hotel_config.get("hotel_id")
if not hotel_id:
_LOGGER.warning("Skipping hotel config without hotel_id: %s", hotel_config)
continue
# Check if hotel exists
result = await db_session.execute(
select(Hotel).where(Hotel.hotel_id == hotel_id)
)
hotel = result.scalar_one_or_none()
if not hotel:
# Create new hotel
password_hash = hash_password(hotel_config["password"])
hotel = Hotel(
hotel_id=hotel_id,
hotel_name=hotel_config.get("hotel_name", hotel_id),
username=hotel_config["username"],
password_hash=password_hash,
meta_account_id=hotel_config.get("meta_account"),
google_account_id=hotel_config.get("google_account"),
push_endpoint_url=hotel_config.get("push_endpoint", {}).get("url"),
push_endpoint_token=hotel_config.get("push_endpoint", {}).get("token"),
push_endpoint_username=hotel_config.get("push_endpoint", {}).get("username"),
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
db_session.add(hotel)
await db_session.flush()
stats["hotels_created"] += 1
_LOGGER.info("Created hotel: %s", hotel_id)
else:
# Update existing hotel (config may have changed)
# Note: We do NOT update password_hash for security reasons
hotel.hotel_name = hotel_config.get("hotel_name", hotel_id)
hotel.meta_account_id = hotel_config.get("meta_account")
hotel.google_account_id = hotel_config.get("google_account")
push_endpoint = hotel_config.get("push_endpoint", {})
hotel.push_endpoint_url = push_endpoint.get("url")
hotel.push_endpoint_token = push_endpoint.get("token")
hotel.push_endpoint_username = push_endpoint.get("username")
hotel.updated_at = datetime.now(UTC)
stats["hotels_updated"] += 1
_LOGGER.debug("Updated hotel: %s", hotel_id)
# Ensure hotel has at least default webhook endpoints
result = await db_session.execute(
select(WebhookEndpoint).where(WebhookEndpoint.hotel_id == hotel_id)
)
existing_endpoints = result.scalars().all()
if not existing_endpoints:
# Create default webhook endpoints for backward compatibility
for webhook_type in ["wix_form", "generic"]:
webhook_secret = generate_webhook_secret()
endpoint = WebhookEndpoint(
hotel_id=hotel_id,
webhook_secret=webhook_secret,
webhook_type=webhook_type,
description=f"Auto-generated {webhook_type} endpoint",
is_enabled=True,
created_at=datetime.now(UTC),
)
db_session.add(endpoint)
stats["endpoints_created"] += 1
_LOGGER.info(
"Created webhook endpoint for hotel %s, type=%s, secret=%s",
hotel_id,
webhook_type,
webhook_secret
)
await db_session.commit()
_LOGGER.info(
"Config sync complete: %d hotels created, %d updated, %d endpoints created",
stats["hotels_created"],
stats["hotels_updated"],
stats["endpoints_created"]
)
return stats
class HotelService:
"""Service for hotel configuration access.
Always reads from database (synced from config at startup).
"""
def __init__(self, db_session: AsyncSession):
"""Initialize HotelService.
Args:
db_session: Database session
"""
self.db_session = db_session
async def get_hotel_by_id(self, hotel_id: str) -> Hotel | None:
"""Get hotel by hotel_id.
Args:
hotel_id: Hotel identifier
Returns:
Hotel instance or None if not found
"""
result = await self.db_session.execute(
select(Hotel)
.where(
and_(
Hotel.hotel_id == hotel_id,
Hotel.is_active == True
)
)
)
return result.scalar_one_or_none()
async def get_hotel_by_webhook_secret(
self,
webhook_secret: str
) -> tuple[Hotel, WebhookEndpoint] | tuple[None, None]:
"""Get hotel and webhook_endpoint by webhook_secret.
Args:
webhook_secret: Webhook secret string
Returns:
Tuple of (Hotel, WebhookEndpoint) or (None, None) if not found
"""
result = await self.db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True
)
)
.options(joinedload(WebhookEndpoint.hotel))
)
endpoint = result.scalar_one_or_none()
if endpoint and endpoint.hotel.is_active:
return endpoint.hotel, endpoint
return None, None
async def get_hotel_by_username(self, username: str) -> Hotel | None:
"""Get hotel by AlpineBits username.
Args:
username: AlpineBits username
Returns:
Hotel instance or None if not found
"""
result = await self.db_session.execute(
select(Hotel)
.where(
and_(
Hotel.username == username,
Hotel.is_active == True
)
)
)
return result.scalar_one_or_none()

View File

@@ -0,0 +1,433 @@
"""Webhook processor interface and implementations."""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Protocol
from fastapi import HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession
from alpine_bits_python.api import _LOGGER, get_advertising_account_ids
from alpine_bits_python.auth import generate_unique_id
from alpine_bits_python.customer_service import CustomerService
from alpine_bits_python.reservation_service import ReservationService
from alpine_bits_python.schemas import ReservationData
from .db import Hotel, WebhookRequest
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
class WebhookProcessorProtocol(Protocol):
"""Protocol for webhook processors."""
@property
def webhook_type(self) -> str:
"""Return webhook type identifier (e.g., 'wix_form', 'generic')."""
...
async def process(
self,
payload: dict[str, Any],
webhook_request: WebhookRequest,
hotel: Hotel,
db_session: AsyncSession,
request: Request,
) -> dict[str, Any]:
"""Process webhook payload.
Args:
payload: Parsed webhook payload
webhook_request: WebhookRequest database record
hotel: Hotel associated with this webhook
db_session: Database session
request: FastAPI Request object
Returns:
Response dict with status, message, customer_id, reservation_id
Raises:
HTTPException on processing errors
"""
...
class WebhookProcessorRegistry:
"""Registry for webhook processors."""
def __init__(self):
"""Initialize the registry."""
self._processors: dict[str, WebhookProcessorProtocol] = {}
def register(self, processor: WebhookProcessorProtocol) -> None:
"""Register a webhook processor.
Args:
processor: Processor instance to register
"""
self._processors[processor.webhook_type] = processor
_LOGGER.info("Registered webhook processor: %s", processor.webhook_type)
def get_processor(self, webhook_type: str) -> WebhookProcessorProtocol | None:
"""Get processor for webhook type.
Args:
webhook_type: Type of webhook to process
Returns:
Processor instance or None if not found
"""
return self._processors.get(webhook_type)
class WixFormProcessor:
"""Processor for Wix form webhooks."""
@property
def webhook_type(self) -> str:
"""Return webhook type identifier."""
return "wix_form"
async def process(
self,
payload: dict[str, Any],
webhook_request: WebhookRequest,
hotel: Hotel,
db_session: AsyncSession,
request: Request,
) -> dict[str, Any]:
"""Process Wix form webhook payload.
Args:
payload: Parsed webhook payload
webhook_request: WebhookRequest database record
hotel: Hotel associated with this webhook
db_session: Database session
request: FastAPI Request object
Returns:
Response dict with status and details
"""
# Import here to avoid circular dependency
from .api import process_wix_form_submission
# Call existing processing function
result = await process_wix_form_submission(request, payload, db_session)
# The existing function doesn't return customer/reservation IDs directly,
# but they would be in the database session. We'll need to extract them
# from the result or query after the fact. For now, return the result as-is.
return result
async def process_generic_webhook_submission(
request: Request, data: dict[str, Any], db
):
"""Process generic webhook submissions with nested structure.
Expected structure:
{
"hotel_data": {"hotelname": "...", "hotelcode": "..."},
"form_data": {
"sprache": "de/it/en",
"anreise": "DD.MM.YYYY",
"abreise": "DD.MM.YYYY",
"erwachsene": "N",
"kinder": "N",
"alter": {"1": "age1", "2": "age2", ...},
"anrede": "...",
"name": "...",
"nachname": "...",
"mail": "...",
"tel": "...",
"nachricht": "..."
},
"tracking_data": {
"utm_source": "...",
"utm_medium": "...",
"utm_campaign": "...",
"utm_content": "...",
"utm_term": "...",
"fbclid": "...",
"gclid": "..."
},
"timestamp": "ISO8601"
}
"""
timestamp = datetime.now().isoformat()
_LOGGER.info("Processing generic webhook submission at %s", timestamp)
# Extract nested data
hotel_data = data.get("hotel_data", {})
form_data = data.get("form_data", {})
tracking_data = data.get("tracking_data", {})
offer_data = data.get("unterkunftTyp", {})
selected_offers = []
if offer_data:
# grab keys and values. If value is "on" add the key not the value to a list of selected offers
offer_data: dict[str, str]
for key, value in offer_data.items():
if value == "on":
selected_offers.append(key)
selected_offers_str = ", ".join(selected_offers) if selected_offers else None
# Extract hotel information
hotel_code = hotel_data.get("hotelcode")
hotel_name = hotel_data.get("hotelname")
if not hotel_code:
_LOGGER.warning("No hotel_code provided in webhook data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
if not hotel_name:
hotel_name = (
request.app.state.config.get("default_hotel_name") or "Frangart Inn"
)
# Extract customer information
first_name = form_data.get("name")
last_name = form_data.get("nachname")
email = form_data.get("mail")
phone_number = form_data.get("tel")
name_prefix = form_data.get("anrede")
language = form_data.get("sprache", "de")[:2]
user_comment = form_data.get("nachricht", "")
plz = form_data.get("plz", "")
city = form_data.get("stadt", "")
country = form_data.get("land", "")
# Parse dates - handle DD.MM.YYYY format
start_date_str = form_data.get("anreise")
end_date_str = form_data.get("abreise")
if not start_date_str or not end_date_str:
raise HTTPException(
status_code=400, detail="Missing required dates (anreise/abreise)"
)
try:
# Parse DD.MM.YYYY format using strptime
start_date = datetime.strptime(start_date_str, "%d.%m.%Y").date()
end_date = datetime.strptime(end_date_str, "%d.%m.%Y").date()
except ValueError as e:
_LOGGER.error(
"Error parsing dates: start=%s, end=%s, error=%s",
start_date_str,
end_date_str,
e,
)
raise HTTPException(status_code=400, detail=f"Invalid date format: {e}") from e
# Extract room/guest info
num_adults = int(form_data.get("erwachsene", 2))
num_children = int(form_data.get("kinder", 0))
# Extract children ages from nested structure
children_ages = []
if num_children > 0:
alter_data = form_data.get("alter", {})
for i in range(1, num_children + 1):
age_str = alter_data.get(str(i))
if age_str:
try:
children_ages.append(int(age_str))
except ValueError:
_LOGGER.warning("Invalid age value for child %d: %s", i, age_str)
# Extract tracking information
utm_source = None
utm_medium = None
utm_campaign = None
utm_term = None
utm_content = None
fbclid = None
gclid = None
if tracking_data:
utm_source = tracking_data.get("utm_source")
utm_medium = tracking_data.get("utm_medium")
utm_campaign = tracking_data.get("utm_campaign")
utm_term = tracking_data.get("utm_term")
utm_content = tracking_data.get("utm_content")
fbclid = tracking_data.get("fbclid")
gclid = tracking_data.get("gclid")
# Parse submission timestamp
submission_time = data.get("timestamp")
try:
if submission_time:
# Handle ISO8601 format with timezone
if submission_time.endswith("Z"):
submission_time = datetime.fromisoformat(submission_time[:-1])
elif "+" in submission_time:
# Remove timezone info (e.g., +02:00)
submission_time = datetime.fromisoformat(submission_time.split("+")[0])
else:
submission_time = datetime.fromisoformat(submission_time)
except Exception as e:
_LOGGER.exception("Error parsing submission timestamp: %s", e)
submission_time = None
# Generate unique ID
unique_id = generate_unique_id()
# Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db)
customer_data = {
"given_name": first_name,
"surname": last_name,
"contact_id": None,
"name_prefix": name_prefix if name_prefix != "--" else None,
"email_address": email,
"phone": phone_number if phone_number else None,
"email_newsletter": False,
"address_line": None,
"city_name": city if city else None,
"postal_code": plz if plz else None,
"country_code": country if country else None,
"gender": None,
"birth_date": None,
"language": language,
"address_catalog": False,
"name_title": None,
}
# Create/update customer
db_customer = await customer_service.get_or_create_customer(customer_data)
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
)
# Create reservation
reservation_kwargs = {
"unique_id": unique_id,
"start_date": start_date,
"end_date": end_date,
"num_adults": num_adults,
"num_children": num_children,
"children_ages": children_ages,
"hotel_code": hotel_code,
"hotel_name": hotel_name,
"offer": selected_offers_str,
"utm_source": utm_source,
"utm_medium": utm_medium,
"utm_campaign": utm_campaign,
"utm_term": utm_term,
"utm_content": utm_content,
"user_comment": user_comment,
"fbclid": fbclid,
"gclid": gclid,
"meta_account_id": meta_account_id,
"google_account_id": google_account_id,
}
# Only include created_at if we have a valid submission_time
if submission_time:
reservation_kwargs["created_at"] = submission_time
reservation = ReservationData(**reservation_kwargs)
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
)
# Create task and store reference to prevent garbage collection
task = asyncio.create_task(push_event())
# Add done callback to log any exceptions
task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
_LOGGER.info(
"Successfully processed generic webhook: customer_id=%s, reservation_id=%s",
db_customer.id,
db_reservation.id,
)
return {
"status": "success",
"message": "Generic webhook data processed successfully",
"customer_id": db_customer.id,
"reservation_id": db_reservation.id,
"timestamp": timestamp,
}
class GenericWebhookProcessor:
"""Processor for generic webhooks."""
@property
def webhook_type(self) -> str:
"""Return webhook type identifier."""
return "generic"
async def process(
self,
payload: dict[str, Any],
webhook_request: WebhookRequest,
hotel: Hotel,
db_session: AsyncSession,
request: Request,
) -> dict[str, Any]:
"""Process generic webhook payload.
Args:
payload: Parsed webhook payload
webhook_request: WebhookRequest database record
hotel: Hotel associated with this webhook
db_session: Database session
request: FastAPI Request object
Returns:
Response dict with status and details
"""
# Call existing processing function
result = await process_generic_webhook_submission(request, payload, db_session)
return result
# Global registry instance
webhook_registry = WebhookProcessorRegistry()
def initialize_webhook_processors() -> None:
"""Initialize and register all webhook processors.
This should be called during application startup.
"""
# Register built-in processors
webhook_registry.register(WixFormProcessor())
webhook_registry.register(GenericWebhookProcessor())
_LOGGER.info("Webhook processors initialized")