"""API endpoints for the form-data and the alpinebits server."""
import asyncio
import gzip
import json
import multiprocessing
import os
import urllib.parse
import xml.dom.minidom
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import UTC, datetime, timedelta
from functools import partial
from pathlib import Path
from typing import Any
import httpx
from fast_langdetect import detect
from fastapi import APIRouter, BackgroundTasks, Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import (
HTTPAuthorizationCredentials,
HTTPBasic,
HTTPBasicCredentials,
HTTPBearer,
)
from pydantic import BaseModel
from slowapi.errors import RateLimitExceeded
from sqlalchemy import and_, select, update
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
from sqlalchemy.orm import selectinload
from alpine_bits_python.hotel_service import HotelService
from alpine_bits_python.schemas import WebhookRequestData
from .alpinebits_server import (
AlpineBitsActionName,
AlpineBitsClientInfo,
AlpineBitsServer,
Version,
)
from .auth import validate_api_key
from .config_loader import get_username_for_hotel, load_config
from .const import HttpStatusCode, WebhookStatus
from .conversion_service import ConversionService
from .csv_import import CSVImporter
from .db import Customer as DBCustomer
from .db import (
Hotel,
ResilientAsyncSession,
SessionMaker,
WebhookEndpoint,
WebhookRequest,
create_database_engine,
)
from .db import Reservation as DBReservation
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
from .logging_config import get_logger, setup_logging
from .pushover_service import create_pushover_service
from .rate_limit import (
BURST_RATE_LIMIT,
DEFAULT_RATE_LIMIT,
WEBHOOK_RATE_LIMIT,
custom_rate_limit_handler,
limiter,
webhook_limiter,
)
from .webhook_processor import webhook_registry
from .worker_coordination import is_primary_worker
# Configure logging - will be reconfigured during lifespan with actual config
_LOGGER = get_logger(__name__)
# HTTP Basic auth for AlpineBits
security_basic = HTTPBasic()
# HTTP Bearer auth for API endpoints
security_bearer = HTTPBearer()
# Constants for token sanitization
TOKEN_LOG_LENGTH = 10
# Pydantic models for language detection
class LanguageDetectionRequest(BaseModel):
text: str
class LanguageDetectionResponse(BaseModel):
language_code: str
score: float
# --- Enhanced event dispatcher with hotel-specific routing ---
class EventDispatcher:
"""Simple event dispatcher for AlpineBits push requests."""
def __init__(self):
self.listeners = defaultdict(list)
self.hotel_listeners = defaultdict(list) # hotel_code -> list of listeners
def register(self, event_name, func):
self.listeners[event_name].append(func)
def register_hotel_listener(self, event_name, hotel_code, func):
"""Register a listener for a specific hotel."""
self.hotel_listeners[f"{event_name}:{hotel_code}"].append(func)
async def dispatch(self, event_name, *args, **kwargs):
for func in self.listeners[event_name]:
await func(*args, **kwargs)
async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs):
"""Dispatch event only to listeners registered for specific hotel."""
key = f"{event_name}:{hotel_code}"
for func in self.hotel_listeners[key]:
await func(*args, **kwargs)
event_dispatcher = EventDispatcher()
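# Illustrative usage sketch (not executed here; names are hypothetical): a
# listener registered for a specific hotel only receives events dispatched
# for that hotel_code.
#
#     async def my_listener(customer, reservation):
#         ...
#
#     event_dispatcher.register_hotel_listener("form_processed", "HOTEL1", my_listener)
#     await event_dispatcher.dispatch_for_hotel("form_processed", "HOTEL1", customer, reservation)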
# --- Push notification listener for hotels with a configured push_endpoint ---
async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel):
"""Push listener that sends reservation data to hotel's push endpoint.
Only called for reservations that match this hotel's hotel_id.
"""
push_endpoint = hotel.get("push_endpoint")
if not push_endpoint:
_LOGGER.warning(
"No push endpoint configured for hotel %s", hotel.get("hotel_id")
)
return
server: AlpineBitsServer = app.state.alpine_bits_server
hotel_id = hotel["hotel_id"]
reservation_hotel_id = reservation.hotel_id
# Double-check hotel matching (should be guaranteed by dispatcher)
if hotel_id != reservation_hotel_id:
_LOGGER.warning(
"Hotel ID mismatch: listener for %s, reservation for %s",
hotel_id,
reservation_hotel_id,
)
return
_LOGGER.info(
"Processing push notification for hotel %s, reservation %s",
hotel_id,
reservation.unique_id,
)
# Prepare payload for push notification
request = await server.handle_request(
request_action_name=AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS.request_name,
request_xml=(reservation, customer),
client_info=None,
version=Version.V2024_10,
)
if request.status_code != HttpStatusCode.OK:
_LOGGER.error(
"Failed to generate push request for hotel %s, reservation %s: %s",
hotel_id,
reservation.unique_id,
request.xml_content,
)
return
# save push request to file
logs_dir = Path("logs/push_requests")
if not logs_dir.exists():
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
stat_info = os.stat(logs_dir)
_LOGGER.info(
"Created directory owner: uid:%s, gid:%s",
stat_info.st_uid,
stat_info.st_gid,
)
_LOGGER.info("Directory mode: %s", oct(stat_info.st_mode)[-3:])
log_filename = f"{logs_dir}/alpinebits_push_{hotel_id}_{reservation.unique_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml"
with open(log_filename, "w", encoding="utf-8") as f:
f.write(request.xml_content)
    # NOTE: the early return below disables the HTTP push; the generated XML is
    # currently only written to disk. Before re-enabling the code that follows,
    # a `payload` must be constructed (e.g. from request.xml_content).
    return
    headers = (
        {"Authorization": f"Bearer {push_endpoint.get('token', '')}"}
        if push_endpoint.get("token")
        else {}
    )
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
push_endpoint["url"], json=payload, headers=headers, timeout=10
)
_LOGGER.info(
"Push event fired to %s for hotel %s, status: %s",
push_endpoint["url"],
hotel["hotel_id"],
resp.status_code,
)
if resp.status_code not in [200, 201, 202]:
_LOGGER.warning(
"Push endpoint returned non-success status %s: %s",
resp.status_code,
resp.text,
)
except Exception as e:
_LOGGER.exception("Push event failed for hotel %s: %s", hotel["hotel_id"], e)
# Optionally implement retry logic here
async def cleanup_stale_webhooks(
async_sessionmaker: async_sessionmaker, timeout_minutes: int = 10
) -> int:
"""Reset webhooks stuck in 'processing' (worker crashed).
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
timeout_minutes: Timeout threshold in minutes
Returns:
Number of stale webhooks reset
"""
timeout_threshold = datetime.now(UTC) - timedelta(minutes=timeout_minutes)
async with async_sessionmaker() as session:
result = await session.execute(
update(WebhookRequest)
.where(
and_(
WebhookRequest.status == WebhookStatus.PROCESSING,
WebhookRequest.processing_started_at < timeout_threshold,
)
)
.values(
status=WebhookStatus.FAILED,
last_error="Processing timeout - worker may have crashed",
)
)
await session.commit()
count = result.rowcount
if count > 0:
_LOGGER.warning("Reset %d stale webhooks to 'failed'", count)
return count
async def purge_old_webhook_payloads(
async_sessionmaker: async_sessionmaker, retention_days: int = 7
) -> int:
"""Purge payload_json from old completed webhooks.
Keeps metadata for history but removes large JSON payload.
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
retention_days: Days to retain payloads before purging
Returns:
Number of payloads purged
"""
cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
async with async_sessionmaker() as session:
result = await session.execute(
update(WebhookRequest)
.where(
and_(
WebhookRequest.status == WebhookStatus.COMPLETED,
WebhookRequest.created_at < cutoff_date,
WebhookRequest.purged_at.is_(None), # Not already purged
)
)
.values(payload_json=None, purged_at=datetime.now(UTC))
)
await session.commit()
count = result.rowcount
if count > 0:
_LOGGER.info("Purged payloads from %d old webhook requests", count)
return count
async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
"""Run periodic cleanup tasks for webhooks.
This should be scheduled to run every 5-10 minutes.
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
"""
try:
# Clean up stale webhooks (stuck in 'processing')
stale_count = await cleanup_stale_webhooks(async_sessionmaker)
# Purge old webhook payloads (older than 7 days)
purged_count = await purge_old_webhook_payloads(async_sessionmaker)
_LOGGER.debug(
"Webhook cleanup: %d stale reset, %d payloads purged",
stale_count,
purged_count,
)
except Exception as e:
_LOGGER.exception("Error during periodic webhook cleanup: %s", e)
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Determine if this is the primary worker using file-based locking.
    # In multi-worker setups, only the primary worker runs singleton services
    # (schedulers and background tasks).
is_primary, worker_lock = is_primary_worker()
_LOGGER.info(
"Worker startup: process=%s, pid=%d, primary=%s",
multiprocessing.current_process().name,
os.getpid(),
is_primary,
)
try:
config = load_config()
except Exception:
_LOGGER.exception("Failed to load config: ")
config = {}
# Get event loop for email monitoring
loop = asyncio.get_running_loop()
# Initialize email service (before logging setup so it can be used by handlers)
email_service = create_email_service(config)
# Initialize pushover service
pushover_service = create_pushover_service(config)
# Setup logging from config with unified notification monitoring
# Only primary worker should have the report scheduler running
alert_handler, report_scheduler = setup_logging(
config, email_service, pushover_service, loop, enable_scheduler=is_primary
)
_LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary)
# Create database engine with schema support
engine = create_database_engine(config=config, echo=False)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
# Create resilient session wrapper for automatic connection recovery
resilient_session = ResilientAsyncSession(AsyncSessionLocal, engine)
# Create SessionMaker for concurrent processing
session_maker = SessionMaker(AsyncSessionLocal)
app.state.engine = engine
app.state.async_sessionmaker = AsyncSessionLocal
app.state.resilient_session = resilient_session
app.state.session_maker = session_maker
app.state.config = config
app.state.alpine_bits_server = AlpineBitsServer(config)
app.state.event_dispatcher = event_dispatcher
app.state.email_service = email_service
app.state.pushover_service = pushover_service
app.state.alert_handler = alert_handler
app.state.report_scheduler = report_scheduler
# Initialize webhook processors
from .webhook_processor import initialize_webhook_processors
initialize_webhook_processors()
_LOGGER.info("Webhook processors initialized")
# Register push listeners for hotels with push_endpoint
for hotel in config.get("alpine_bits_auth", []):
push_endpoint = hotel.get("push_endpoint")
hotel_id = hotel.get("hotel_id")
if push_endpoint and hotel_id:
# Register hotel-specific listener
event_dispatcher.register_hotel_listener(
"form_processed", hotel_id, partial(push_listener, hotel=hotel)
)
_LOGGER.info(
"Registered push listener for hotel %s with endpoint %s",
hotel_id,
push_endpoint.get("url"),
)
elif push_endpoint and not hotel_id:
_LOGGER.warning("Hotel has push_endpoint but no hotel_id: %s", hotel)
elif hotel_id and not push_endpoint:
_LOGGER.info("Hotel %s has no push_endpoint configured", hotel_id)
# Run startup tasks (only in primary worker to avoid race conditions)
# NOTE: Database migrations should already have been run before the app started
# via run_migrations.py or `uv run alembic upgrade head`
if is_primary:
_LOGGER.info("Running startup tasks (primary worker)...")
await run_startup_tasks(AsyncSessionLocal, config, engine)
_LOGGER.info("Startup tasks completed")
else:
_LOGGER.info("Skipping startup tasks (non-primary worker)")
# Initialize and hook up stats collector for daily reports
# Note: report_scheduler will only exist on the primary worker
if report_scheduler:
stats_collector = ReservationStatsCollector(
async_sessionmaker=AsyncSessionLocal,
config=config,
)
# Hook up the stats collector to the report scheduler
report_scheduler.set_stats_collector(stats_collector.collect_stats)
_LOGGER.info("Stats collector initialized and hooked up to report scheduler")
# Start daily report scheduler
report_scheduler.start()
_LOGGER.info("Daily report scheduler started")
# Start periodic webhook cleanup (only on primary worker)
cleanup_task = None
if is_primary:
async def run_periodic_cleanup():
"""Run cleanup tasks every 5 minutes."""
while True:
try:
await asyncio.sleep(300) # 5 minutes
await periodic_webhook_cleanup(AsyncSessionLocal)
except asyncio.CancelledError:
_LOGGER.info("Webhook cleanup task cancelled")
break
except Exception as e:
_LOGGER.exception("Error in periodic webhook cleanup: %s", e)
cleanup_task = asyncio.create_task(run_periodic_cleanup())
_LOGGER.info("Webhook periodic cleanup task started")
_LOGGER.info("Application startup complete")
yield
# Cleanup on shutdown
_LOGGER.info("Application shutdown initiated")
# Stop webhook cleanup task
if cleanup_task:
cleanup_task.cancel()
try:
await cleanup_task
except asyncio.CancelledError:
pass
_LOGGER.info("Webhook cleanup task stopped")
# Stop daily report scheduler
if report_scheduler:
report_scheduler.stop()
_LOGGER.info("Daily report scheduler stopped")
# Close alert handler (flush any remaining errors)
if alert_handler:
alert_handler.close()
_LOGGER.info("Alert handler closed")
# Shutdown email service thread pool
if email_service:
email_service.shutdown()
_LOGGER.info("Email service shut down")
# Dispose engine
await engine.dispose()
_LOGGER.info("Application shutdown complete")
# Release worker lock if this was the primary worker
if worker_lock:
worker_lock.release()
async def get_async_session(request: Request):
    """Yield a database session from the application's async sessionmaker.

    Note: this dependency does not retry on connection errors; handlers that
    need automatic recovery should use the ResilientAsyncSession instead.
    """
    sessionmaker = request.app.state.async_sessionmaker
    async with sessionmaker() as session:
        yield session
def get_session_maker(request: Request) -> SessionMaker:
"""Get the SessionMaker for creating independent database sessions.
This dependency provides a SessionMaker that can be used to create
multiple independent sessions for concurrent processing tasks.
Useful for processing large datasets concurrently where each task
needs its own database transaction context.
"""
return request.app.state.session_maker
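# Illustrative sketch: a handler can declare `session_maker: SessionMaker =
# Depends(get_session_maker)` and open one independent session per concurrent
# task; create_session() is the same call used by the background imports below.
#
#     session = await session_maker.create_session()
#     try:
#         ...  # work in an independent transaction context
#     finally:
#         await session.close()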
def get_resilient_session(request: Request) -> ResilientAsyncSession:
"""Get the resilient session manager from app state.
This provides access to the ResilientAsyncSession for use in handlers
that need retry capability on connection errors.
"""
return request.app.state.resilient_session
app = FastAPI(
title="Wix Form Handler API",
description="Secure API endpoint to receive and process Wix form submissions with authentication and rate limiting",
version="1.0.0",
lifespan=lifespan,
)
# Create API router with /api prefix
api_router = APIRouter(prefix="/api", tags=["api"])
# Add rate limiting
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)
# Add CORS middleware to allow requests from Wix
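# Note: Starlette's CORSMiddleware matches allow_origins entries literally, so
# wildcard subdomain strings such as "https://*.wix.com" are not expanded;
# allow_origin_regex would be needed for true subdomain wildcards.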
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://*.wix.com",
"https://*.wixstatic.com",
"http://localhost:3000", # For development
"http://localhost:8000", # For local testing
],
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
@api_router.get("/")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def root(request: Request):
"""Health check endpoint."""
return {
"message": "Wix Form Handler API is running",
"timestamp": datetime.now().isoformat(),
"status": "healthy",
"authentication": "required",
"rate_limits": {
"default": DEFAULT_RATE_LIMIT,
"webhook": WEBHOOK_RATE_LIMIT,
"burst": BURST_RATE_LIMIT,
},
}
@api_router.get("/health")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def health_check(request: Request):
"""Detailed health check."""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"service": "wix-form-handler",
"version": "1.0.0",
"authentication": "enabled",
"rate_limiting": "enabled",
}
@api_router.post("/detect-language", response_model=LanguageDetectionResponse)
@limiter.limit(DEFAULT_RATE_LIMIT)
async def detect_language(
request: Request,
data: LanguageDetectionRequest,
credentials: HTTPAuthorizationCredentials = Depends(security_bearer),
):
"""Detect language of text, restricted to Italian or German.
Requires Bearer token authentication.
Returns the most likely language (it or de) with confidence score.
"""
# Validate bearer token
token = credentials.credentials
config = request.app.state.config
# Check if token is valid
valid_tokens = config.get("api_tokens", [])
# If no tokens configured, reject authentication
if not valid_tokens:
_LOGGER.error("No api_tokens configured in config.yaml")
raise HTTPException(
status_code=401,
detail="Authentication token not configured on server",
)
if token not in valid_tokens:
# Log sanitized token (first TOKEN_LOG_LENGTH chars) for security
sanitized_token = (
token[:TOKEN_LOG_LENGTH] + "..." if len(token) > TOKEN_LOG_LENGTH else token
)
_LOGGER.warning("Invalid token attempt: %s", sanitized_token)
raise HTTPException(
status_code=401,
detail="Invalid authentication token",
)
try:
# Detect language with k=2 to get top 2 candidates
results = detect(data.text, k=2)
_LOGGER.info("Language detection results: %s", results)
# Filter for Italian (it) or German (de)
italian_german_results = [r for r in results if r.get("lang") in ["it", "de"]]
if italian_german_results:
# Return the best match between Italian and German
best_match = italian_german_results[0]
return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch"
return LanguageDetectionResponse(
language_code=return_value, score=best_match.get("score", 0.0)
)
# If neither Italian nor German detected in top 2, check all results
all_results = detect(data.text, k=10)
italian_german_all = [r for r in all_results if r.get("lang") in ["it", "de"]]
if italian_german_all:
best_match = italian_german_all[0]
return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch"
return LanguageDetectionResponse(
language_code=return_value, score=best_match.get("score", 0.0)
)
# Default to German if no clear detection
_LOGGER.warning(
"Could not detect Italian or German in text: %s, defaulting to 'de'",
data.text[:100],
)
return LanguageDetectionResponse(language_code="Deutsch", score=0.0)
except Exception as e:
_LOGGER.exception("Error detecting language: %s", e)
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
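# Example (illustrative): calling the language-detection endpoint with a token
# from config["api_tokens"]; the label returned is "Italienisch" or "Deutsch".
#
#     import httpx
#     resp = httpx.post(
#         "http://localhost:8000/api/detect-language",
#         headers={"Authorization": "Bearer <api-token>"},
#         json={"text": "Guten Tag, wir möchten ein Zimmer buchen."},
#     )
#     resp.json()  # {"language_code": "Deutsch", "score": ...}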
async def validate_basic_auth(
    credentials: HTTPBasicCredentials = Depends(security_basic),
) -> tuple[str, str]:
    """Validate HTTP Basic authentication for the AlpineBits protocol.

    Returns a (username, password) tuple if valid; raises HTTPException otherwise.
    """
# Accept any username/password pair present in config['alpine_bits_auth']
if not credentials.username or not credentials.password:
raise HTTPException(
status_code=401,
detail="ERROR: Authentication required",
headers={"WWW-Authenticate": "Basic"},
)
valid = False
config = app.state.config
for entry in config["alpine_bits_auth"]:
if (
credentials.username == entry["username"]
and credentials.password == entry["password"]
):
valid = True
break
if not valid:
raise HTTPException(
status_code=401,
detail="ERROR: Invalid credentials",
headers={"WWW-Authenticate": "Basic"},
)
_LOGGER.info(
"AlpineBits authentication successful for user: %s (from config)",
credentials.username,
)
return credentials.username, credentials.password
@api_router.post("/webhook/{webhook_secret}")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_webhook_unified(
request: Request,
webhook_secret: str,
db_session: AsyncSession = Depends(get_async_session),
):
"""Unified webhook handler with deduplication and routing.
Supports both new secure webhook URLs and legacy endpoints:
- /webhook/{64-char-secret} - New secure endpoints
- /webhook/wix-form - Legacy Wix form endpoint (extracts hotel from payload)
- /webhook/generic - Legacy generic webhook endpoint (extracts hotel from payload)
Flow:
1. Look up webhook_endpoint by webhook_secret (or detect legacy endpoint)
2. Parse and hash payload (SHA256)
3. Check for duplicate using SELECT FOR UPDATE SKIP LOCKED
4. If duplicate and completed: return success (idempotent)
5. If duplicate and processing: return success (concurrent request)
6. Create or update webhook_request with status='processing'
7. Route to appropriate processor based on webhook_endpoint.webhook_type
8. Update status to 'completed' or 'failed'
9. Return response
"""
timestamp = datetime.now(UTC)
# 2. Parse payload first (needed for legacy endpoint detection)
body = await request.body()
# Handle gzip compression
if request.headers.get("content-encoding", "").lower() == "gzip":
try:
body = gzip.decompress(body)
except Exception as e:
_LOGGER.error("Failed to decompress gzip payload: %s", e)
raise HTTPException(status_code=400, detail="Invalid gzip compression")
try:
payload = json.loads(body.decode("utf-8"))
except Exception as e:
_LOGGER.error("Failed to parse JSON payload: %s", e)
raise HTTPException(status_code=400, detail="Invalid JSON payload")
# 1. Detect if this is a legacy endpoint or look up webhook_endpoint
webhook_endpoint: WebhookEndpoint | None = None
is_legacy = False
webhook_type = None
hotel_id_from_payload = None
# Check if webhook_secret looks like a legacy endpoint name
if webhook_secret in ("wix-form", "generic"):
is_legacy = True
webhook_type = "wix_form" if webhook_secret == "wix-form" else "generic"
# Extract hotel_id from payload based on webhook type
if webhook_type == "wix_form":
# Wix forms: field:hotelid or use default
hotel_id_from_payload = (
payload.get("data", {}).get("field:hotelid")
if isinstance(payload.get("data"), dict)
else payload.get("field:hotelid")
)
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get(
"default_hotel_code", "123"
)
_LOGGER.info(
"Legacy wix-form endpoint: using default hotel_code=%s",
hotel_id_from_payload,
)
elif webhook_type == "generic":
# Generic webhooks: hotel_data.hotelcode or use default
hotel_data = payload.get("hotel_data", {})
hotel_id_from_payload = hotel_data.get("hotelcode")
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get(
"default_hotel_code", "123"
)
_LOGGER.info(
"Legacy generic endpoint: using default hotel_code=%s",
hotel_id_from_payload,
)
_LOGGER.info(
"Legacy endpoint detected: %s, webhook_type=%s, hotel_id=%s",
webhook_secret,
webhook_type,
hotel_id_from_payload,
)
# Look up the webhook endpoint for this hotel and type
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.hotel_id == hotel_id_from_payload,
WebhookEndpoint.webhook_type == webhook_type,
WebhookEndpoint.is_enabled == True,
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
if not webhook_endpoint:
_LOGGER.error(
"No webhook endpoint found for legacy endpoint: hotel_id=%s, type=%s",
hotel_id_from_payload,
webhook_type,
)
raise HTTPException(
status_code=404,
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}",
)
else:
# New secure endpoint - look up by webhook_secret
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True,
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
if not webhook_endpoint:
raise HTTPException(status_code=404, detail="Webhook not found")
webhook_endpoint_id = webhook_endpoint.id
webhook_hotel_id = webhook_endpoint.hotel_id
# Verify hotel is active
if not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Hotel is not active")
# 3. Track payload metadata with canonical hashing handled by WebhookRequestData
payload_size = len(body)
# Check payload size limit (10MB)
if payload_size > 10 * 1024 * 1024:
_LOGGER.error("Payload too large: %d bytes", payload_size)
raise HTTPException(status_code=413, detail="Payload too large (max 10MB)")
webhook_request_data = WebhookRequestData(
payload_json=payload,
webhook_endpoint_id=webhook_endpoint_id,
hotel_id=webhook_hotel_id,
status=WebhookStatus.PROCESSING,
processing_started_at=timestamp,
created_at=timestamp,
source_ip=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
payload_hash = webhook_request_data.payload_hash
# 4. Check for duplicate with row-level locking
duplicate = await db_session.execute(
select(WebhookRequest)
.where(WebhookRequest.payload_hash == payload_hash)
.with_for_update(skip_locked=True)
)
existing = duplicate.scalar_one_or_none()
if existing:
if existing.status == WebhookStatus.COMPLETED:
# Already processed successfully
_LOGGER.info(
"Webhook already processed (webhook_id=%d, hotel=%s)",
existing.id,
webhook_endpoint.hotel_id,
)
return {
"status": "success",
"message": "Webhook already processed",
"webhook_id": existing.id,
"duplicate": True,
}
if existing.status == WebhookStatus.PROCESSING:
# Another worker is processing right now
_LOGGER.info(
"Webhook is being processed by another worker (webhook_id=%d)",
existing.id,
)
return {
"status": "success",
"message": "Webhook is being processed",
"webhook_id": existing.id,
"duplicate": True,
}
if existing.status == WebhookStatus.FAILED:
# Retry failed webhook
_LOGGER.info(
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
existing.id,
existing.retry_count,
)
webhook_request = existing
webhook_request.retry_count += 1
webhook_request.status = WebhookStatus.PROCESSING
webhook_request.processing_started_at = timestamp
else:
# 5. Create new webhook_request from validated data
webhook_request = WebhookRequest(**webhook_request_data.model_dump())
db_session.add(webhook_request)
await db_session.flush()
webhook_request_id = webhook_request.id
try:
# 6. Get processor for webhook_type
processor = webhook_registry.get_processor(webhook_endpoint.webhook_type)
if not processor:
raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}")
# Persist the webhook row before handing off to processors
await db_session.commit()
# 7. Process webhook with simplified interface
result = await processor.process(
webhook_request=webhook_request,
db_session=db_session,
config=request.app.state.config,
event_dispatcher=request.app.state.event_dispatcher,
)
if not db_session.in_transaction():
await db_session.begin()
completion_values = {
"status": WebhookStatus.COMPLETED,
"processing_completed_at": datetime.now(UTC),
}
if isinstance(result, dict):
created_customer_id = result.get("customer_id")
created_reservation_id = result.get("reservation_id")
if created_customer_id:
completion_values["created_customer_id"] = created_customer_id
if created_reservation_id:
completion_values["created_reservation_id"] = created_reservation_id
await db_session.execute(
update(WebhookRequest)
.where(WebhookRequest.id == webhook_request_id)
.values(**completion_values)
)
await db_session.commit()
return {
**result,
"webhook_id": webhook_request_id,
"hotel_id": webhook_hotel_id,
}
except Exception as e:
_LOGGER.exception("Error processing webhook: %s", e)
await db_session.rollback()
if not db_session.in_transaction():
await db_session.begin()
await db_session.execute(
update(WebhookRequest)
.where(WebhookRequest.id == webhook_request_id)
.values(
status=WebhookStatus.FAILED,
last_error=str(e)[:2000],
processing_completed_at=datetime.now(UTC),
)
)
await db_session.commit()
raise HTTPException(status_code=500, detail="Error processing webhook")
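# Illustrative sketch of the idempotent behaviour above: two identical POSTs
# (same canonical payload, hence same payload_hash) result in a single
# processing run; the second request reports duplicate=True, whether the first
# has already completed or is still being processed by another worker.
#
#     import httpx
#     payload = {"hotel_data": {"hotelcode": "123"}, "data": {"field:hotelid": "123"}}
#     first = httpx.post("http://localhost:8000/api/webhook/<webhook-secret>", json=payload)
#     second = httpx.post("http://localhost:8000/api/webhook/<webhook-secret>", json=payload)
#     # second.json() -> {"status": "success", "duplicate": True, ...}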
async def _process_csv_import_background(
csv_content: str,
filename: str,
hotel_code: str,
session_maker: SessionMaker,
config: dict[str, Any],
log_filename: Path,
):
"""Background task to process CSV import with automatic acknowledgement.
This runs in a separate asyncio task after the HTTP response is sent.
Handles both file saving and database processing.
All imported reservations are automatically acknowledged using the username
associated with the hotel_code from the config.
Args:
csv_content: CSV content as string
filename: Original filename
hotel_code: Hotel code (mandatory) - used to get username for acknowledgements
session_maker: SessionMaker for creating database sessions
config: Application configuration
log_filename: Path to save the CSV file
"""
try:
# First, save the CSV file (in background)
await asyncio.to_thread(log_filename.write_text, csv_content, encoding="utf-8")
_LOGGER.debug("CSV file saved to %s", log_filename)
# Now process the CSV import
_LOGGER.info("Starting database processing of %s", filename)
# Get username for acknowledgements from config
username = get_username_for_hotel(config, hotel_code)
# Create a new session for this background task
db_session = await session_maker.create_session()
try:
importer = CSVImporter(db_session, config)
# Import with pre-acknowledgement enabled
stats = await importer.import_csv_file(
str(log_filename),
hotel_code,
dryrun=False,
pre_acknowledge=True,
client_id=hotel_code,
username=username,
)
_LOGGER.info("CSV import complete for %s: %s", filename, stats)
finally:
await db_session.close()
except Exception:
_LOGGER.exception("Error processing CSV import in background for %s", filename)
@api_router.put("/admin/import-csv/{hotel_code}/{filename:path}")
@limiter.limit(BURST_RATE_LIMIT)
async def import_csv_endpoint(
request: Request,
background_tasks: BackgroundTasks,
hotel_code: str,
filename: str,
credentials: tuple = Depends(validate_basic_auth),
db_session=Depends(get_async_session),
session_maker: SessionMaker = Depends(get_session_maker),
):
"""Import reservations from CSV data sent via PUT request.
This endpoint allows importing historical form data into the system.
It creates customers and reservations, avoiding duplicates based on:
- Name, email, reservation dates
- fbclid/gclid tracking IDs
Returns immediately with 202 Accepted while processing continues in background.
All imported reservations are automatically acknowledged using the username
associated with the hotel_code in the config.
Requires basic authentication and saves CSV files to log directory.
Supports gzip compression via Content-Encoding header.
Args:
hotel_code: Hotel code (mandatory) - used to get username for acknowledgements
filename: Name for the CSV file (used for logging)
credentials: Basic auth credentials
Example: PUT /api/admin/import-csv/39054_001/reservations.csv
"""
try:
# Validate filename to prevent path traversal
if ".." in filename or filename.startswith("/"):
raise HTTPException(status_code=400, detail="ERROR: Invalid filename")
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(
status_code=400, detail="ERROR: No CSV content provided"
)
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Try to decode as UTF-8
try:
csv_content = body.decode("utf-8")
except UnicodeDecodeError:
# If UTF-8 fails, try with latin-1 as fallback
csv_content = body.decode("latin-1")
# Basic validation that it looks like CSV
if not csv_content.strip():
raise HTTPException(status_code=400, detail="ERROR: CSV content is empty")
# Create logs directory for CSV imports (blocking, but fast)
logs_dir = Path("logs/csv_imports")
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
# Generate filename with timestamp and authenticated user
username, _ = credentials
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_filename = Path(filename).stem
extension = Path(filename).suffix or ".csv"
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
_LOGGER.info(
"CSV file queued for processing: %s by user %s (original: %s)",
log_filename,
username,
filename,
)
# Schedule background processing using FastAPI's BackgroundTasks
# This handles both file saving AND database processing
# This ensures the response is sent immediately
background_tasks.add_task(
_process_csv_import_background,
csv_content,
filename,
hotel_code,
session_maker,
request.app.state.config,
log_filename,
)
response_headers = {
"Content-Type": "application/json; charset=utf-8",
}
# Return immediate acknowledgment
return Response(
content=json.dumps(
{
"status": "accepted",
"message": "CSV file received and queued for processing",
"filename": filename,
"timestamp": datetime.now().isoformat(),
}
),
headers=response_headers,
status_code=202,
)
except HTTPException:
raise
except Exception:
_LOGGER.exception("Error in import_csv_endpoint")
raise HTTPException(status_code=500, detail="Error processing CSV upload")
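# Illustrative sketch of a gzip-compressed CSV upload (the basic-auth pair must
# match an entry in config["alpine_bits_auth"]):
#
#     import gzip, httpx
#     body = gzip.compress(Path("reservations.csv").read_bytes())
#     httpx.put(
#         "http://localhost:8000/api/admin/import-csv/39054_001/reservations.csv",
#         content=body,
#         headers={"Content-Encoding": "gzip"},
#         auth=("username", "password"),
#     )  # -> 202 Accepted; processing continues in the background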
async def _process_conversion_xml_background(
xml_content: str,
filename: str,
session_maker: SessionMaker,
log_filename: Path,
hotel: Hotel,
):
"""Background task to process conversion XML.
This runs in a separate asyncio task after the HTTP response is sent.
Handles both file prettification and database processing.
"""
try:
# First, prettify and save the XML file (in background)
try:
dom = xml.dom.minidom.parseString(xml_content)
pretty_xml = dom.toprettyxml(indent=" ")
# Remove extra blank lines that toprettyxml adds
pretty_xml = "\n".join(
[line for line in pretty_xml.split("\n") if line.strip()]
)
await asyncio.to_thread(
log_filename.write_text, pretty_xml, encoding="utf-8"
)
_LOGGER.debug("XML file prettified and saved to %s", log_filename)
except Exception as e:
# If formatting fails, save the original content
_LOGGER.warning("Failed to format XML: %s. Saving unformatted.", str(e))
await asyncio.to_thread(
log_filename.write_text, xml_content, encoding="utf-8"
)
# Now process the conversion XML
_LOGGER.info("Starting database processing of %s", filename)
conversion_service = ConversionService(session_maker, hotel.hotel_id)
processing_stats = await conversion_service.process_conversion_xml(xml_content, run_full_guest_matching=True)
await conversion_service.classify_regular_guests(24)
_LOGGER.info(
"Conversion processing complete for %s: %s", filename, processing_stats
)
except Exception:
_LOGGER.exception(
"Error processing conversion XML in background for %s", filename
)
@api_router.put("/hoteldata/conversions_import/{filename:path}")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_xml_upload(
request: Request,
background_tasks: BackgroundTasks,
filename: str,
    credentials: tuple = Depends(validate_basic_auth),
db_session=Depends(get_async_session),
session_maker: SessionMaker = Depends(get_session_maker),
):
"""Endpoint for receiving XML files for conversion processing via PUT.
Processes conversion data from hotel PMS:
- Parses reservation and daily sales XML data
- Matches to existing reservations using truncated tracking IDs (fbclid/gclid)
- Links conversions to customers and hashed_customers
- Stores daily sales revenue data
Returns immediately with 202 Accepted while processing continues in background.
Requires basic authentication and saves XML files to log directory.
Supports gzip compression via Content-Encoding header.
Example: PUT /api/hoteldata/conversions_import/Reservierungen.xml
"""
try:
# Validate filename to prevent path traversal
if ".." in filename or filename.startswith("/"):
raise HTTPException(status_code=400, detail="ERROR: Invalid filename")
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(
status_code=400, detail="ERROR: No XML content provided"
)
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Try to decode as UTF-8
try:
xml_content = body.decode("utf-8")
except UnicodeDecodeError:
# If UTF-8 fails, try with latin-1 as fallback
xml_content = body.decode("latin-1")
# Basic validation that it's XML-like
if not xml_content.strip().startswith("<"):
raise HTTPException(
status_code=400, detail="ERROR: Content does not appear to be XML"
)
# Create logs directory for XML conversions (blocking, but fast)
logs_dir = Path("logs/conversions_import")
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
# Generate filename with timestamp and authenticated user
        username, _ = credentials
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_filename = Path(filename).stem
extension = Path(filename).suffix or ".xml"
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
        hotel_service = HotelService(db_session)
        hotel = await hotel_service.get_hotel_by_username(username)
        if hotel is None:
            raise HTTPException(
                status_code=404,
                detail=f"ERROR: No hotel configured for user {username}",
            )
_LOGGER.info(
"XML file queued for processing: %s by user %s (original: %s)",
log_filename,
username,
filename,
)
# Schedule background processing using FastAPI's BackgroundTasks
# This handles both file prettification/saving AND database processing
# This ensures the response is sent immediately
background_tasks.add_task(
_process_conversion_xml_background,
xml_content,
filename,
session_maker,
log_filename,
hotel,
)
response_headers = {
"Content-Type": "application/xml; charset=utf-8",
"X-AlpineBits-Server-Accept-Encoding": "gzip",
}
# Return immediate acknowledgment
response_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<response>
<status>accepted</status>
<message>XML file received and queued for processing</message>
<filename>{filename}</filename>
<timestamp>{datetime.now().isoformat()}</timestamp>
</response>"""
return Response(
content=response_content, headers=response_headers, status_code=202
)
except HTTPException:
raise
except Exception:
_LOGGER.exception("Error in handle_xml_upload")
raise HTTPException(status_code=500, detail="Error processing XML upload")
# TODO: Bit sketchy; may need a dedicated multipart parsing library (e.g. requests-toolbelt) in the future.
def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
"""Parse multipart/form-data from raw request body.
This is a simplified parser for the AlpineBits use case.
"""
if "multipart/form-data" not in content_type:
raise HTTPException(
status_code=400, detail="ERROR: Content-Type must be multipart/form-data"
)
# Extract boundary
boundary = None
for part in content_type.split(";"):
part = part.strip()
if part.startswith("boundary="):
boundary = part.split("=", 1)[1].strip('"')
break
if not boundary:
raise HTTPException(
status_code=400, detail="ERROR: Missing boundary in multipart/form-data"
)
# Simple multipart parsing
parts = body.split(f"--{boundary}".encode())
data = {}
for part in parts:
if not part.strip() or part.strip() == b"--":
continue
# Split headers and content
if b"\r\n\r\n" in part:
headers_section, content = part.split(b"\r\n\r\n", 1)
content = content.rstrip(b"\r\n")
# Parse Content-Disposition header
headers = headers_section.decode("utf-8", errors="ignore")
name = None
for line in headers.split("\n"):
if "Content-Disposition" in line and "name=" in line:
# Extract name parameter
for param in line.split(";"):
param = param.strip()
if param.startswith("name="):
name = param.split("=", 1)[1].strip('"')
break
            if name:
                # XML payloads and plain text fields are decoded identically
                data[name] = content.decode("utf-8", errors="ignore")
return data
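# Illustrative sketch of the multipart body shape this parser expects (the
# boundary value comes from the Content-Type header; the action shown is the
# AlpineBits handshake action):
#
#     --boundary123
#     Content-Disposition: form-data; name="action"
#
#     OTA_Ping:Handshaking
#     --boundary123
#     Content-Disposition: form-data; name="request"
#
#     <?xml version="1.0" ?><OTA_PingRQ>...</OTA_PingRQ>
#     --boundary123--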
@api_router.post("/alpinebits/server-2024-10")
@limiter.limit("60/minute")
async def alpinebits_server_handshake(
request: Request,
    credentials: tuple = Depends(validate_basic_auth),
dbsession=Depends(get_async_session),
):
"""AlpineBits server endpoint implementing the handshake protocol.
This endpoint handles:
- Protocol version negotiation via X-AlpineBits-ClientProtocolVersion header
- Client identification via X-AlpineBits-ClientID header (optional)
- Multipart/form-data parsing for action and request parameters
- Gzip compression support
- Proper error handling with HTTP status codes
- Handshaking action processing
Authentication: HTTP Basic Auth required
Content-Type: multipart/form-data
Compression: gzip supported (check X-AlpineBits-Server-Accept-Encoding)
"""
try:
# Check required headers
client_protocol_version = request.headers.get(
"X-AlpineBits-ClientProtocolVersion"
)
if not client_protocol_version:
# Server concludes client speaks a protocol version preceding 2013-04
client_protocol_version = "pre-2013-04"
_LOGGER.info(
"No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04"
)
else:
_LOGGER.info("Client protocol version: %s", client_protocol_version)
# Optional client ID
client_id = request.headers.get("X-AlpineBits-ClientID")
if client_id:
_LOGGER.info("Client ID: %s", client_id)
# Check content encoding
content_encoding = request.headers.get("Content-Encoding")
is_compressed = content_encoding == "gzip"
if is_compressed:
_LOGGER.info("Request is gzip compressed")
# Get content type before processing
content_type = request.headers.get("Content-Type", "")
_LOGGER.info("Content-Type: %s", content_type)
_LOGGER.info("Content-Encoding: %s", content_encoding)
# Get request body
body = await request.body()
# Decompress if needed
form_data = validate_alpinebits_body(is_compressed, content_type, body)
# Check for required action parameter
action = form_data.get("action")
if not action:
raise HTTPException(
status_code=400, detail="ERROR: Missing required 'action' parameter"
)
_LOGGER.info("AlpineBits action: %s", action)
# Get optional request XML
request_xml = form_data.get("request")
server: AlpineBitsServer = app.state.alpine_bits_server
version = Version.V2024_10
        username, password = credentials
client_info = AlpineBitsClientInfo(
username=username, password=password, client_id=client_id
)
# Create successful handshake response
response = await server.handle_request(
action,
request_xml,
client_info=client_info,
version=version,
dbsession=dbsession,
)
response_xml = response.xml_content
# Set response headers indicating server capabilities
headers = {
"Content-Type": "application/xml; charset=utf-8",
"X-AlpineBits-Server-Accept-Encoding": "gzip", # Indicate gzip support
"X-AlpineBits-Server-Version": "2024-10",
}
if is_compressed:
# Compress response if client sent compressed request
response_xml = gzip.compress(response_xml.encode("utf-8"))
headers["Content-Encoding"] = "gzip"
return Response(
content=response_xml, status_code=response.status_code, headers=headers
)
except HTTPException:
# Re-raise HTTP exceptions (auth errors, etc.)
raise
except Exception as e:
_LOGGER.exception("Error in AlpineBits handshake: %s", e)
raise HTTPException(status_code=500, detail="Internal server error")
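# Illustrative sketch of a client handshake against this endpoint (credentials
# and the request XML are placeholders; urlencoded form data is accepted, and
# gzip can additionally be negotiated via Content-Encoding):
#
#     import httpx
#     resp = httpx.post(
#         "http://localhost:8000/api/alpinebits/server-2024-10",
#         headers={"X-AlpineBits-ClientProtocolVersion": "2024-10"},
#         data={"action": "OTA_Ping:Handshaking", "request": "<OTA_PingRQ .../>"},
#         auth=("username", "password"),
#     )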
def validate_alpinebits_body(is_compressed, content_type, body):
"""Check if the body conforms to AlpineBits expectations."""
if is_compressed:
try:
body = gzip.decompress(body)
except Exception:
raise HTTPException(
status_code=400,
detail="ERROR: Failed to decompress gzip content",
)
# Check content type (after decompression)
if (
"multipart/form-data" not in content_type
and "application/x-www-form-urlencoded" not in content_type
):
raise HTTPException(
status_code=400,
detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
)
# Parse multipart data
if "multipart/form-data" in content_type:
try:
form_data = parse_multipart_data(content_type, body)
except Exception:
raise HTTPException(
status_code=400,
detail="ERROR: Failed to parse multipart/form-data",
)
elif "application/x-www-form-urlencoded" in content_type:
# Parse as urlencoded
form_data = dict(urllib.parse.parse_qsl(body.decode("utf-8")))
else:
raise HTTPException(
status_code=400,
detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
)
return form_data
@api_router.get("/admin/stats")
@limiter.limit("10/minute")
async def get_api_stats(request: Request, admin_key: str = Depends(validate_api_key)):
"""Admin endpoint to get API usage statistics.
Requires admin API key.
"""
if admin_key != "admin-key":
raise HTTPException(status_code=403, detail="Admin access required")
# In a real application, you'd fetch this from your database/monitoring system
return {
"status": "success",
"stats": {
"uptime": "Available in production deployment",
"total_requests": "Available with monitoring setup",
"active_api_keys": len([k for k in ["wix-webhook-key", "admin-key"] if k]),
"rate_limit_backend": "redis" if os.getenv("REDIS_URL") else "memory",
},
"timestamp": datetime.now().isoformat(),
}
# Include the API router in the main app
app.include_router(api_router)
@app.get("/", response_class=HTMLResponse)
async def landing_page():
"""Serve the under construction landing page at the root route."""
try:
# Get the path to the HTML file
html_path = os.path.join(os.path.dirname(__file__), "templates", "index.html")
with open(html_path, encoding="utf-8") as f:
html_content = f.read()
return HTMLResponse(content=html_content, status_code=200)
except FileNotFoundError:
# Fallback if HTML file is not found
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>99tales - Under Construction</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; }
h1 { color: #333; }
</style>
</head>
<body>
<h1>🏗️ 99tales</h1>
<h2>Under Construction</h2>
<p>We're working hard to bring you something amazing!</p>
<p><a href="/api">API Documentation</a></p>
</body>
</html>
"""
return HTMLResponse(content=html_content, status_code=200)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)