"""API endpoints for the form-data and the alpinebits server."""
import asyncio
import gzip
import json
import multiprocessing
import os
import secrets
import traceback
import urllib.parse
import xml.dom.minidom
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import date, datetime
from functools import partial
from pathlib import Path
from typing import Any
import httpx
from fast_langdetect import detect
from fastapi import APIRouter, BackgroundTasks, Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import (
HTTPAuthorizationCredentials,
HTTPBasic,
HTTPBasicCredentials,
HTTPBearer,
)
from pydantic import BaseModel
from slowapi.errors import RateLimitExceeded
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import async_sessionmaker
from alpine_bits_python.schemas import ReservationData
from .alpinebits_server import (
AlpineBitsActionName,
AlpineBitsClientInfo,
AlpineBitsServer,
Version,
)
from .auth import generate_unique_id, validate_api_key
from .config_loader import get_username_for_hotel, load_config
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode
from .conversion_service import ConversionService
from .csv_import import CSVImporter
from .customer_service import CustomerService
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
from .logging_config import get_logger, setup_logging
from .pushover_service import create_pushover_service
from .rate_limit import (
BURST_RATE_LIMIT,
DEFAULT_RATE_LIMIT,
WEBHOOK_RATE_LIMIT,
custom_rate_limit_handler,
limiter,
webhook_limiter,
)
from .reservation_service import ReservationService
from .worker_coordination import is_primary_worker
# Configure logging - will be reconfigured during lifespan with actual config
_LOGGER = get_logger(__name__)
# HTTP Basic auth for AlpineBits
security_basic = HTTPBasic()
# HTTP Bearer auth for API endpoints
security_bearer = HTTPBearer()
# Constants for token sanitization
TOKEN_LOG_LENGTH = 10
def get_advertising_account_ids(
config: dict[str, Any], hotel_code: str, fbclid: str | None, gclid: str | None
) -> tuple[str | None, str | None]:
"""Get advertising account IDs based on hotel config and click IDs.
Args:
config: Application configuration dict
hotel_code: Hotel identifier to look up in config
fbclid: Facebook click ID (if present, meta_account_id will be returned)
gclid: Google click ID (if present, google_account_id will be returned)
Returns:
Tuple of (meta_account_id, google_account_id) based on conditional logic:
- meta_account_id is set only if fbclid is present AND hotel has
meta_account configured
- google_account_id is set only if gclid is present AND hotel has
google_account configured
"""
meta_account_id = None
google_account_id = None
# Look up hotel in config
alpine_bits_auth = config.get("alpine_bits_auth", [])
for hotel in alpine_bits_auth:
if hotel.get(CONF_HOTEL_ID) == hotel_code:
# Conditionally set meta_account_id if fbclid is present
if fbclid:
meta_account_id = hotel.get(CONF_META_ACCOUNT)
# Conditionally set google_account_id if gclid is present
if gclid:
google_account_id = hotel.get(CONF_GOOGLE_ACCOUNT)
break
return meta_account_id, google_account_id
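# Illustrative usage (hotel code and click ID values are made up):
#     meta_id, google_id = get_advertising_account_ids(
#         config, hotel_code="39054_001", fbclid="IwAR0abc", gclid=None
#     )
#     # meta_id -> the hotel's configured meta_account (or None); google_id -> None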
# Pydantic models for language detection
class LanguageDetectionRequest(BaseModel):
text: str
class LanguageDetectionResponse(BaseModel):
language_code: str
score: float
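# Example round-trip for /api/detect-language (illustrative values):
#     request:  {"text": "Wir möchten gerne ein Zimmer buchen"}
#     response: {"language_code": "Deutsch", "score": 0.98}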
# --- Enhanced event dispatcher with hotel-specific routing ---
class EventDispatcher:
"""Simple event dispatcher for AlpineBits push requests."""
def __init__(self):
self.listeners = defaultdict(list)
self.hotel_listeners = defaultdict(list) # hotel_code -> list of listeners
def register(self, event_name, func):
self.listeners[event_name].append(func)
def register_hotel_listener(self, event_name, hotel_code, func):
"""Register a listener for a specific hotel."""
self.hotel_listeners[f"{event_name}:{hotel_code}"].append(func)
async def dispatch(self, event_name, *args, **kwargs):
for func in self.listeners[event_name]:
await func(*args, **kwargs)
async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs):
"""Dispatch event only to listeners registered for specific hotel."""
key = f"{event_name}:{hotel_code}"
for func in self.hotel_listeners[key]:
await func(*args, **kwargs)
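# Minimal usage sketch (listener and hotel code are illustrative):
#     dispatcher = EventDispatcher()
#     dispatcher.register_hotel_listener("form_processed", "39054_001", my_listener)
#     await dispatcher.dispatch_for_hotel(
#         "form_processed", "39054_001", customer, reservation
#     )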
event_dispatcher = EventDispatcher()
async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel):
"""Push listener that sends reservation data to hotel's push endpoint.
Only called for reservations that match this hotel's hotel_id.
"""
push_endpoint = hotel.get("push_endpoint")
if not push_endpoint:
_LOGGER.warning(
"No push endpoint configured for hotel %s", hotel.get("hotel_id")
)
return
server: AlpineBitsServer = app.state.alpine_bits_server
hotel_id = hotel["hotel_id"]
reservation_hotel_id = reservation.hotel_code
# Double-check hotel matching (should be guaranteed by dispatcher)
if hotel_id != reservation_hotel_id:
_LOGGER.warning(
"Hotel ID mismatch: listener for %s, reservation for %s",
hotel_id,
reservation_hotel_id,
)
return
_LOGGER.info(
"Processing push notification for hotel %s, reservation %s",
hotel_id,
reservation.unique_id,
)
# Prepare payload for push notification
request = await server.handle_request(
request_action_name=AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS.request_name,
request_xml=(reservation, customer),
client_info=None,
version=Version.V2024_10,
)
if request.status_code != HttpStatusCode.OK:
_LOGGER.error(
"Failed to generate push request for hotel %s, reservation %s: %s",
hotel_id,
reservation.unique_id,
request.xml_content,
)
return
# save push request to file
logs_dir = Path("logs/push_requests")
if not logs_dir.exists():
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
stat_info = os.stat(logs_dir)
_LOGGER.info(
"Created directory owner: uid:%s, gid:%s",
stat_info.st_uid,
stat_info.st_gid,
)
_LOGGER.info("Directory mode: %s", oct(stat_info.st_mode)[-3:])
log_filename = f"{logs_dir}/alpinebits_push_{hotel_id}_{reservation.unique_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml"
with open(log_filename, "w", encoding="utf-8") as f:
f.write(request.xml_content)
return
headers = (
{"Authorization": f"Bearer {push_endpoint.get('token', '')}"}
if push_endpoint.get("token")
else {}
)
""
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
push_endpoint["url"], json=payload, headers=headers, timeout=10
)
_LOGGER.info(
"Push event fired to %s for hotel %s, status: %s",
push_endpoint["url"],
hotel["hotel_id"],
resp.status_code,
)
if resp.status_code not in [200, 201, 202]:
_LOGGER.warning(
"Push endpoint returned non-success status %s: %s",
resp.status_code,
resp.text,
)
except Exception as e:
_LOGGER.exception("Push event failed for hotel %s: %s", hotel["hotel_id"], e)
        # Optionally implement retry logic here


@asynccontextmanager
async def lifespan(app: FastAPI):
# Setup DB
# Determine if this is the primary worker using file-based locking
# Only primary runs schedulers/background tasks
# In multi-worker setups, only one worker should run singleton services
is_primary, worker_lock = is_primary_worker()
_LOGGER.info(
"Worker startup: process=%s, pid=%d, primary=%s",
multiprocessing.current_process().name,
os.getpid(),
is_primary,
)
try:
config = load_config()
except Exception:
_LOGGER.exception("Failed to load config: ")
config = {}
# Get event loop for email monitoring
loop = asyncio.get_running_loop()
# Initialize email service (before logging setup so it can be used by handlers)
email_service = create_email_service(config)
# Initialize pushover service
pushover_service = create_pushover_service(config)
# Setup logging from config with unified notification monitoring
# Only primary worker should have the report scheduler running
alert_handler, report_scheduler = setup_logging(
config, email_service, pushover_service, loop, enable_scheduler=is_primary
)
_LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary)
# Create database engine with schema support
engine = create_database_engine(config=config, echo=False)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
# Create resilient session wrapper for automatic connection recovery
resilient_session = ResilientAsyncSession(AsyncSessionLocal, engine)
# Create SessionMaker for concurrent processing
session_maker = SessionMaker(AsyncSessionLocal)
app.state.engine = engine
app.state.async_sessionmaker = AsyncSessionLocal
app.state.resilient_session = resilient_session
app.state.session_maker = session_maker
app.state.config = config
app.state.alpine_bits_server = AlpineBitsServer(config)
app.state.event_dispatcher = event_dispatcher
app.state.email_service = email_service
app.state.pushover_service = pushover_service
app.state.alert_handler = alert_handler
app.state.report_scheduler = report_scheduler
# Register push listeners for hotels with push_endpoint
for hotel in config.get("alpine_bits_auth", []):
push_endpoint = hotel.get("push_endpoint")
hotel_id = hotel.get("hotel_id")
if push_endpoint and hotel_id:
# Register hotel-specific listener
event_dispatcher.register_hotel_listener(
"form_processed", hotel_id, partial(push_listener, hotel=hotel)
)
_LOGGER.info(
"Registered push listener for hotel %s with endpoint %s",
hotel_id,
push_endpoint.get("url"),
)
elif push_endpoint and not hotel_id:
_LOGGER.warning("Hotel has push_endpoint but no hotel_id: %s", hotel)
elif hotel_id and not push_endpoint:
_LOGGER.info("Hotel %s has no push_endpoint configured", hotel_id)
# Run startup tasks (only in primary worker to avoid race conditions)
# NOTE: Database migrations should already have been run before the app started
# via run_migrations.py or `uv run alembic upgrade head`
if is_primary:
_LOGGER.info("Running startup tasks (primary worker)...")
await run_startup_tasks(AsyncSessionLocal, config, engine)
_LOGGER.info("Startup tasks completed")
else:
_LOGGER.info("Skipping startup tasks (non-primary worker)")
# Initialize and hook up stats collector for daily reports
# Note: report_scheduler will only exist on the primary worker
if report_scheduler:
stats_collector = ReservationStatsCollector(
async_sessionmaker=AsyncSessionLocal,
config=config,
)
# Hook up the stats collector to the report scheduler
report_scheduler.set_stats_collector(stats_collector.collect_stats)
_LOGGER.info("Stats collector initialized and hooked up to report scheduler")
# Start daily report scheduler
report_scheduler.start()
_LOGGER.info("Daily report scheduler started")
_LOGGER.info("Application startup complete")
yield
# Cleanup on shutdown
_LOGGER.info("Application shutdown initiated")
# Stop daily report scheduler
if report_scheduler:
report_scheduler.stop()
_LOGGER.info("Daily report scheduler stopped")
# Close alert handler (flush any remaining errors)
if alert_handler:
alert_handler.close()
_LOGGER.info("Alert handler closed")
# Shutdown email service thread pool
if email_service:
email_service.shutdown()
_LOGGER.info("Email service shut down")
# Dispose engine
await engine.dispose()
_LOGGER.info("Application shutdown complete")
# Release worker lock if this was the primary worker
if worker_lock:
worker_lock.release()
async def get_async_session(request: Request):
"""Get a database session with automatic connection recovery.
This dependency provides an async session that will automatically
retry on connection errors, disposing the pool and reconnecting.
"""
async_sessionmaker = request.app.state.async_sessionmaker
async with async_sessionmaker() as session:
yield session
def get_session_maker(request: Request) -> SessionMaker:
"""Get the SessionMaker for creating independent database sessions.
This dependency provides a SessionMaker that can be used to create
multiple independent sessions for concurrent processing tasks.
Useful for processing large datasets concurrently where each task
needs its own database transaction context.
"""
return request.app.state.session_maker
def get_resilient_session(request: Request) -> ResilientAsyncSession:
"""Get the resilient session manager from app state.
This provides access to the ResilientAsyncSession for use in handlers
that need retry capability on connection errors.
"""
return request.app.state.resilient_session
app = FastAPI(
title="Wix Form Handler API",
description="Secure API endpoint to receive and process Wix form submissions with authentication and rate limiting",
version="1.0.0",
lifespan=lifespan,
)
# Create API router with /api prefix
api_router = APIRouter(prefix="/api", tags=["api"])
# Add rate limiting
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)
# Add CORS middleware to allow requests from Wix
app.add_middleware(
    CORSMiddleware,
    # Starlette matches allow_origins entries literally, so wildcard subdomains
    # like "https://*.wix.com" must be expressed via allow_origin_regex
    allow_origin_regex=r"https://.*\.(wix|wixstatic)\.com",
    allow_origins=[
        "http://localhost:3000",  # For development
        "http://localhost:8000",  # For local testing
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
@api_router.get("/")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def root(request: Request):
"""Health check endpoint."""
return {
"message": "Wix Form Handler API is running",
"timestamp": datetime.now().isoformat(),
"status": "healthy",
"authentication": "required",
"rate_limits": {
"default": DEFAULT_RATE_LIMIT,
"webhook": WEBHOOK_RATE_LIMIT,
"burst": BURST_RATE_LIMIT,
},
}
@api_router.get("/health")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def health_check(request: Request):
"""Detailed health check."""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"service": "wix-form-handler",
"version": "1.0.0",
"authentication": "enabled",
"rate_limiting": "enabled",
}
@api_router.post("/detect-language", response_model=LanguageDetectionResponse)
@limiter.limit(DEFAULT_RATE_LIMIT)
async def detect_language(
request: Request,
data: LanguageDetectionRequest,
credentials: HTTPAuthorizationCredentials = Depends(security_bearer),
):
"""Detect language of text, restricted to Italian or German.
Requires Bearer token authentication.
Returns the most likely language (it or de) with confidence score.
"""
# Validate bearer token
token = credentials.credentials
config = request.app.state.config
# Check if token is valid
valid_tokens = config.get("api_tokens", [])
# If no tokens configured, reject authentication
if not valid_tokens:
_LOGGER.error("No api_tokens configured in config.yaml")
raise HTTPException(
status_code=401,
detail="Authentication token not configured on server",
)
if token not in valid_tokens:
# Log sanitized token (first TOKEN_LOG_LENGTH chars) for security
sanitized_token = (
token[:TOKEN_LOG_LENGTH] + "..." if len(token) > TOKEN_LOG_LENGTH else token
)
_LOGGER.warning("Invalid token attempt: %s", sanitized_token)
raise HTTPException(
status_code=401,
detail="Invalid authentication token",
)
try:
# Detect language with k=2 to get top 2 candidates
results = detect(data.text, k=2)
_LOGGER.info("Language detection results: %s", results)
# Filter for Italian (it) or German (de)
italian_german_results = [r for r in results if r.get("lang") in ["it", "de"]]
if italian_german_results:
# Return the best match between Italian and German
best_match = italian_german_results[0]
return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch"
return LanguageDetectionResponse(
language_code=return_value, score=best_match.get("score", 0.0)
)
# If neither Italian nor German detected in top 2, check all results
all_results = detect(data.text, k=10)
italian_german_all = [r for r in all_results if r.get("lang") in ["it", "de"]]
if italian_german_all:
best_match = italian_german_all[0]
return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch"
return LanguageDetectionResponse(
language_code=return_value, score=best_match.get("score", 0.0)
)
# Default to German if no clear detection
_LOGGER.warning(
"Could not detect Italian or German in text: %s, defaulting to 'de'",
data.text[:100],
)
return LanguageDetectionResponse(language_code="Deutsch", score=0.0)
    except Exception as e:
        _LOGGER.exception("Error detecting language: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error detecting language: {e!s}"
        ) from e
# Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production)."""
timestamp = datetime.now().isoformat()
_LOGGER.info("Received Wix form data at %s", timestamp)
data = data.get("data") # Handle nested "data" key if present
# save customer and reservation to DB
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
    phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
    contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
email_newsletter = data.get("field:form_field_5a7b", False)
# if email_newsletter is a string, attempt to convert to boolean, else false
if isinstance(email_newsletter, str):
email_newsletter = email_newsletter.lower() in [
"yes",
"true",
"1",
"on",
"selezionato",
"angekreuzt",
]
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
    # Dates (fall back to positional submission values if named fields are absent)
    submissions = data.get("submissions", [])
    start_date = (
        data.get("field:date_picker_a7c8")
        or data.get("Anreisedatum")
        or (submissions[1].get("value") if len(submissions) > 1 else None)
    )
    end_date = (
        data.get("field:date_picker_7e65")
        or data.get("Abreisedatum")
        or (submissions[2].get("value") if len(submissions) > 2 else None)
    )
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 1)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
# Collect all child age fields, then take only the first num_children
# This handles form updates that may send extra padded/zero fields
temp_ages = []
for k in data:
if k.startswith("field:alter_kind_"):
if data[k] is None or data[k] == "":
continue
try:
age = int(data[k])
temp_ages.append(age)
except ValueError:
_LOGGER.warning("Invalid age value for %s: %s", k, data[k])
# Only keep the first num_children ages, regardless of their values
children_ages = temp_ages[:num_children]
offer = data.get("field:angebot_auswaehlen")
# get submissionId and ensure max length 35. Generate one if not present
unique_id = data.get("submissionId", generate_unique_id())
# Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db)
customer_data = {
"given_name": first_name,
"surname": last_name,
"contact_id": contact_id,
"name_prefix": name_prefix,
"email_address": email,
"phone": phone_number,
"email_newsletter": email_newsletter,
"address_line": address_line,
"city_name": city_name,
"postal_code": postal_code,
"country_code": country_code,
"gender": gender,
"birth_date": birth_date,
"language": language,
"address_catalog": False,
"name_title": None,
}
# This automatically creates/updates both Customer and HashedCustomer
db_customer = await customer_service.get_or_create_customer(customer_data)
# Determine hotel_code and hotel_name
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
hotel_code = data.get("field:hotelid", None)
if hotel_code is None:
_LOGGER.warning("No hotel_code provided in form data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
hotel_name = (
data.get("field:hotelname")
or data.get("hotelname")
or request.app.state.config.get("default_hotel_name")
or "Frangart Inn" # fallback
)
submissionTime = data.get("submissionTime") # 2025-10-07T05:48:41.855Z
try:
if submissionTime:
submissionTime = datetime.fromisoformat(
submissionTime[:-1]
) # Remove Z and convert
except Exception as e:
_LOGGER.exception("Error parsing submissionTime: %s", e)
submissionTime = None
# Extract fbclid and gclid for conditional account ID lookup
fbclid = data.get("field:fbclid")
gclid = data.get("field:gclid")
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
)
reservation = ReservationData(
unique_id=unique_id,
start_date=date.fromisoformat(start_date),
end_date=date.fromisoformat(end_date),
num_adults=num_adults,
num_children=num_children,
children_ages=children_ages,
hotel_code=hotel_code,
hotel_name=hotel_name,
offer=offer,
        created_at=submission_time,
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=fbclid,
gclid=gclid,
meta_account_id=meta_account_id,
google_account_id=google_account_id,
)
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
)
# Create task and store reference to prevent it from being garbage collected
# The task runs independently and we don't need to await it here
task = asyncio.create_task(push_event())
    # Retrieve any exception from the finished task so asyncio does not warn
    # about it being "never retrieved"
    task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
return {
"status": "success",
"message": "Wix form data received successfully",
"received_keys": list(data.keys()),
"timestamp": timestamp,
"note": "No authentication required for this endpoint",
}
async def process_generic_webhook_submission(
request: Request, data: dict[str, Any], db
):
"""Process generic webhook submissions with nested structure.
Expected structure:
{
"hotel_data": {"hotelname": "...", "hotelcode": "..."},
"form_data": {
"sprache": "de/it/en",
"anreise": "DD.MM.YYYY",
"abreise": "DD.MM.YYYY",
"erwachsene": "N",
"kinder": "N",
"alter": {"1": "age1", "2": "age2", ...},
"anrede": "...",
"name": "...",
"nachname": "...",
"mail": "...",
"tel": "...",
"nachricht": "..."
},
"tracking_data": {
"utm_source": "...",
"utm_medium": "...",
"utm_campaign": "...",
"utm_content": "...",
"utm_term": "...",
"fbclid": "...",
"gclid": "..."
},
"timestamp": "ISO8601"
}
"""
timestamp = datetime.now().isoformat()
_LOGGER.info("Processing generic webhook submission at %s", timestamp)
# Extract nested data
hotel_data = data.get("hotel_data", {})
form_data = data.get("form_data", {})
tracking_data = data.get("tracking_data", {})
offer_data = data.get("unterkunftTyp", {})
selected_offers = []
    if offer_data:
        # Checkbox groups arrive as {label: "on"}; collect the checked labels
        for key, value in offer_data.items():
            if value == "on":
                selected_offers.append(key)
selected_offers_str = ", ".join(selected_offers) if selected_offers else None
# Extract hotel information
hotel_code = hotel_data.get("hotelcode")
hotel_name = hotel_data.get("hotelname")
if not hotel_code:
_LOGGER.warning("No hotel_code provided in webhook data, using default")
hotel_code = request.app.state.config.get("default_hotel_code", "123")
if not hotel_name:
hotel_name = (
request.app.state.config.get("default_hotel_name") or "Frangart Inn"
)
# Extract customer information
first_name = form_data.get("name")
last_name = form_data.get("nachname")
email = form_data.get("mail")
phone_number = form_data.get("tel")
name_prefix = form_data.get("anrede")
language = form_data.get("sprache", "de")[:2]
user_comment = form_data.get("nachricht", "")
plz = form_data.get("plz", "")
city = form_data.get("stadt", "")
country = form_data.get("land", "")
# Parse dates - handle DD.MM.YYYY format
start_date_str = form_data.get("anreise")
end_date_str = form_data.get("abreise")
if not start_date_str or not end_date_str:
raise HTTPException(
status_code=400, detail="Missing required dates (anreise/abreise)"
)
try:
# Parse DD.MM.YYYY format using strptime
start_date = datetime.strptime(start_date_str, "%d.%m.%Y").date()
end_date = datetime.strptime(end_date_str, "%d.%m.%Y").date()
except ValueError as e:
_LOGGER.error(
"Error parsing dates: start=%s, end=%s, error=%s",
start_date_str,
end_date_str,
e,
)
raise HTTPException(status_code=400, detail=f"Invalid date format: {e}") from e
# Extract room/guest info
num_adults = int(form_data.get("erwachsene", 2))
num_children = int(form_data.get("kinder", 0))
# Extract children ages from nested structure
children_ages = []
if num_children > 0:
alter_data = form_data.get("alter", {})
for i in range(1, num_children + 1):
age_str = alter_data.get(str(i))
if age_str:
try:
children_ages.append(int(age_str))
except ValueError:
_LOGGER.warning("Invalid age value for child %d: %s", i, age_str)
# Extract tracking information
utm_source = None
utm_medium = None
utm_campaign = None
utm_term = None
utm_content = None
fbclid = None
gclid = None
if tracking_data:
utm_source = tracking_data.get("utm_source")
utm_medium = tracking_data.get("utm_medium")
utm_campaign = tracking_data.get("utm_campaign")
utm_term = tracking_data.get("utm_term")
utm_content = tracking_data.get("utm_content")
fbclid = tracking_data.get("fbclid")
gclid = tracking_data.get("gclid")
# Parse submission timestamp
submission_time = data.get("timestamp")
try:
if submission_time:
# Handle ISO8601 format with timezone
if submission_time.endswith("Z"):
submission_time = datetime.fromisoformat(submission_time[:-1])
elif "+" in submission_time:
# Remove timezone info (e.g., +02:00)
submission_time = datetime.fromisoformat(submission_time.split("+")[0])
else:
submission_time = datetime.fromisoformat(submission_time)
except Exception as e:
_LOGGER.exception("Error parsing submission timestamp: %s", e)
submission_time = None
# Generate unique ID
unique_id = generate_unique_id()
# Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db)
customer_data = {
"given_name": first_name,
"surname": last_name,
"contact_id": None,
"name_prefix": name_prefix if name_prefix != "--" else None,
"email_address": email,
"phone": phone_number if phone_number else None,
"email_newsletter": False,
"address_line": None,
"city_name": city if city else None,
"postal_code": plz if plz else None,
"country_code": country if country else None,
"gender": None,
"birth_date": None,
"language": language,
"address_catalog": False,
"name_title": None,
}
# Create/update customer
db_customer = await customer_service.get_or_create_customer(customer_data)
# Get advertising account IDs conditionally based on fbclid/gclid presence
meta_account_id, google_account_id = get_advertising_account_ids(
request.app.state.config, hotel_code, fbclid, gclid
)
# Create reservation
reservation_kwargs = {
"unique_id": unique_id,
"start_date": start_date,
"end_date": end_date,
"num_adults": num_adults,
"num_children": num_children,
"children_ages": children_ages,
"hotel_code": hotel_code,
"hotel_name": hotel_name,
"offer": selected_offers_str,
"utm_source": utm_source,
"utm_medium": utm_medium,
"utm_campaign": utm_campaign,
"utm_term": utm_term,
"utm_content": utm_content,
"user_comment": user_comment,
"fbclid": fbclid,
"gclid": gclid,
"meta_account_id": meta_account_id,
"google_account_id": google_account_id,
}
# Only include created_at if we have a valid submission_time
if submission_time:
reservation_kwargs["created_at"] = submission_time
reservation = ReservationData(**reservation_kwargs)
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
)
# Create task and store reference to prevent garbage collection
task = asyncio.create_task(push_event())
    # Retrieve any exception from the finished task so asyncio does not warn
    # about it being "never retrieved"
    task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
_LOGGER.info(
"Successfully processed generic webhook: customer_id=%s, reservation_id=%s",
db_customer.id,
db_reservation.id,
)
return {
"status": "success",
"message": "Generic webhook data processed successfully",
"customer_id": db_customer.id,
"reservation_id": db_reservation.id,
"timestamp": timestamp,
}
async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
) -> str:
"""Validate basic authentication for AlpineBits protocol.
Returns username if valid, raises HTTPException if not.
"""
# Accept any username/password pair present in config['alpine_bits_auth']
if not credentials.username or not credentials.password:
raise HTTPException(
status_code=401,
detail="ERROR: Authentication required",
headers={"WWW-Authenticate": "Basic"},
)
    valid = False
    config = app.state.config
    for entry in config.get("alpine_bits_auth", []):
        # Use constant-time comparison to avoid leaking credential contents
        if secrets.compare_digest(
            credentials.username, entry["username"]
        ) and secrets.compare_digest(credentials.password, entry["password"]):
            valid = True
            break
if not valid:
raise HTTPException(
status_code=401,
detail="ERROR: Invalid credentials",
headers={"WWW-Authenticate": "Basic"},
)
_LOGGER.info(
"AlpineBits authentication successful for user: %s (from config)",
credentials.username,
)
return credentials.username, credentials.password
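# The credentials are checked against config.yaml entries shaped like this
# (values are illustrative):
#     alpine_bits_auth:
#       - hotel_id: "39054_001"
#         username: "hotelclient"
#         password: "secret"
#         push_endpoint:
#           url: "https://pms.example.com/push"
#           token: "bearer-token"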
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Unified endpoint to handle Wix form submissions (test and production).
No authentication required for this endpoint.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except IntegrityError as e:
# Handle duplicate submissions gracefully - likely same form sent twice
# or race condition between workers
if "unique constraint" in str(e).lower() and "unique_id" in str(e).lower():
_LOGGER.warning(
"Duplicate submission detected (unique_id already exists). "
"Returning success to prevent retry. Error: %s",
str(e),
)
# Return success since the reservation already exists
return {"status": "success", "message": "Reservation already processed"}
# Re-raise if it's a different integrity error
raise
except Exception as e:
_LOGGER.exception("Error in handle_wix_form")
log_entry = {
"timestamp": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"error": str(e),
}
# Use asyncio to run file I/O in thread pool to avoid blocking
logs_dir = Path("logs/errors")
await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True)
log_filename = (
logs_dir / f"wix_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
log_filename.write_text,
json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
_LOGGER.error("Error data logged to: %s", log_filename)
raise HTTPException(status_code=500, detail="Error processing Wix form data")
@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_wix_form_test(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Test endpoint to verify the API is working with raw JSON data.
No authentication required for testing purposes.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.exception("Error in handle_wix_form_test: %s", e)
        # Log error data to file asynchronously (traceback is imported at module level)
log_entry = {
"timestamp": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"error": str(e),
"traceback": traceback.format_exc(),
}
# Use asyncio to run file I/O in thread pool to avoid blocking
logs_dir = Path("logs/errors")
await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True)
log_filename = (
logs_dir / f"wix_test_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
log_filename.write_text,
json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
_LOGGER.error("Error data logged to: %s", log_filename)
raise HTTPException(status_code=500, detail="Error processing test data")
async def _process_csv_import_background(
csv_content: str,
filename: str,
hotel_code: str,
session_maker: SessionMaker,
config: dict[str, Any],
log_filename: Path,
):
"""Background task to process CSV import with automatic acknowledgement.
This runs in a separate asyncio task after the HTTP response is sent.
Handles both file saving and database processing.
All imported reservations are automatically acknowledged using the username
associated with the hotel_code from the config.
Args:
csv_content: CSV content as string
filename: Original filename
hotel_code: Hotel code (mandatory) - used to get username for acknowledgements
session_maker: SessionMaker for creating database sessions
config: Application configuration
log_filename: Path to save the CSV file
"""
try:
# First, save the CSV file (in background)
await asyncio.to_thread(log_filename.write_text, csv_content, encoding="utf-8")
_LOGGER.debug("CSV file saved to %s", log_filename)
# Now process the CSV import
_LOGGER.info("Starting database processing of %s", filename)
# Get username for acknowledgements from config
username = get_username_for_hotel(config, hotel_code)
# Create a new session for this background task
db_session = await session_maker.create_session()
try:
importer = CSVImporter(db_session, config)
# Import with pre-acknowledgement enabled
stats = await importer.import_csv_file(
str(log_filename),
hotel_code,
dryrun=False,
pre_acknowledge=True,
client_id=hotel_code,
username=username
)
_LOGGER.info(
"CSV import complete for %s: %s", filename, stats
)
finally:
await db_session.close()
except Exception:
_LOGGER.exception(
"Error processing CSV import in background for %s", filename
)
@api_router.put("/admin/import-csv/{hotel_code}/{filename:path}")
@limiter.limit(BURST_RATE_LIMIT)
async def import_csv_endpoint(
request: Request,
background_tasks: BackgroundTasks,
hotel_code: str,
filename: str,
credentials: tuple = Depends(validate_basic_auth),
db_session=Depends(get_async_session),
session_maker: SessionMaker = Depends(get_session_maker),
):
"""Import reservations from CSV data sent via PUT request.
This endpoint allows importing historical form data into the system.
It creates customers and reservations, avoiding duplicates based on:
- Name, email, reservation dates
- fbclid/gclid tracking IDs
Returns immediately with 202 Accepted while processing continues in background.
All imported reservations are automatically acknowledged using the username
associated with the hotel_code in the config.
Requires basic authentication and saves CSV files to log directory.
Supports gzip compression via Content-Encoding header.
Args:
hotel_code: Hotel code (mandatory) - used to get username for acknowledgements
filename: Name for the CSV file (used for logging)
credentials: Basic auth credentials
Example: PUT /api/admin/import-csv/39054_001/reservations.csv
"""
try:
# Validate filename to prevent path traversal
if ".." in filename or filename.startswith("/"):
raise HTTPException(status_code=400, detail="ERROR: Invalid filename")
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(
status_code=400, detail="ERROR: No CSV content provided"
)
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Try to decode as UTF-8
try:
csv_content = body.decode("utf-8")
except UnicodeDecodeError:
# If UTF-8 fails, try with latin-1 as fallback
csv_content = body.decode("latin-1")
# Basic validation that it looks like CSV
if not csv_content.strip():
raise HTTPException(
status_code=400, detail="ERROR: CSV content is empty"
)
# Create logs directory for CSV imports (blocking, but fast)
logs_dir = Path("logs/csv_imports")
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
# Generate filename with timestamp and authenticated user
username, _ = credentials
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_filename = Path(filename).stem
extension = Path(filename).suffix or ".csv"
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
_LOGGER.info(
"CSV file queued for processing: %s by user %s (original: %s)",
log_filename,
username,
filename,
)
# Schedule background processing using FastAPI's BackgroundTasks
# This handles both file saving AND database processing
# This ensures the response is sent immediately
background_tasks.add_task(
_process_csv_import_background,
csv_content,
filename,
hotel_code,
session_maker,
request.app.state.config,
log_filename,
)
response_headers = {
"Content-Type": "application/json; charset=utf-8",
}
# Return immediate acknowledgment
return Response(
content=json.dumps({
"status": "accepted",
"message": "CSV file received and queued for processing",
"filename": filename,
"timestamp": datetime.now().isoformat(),
}),
headers=response_headers,
status_code=202,
)
except HTTPException:
raise
except Exception:
_LOGGER.exception("Error in import_csv_endpoint")
raise HTTPException(status_code=500, detail="Error processing CSV upload")
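# Illustrative client call (assumes valid basic-auth credentials):
#     import httpx
#     httpx.put(
#         "https://host.example.com/api/admin/import-csv/39054_001/reservations.csv",
#         content=Path("reservations.csv").read_bytes(),
#         auth=("hotelclient", "secret"),
#     )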
@api_router.post("/webhook/generic")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_generic_webhook(
request: Request, db_session=Depends(get_async_session)
):
"""Handle generic webhook endpoint for receiving JSON payloads.
Supports gzip compression, extracts customer and reservation data,
saves to database, and triggers push notifications.
No authentication required for this endpoint.
"""
# Store data for error logging if needed
data = None
try:
timestamp = datetime.now().isoformat()
_LOGGER.info("Received generic webhook data at %s", timestamp)
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(status_code=400, detail="ERROR: No content provided")
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
_LOGGER.info("Successfully decompressed gzip content")
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Parse JSON
try:
data = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Invalid JSON content: {e}",
) from e
        # Log the raw payload to file for now
        logs_dir = Path("logs/generic_webhooks")
        await asyncio.to_thread(
            logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
        )
        log_filename = (
            logs_dir
            / f"generic_webhook_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        )
        await asyncio.to_thread(
            log_filename.write_text,
            json.dumps(data, indent=2, default=str, ensure_ascii=False),
            encoding="utf-8",
        )
# Process the webhook data and save to database
await process_generic_webhook_submission(request, data, db_session)
return {
"status": "success",
"message": "Generic webhook data received and processed successfully",
"timestamp": timestamp,
}
except HTTPException:
raise
except Exception as e:
_LOGGER.exception("Error in handle_generic_webhook")
# Log error data to file asynchronously (only on error)
error_log_entry = {
"timestamp": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data, # Include the parsed data if available
"error": str(e),
"traceback": traceback.format_exc(),
}
# Use asyncio to run file I/O in thread pool to avoid blocking
error_logs_dir = Path("logs/errors")
await asyncio.to_thread(
error_logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
)
error_log_filename = error_logs_dir / (
f"generic_webhook_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
error_log_filename.write_text,
json.dumps(error_log_entry, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
_LOGGER.error("Error data logged to: %s", error_log_filename)
raise HTTPException(
status_code=500, detail="Error processing generic webhook data"
) from e
async def _process_conversion_xml_background(
xml_content: str,
filename: str,
session_maker: SessionMaker,
log_filename: Path,
):
"""Background task to process conversion XML.
This runs in a separate asyncio task after the HTTP response is sent.
Handles both file prettification and database processing.
"""
try:
# First, prettify and save the XML file (in background)
try:
dom = xml.dom.minidom.parseString(xml_content)
pretty_xml = dom.toprettyxml(indent=" ")
# Remove extra blank lines that toprettyxml adds
pretty_xml = "\n".join(
[line for line in pretty_xml.split("\n") if line.strip()]
)
await asyncio.to_thread(
log_filename.write_text, pretty_xml, encoding="utf-8"
)
_LOGGER.debug("XML file prettified and saved to %s", log_filename)
except Exception as e:
# If formatting fails, save the original content
_LOGGER.warning("Failed to format XML: %s. Saving unformatted.", str(e))
await asyncio.to_thread(
log_filename.write_text, xml_content, encoding="utf-8"
)
# Now process the conversion XML
_LOGGER.info("Starting database processing of %s", filename)
conversion_service = ConversionService(session_maker)
processing_stats = await conversion_service.process_conversion_xml(xml_content)
_LOGGER.info(
"Conversion processing complete for %s: %s", filename, processing_stats
)
except Exception:
_LOGGER.exception(
"Error processing conversion XML in background for %s", filename
)
@api_router.put("/hoteldata/conversions_import/{filename:path}")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_xml_upload(
request: Request,
background_tasks: BackgroundTasks,
filename: str,
    credentials_tuple: tuple = Depends(validate_basic_auth),
db_session=Depends(get_async_session),
session_maker: SessionMaker = Depends(get_session_maker),
):
"""Endpoint for receiving XML files for conversion processing via PUT.
Processes conversion data from hotel PMS:
- Parses reservation and daily sales XML data
- Matches to existing reservations using truncated tracking IDs (fbclid/gclid)
- Links conversions to customers and hashed_customers
- Stores daily sales revenue data
Returns immediately with 202 Accepted while processing continues in background.
Requires basic authentication and saves XML files to log directory.
Supports gzip compression via Content-Encoding header.
Example: PUT /api/hoteldata/conversions_import/Reservierungen.xml
"""
try:
# Validate filename to prevent path traversal
if ".." in filename or filename.startswith("/"):
raise HTTPException(status_code=400, detail="ERROR: Invalid filename")
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(
status_code=400, detail="ERROR: No XML content provided"
)
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Try to decode as UTF-8
try:
xml_content = body.decode("utf-8")
except UnicodeDecodeError:
# If UTF-8 fails, try with latin-1 as fallback
xml_content = body.decode("latin-1")
# Basic validation that it's XML-like
if not xml_content.strip().startswith("<"):
raise HTTPException(
status_code=400, detail="ERROR: Content does not appear to be XML"
)
# Create logs directory for XML conversions (blocking, but fast)
logs_dir = Path("logs/conversions_import")
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
# Generate filename with timestamp and authenticated user
        username, _ = credentials_tuple
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_filename = Path(filename).stem
extension = Path(filename).suffix or ".xml"
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
_LOGGER.info(
"XML file queued for processing: %s by user %s (original: %s)",
log_filename,
username,
filename,
)
# Schedule background processing using FastAPI's BackgroundTasks
# This handles both file prettification/saving AND database processing
# This ensures the response is sent immediately
background_tasks.add_task(
_process_conversion_xml_background,
xml_content,
filename,
session_maker,
log_filename,
)
response_headers = {
"Content-Type": "application/xml; charset=utf-8",
"X-AlpineBits-Server-Accept-Encoding": "gzip",
}
# Return immediate acknowledgment
response_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<response>
<status>accepted</status>
<message>XML file received and queued for processing</message>
<filename>{filename}</filename>
<timestamp>{datetime.now().isoformat()}</timestamp>
</response>"""
return Response(
content=response_content, headers=response_headers, status_code=202
)
except HTTPException:
raise
except Exception:
_LOGGER.exception("Error in handle_xml_upload")
raise HTTPException(status_code=500, detail="Error processing XML upload")
# TODO: Bit sketchy. May need requests-toolbelt (or python-multipart) in the future
def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
"""Parse multipart/form-data from raw request body.
This is a simplified parser for the AlpineBits use case.
"""
if "multipart/form-data" not in content_type:
raise HTTPException(
status_code=400, detail="ERROR: Content-Type must be multipart/form-data"
)
# Extract boundary
boundary = None
for part in content_type.split(";"):
part = part.strip()
if part.startswith("boundary="):
boundary = part.split("=", 1)[1].strip('"')
break
if not boundary:
raise HTTPException(
status_code=400, detail="ERROR: Missing boundary in multipart/form-data"
)
# Simple multipart parsing
parts = body.split(f"--{boundary}".encode())
data = {}
for part in parts:
if not part.strip() or part.strip() == b"--":
continue
# Split headers and content
if b"\r\n\r\n" in part:
headers_section, content = part.split(b"\r\n\r\n", 1)
content = content.rstrip(b"\r\n")
# Parse Content-Disposition header
headers = headers_section.decode("utf-8", errors="ignore")
name = None
for line in headers.split("\n"):
if "Content-Disposition" in line and "name=" in line:
# Extract name parameter
for param in line.split(";"):
param = param.strip()
if param.startswith("name="):
name = param.split("=", 1)[1].strip('"')
break
            if name:
                # XML and plain-text parts are both decoded as UTF-8 text
                data[name] = content.decode("utf-8", errors="ignore")
return data
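# For reference, a minimal body this parser accepts (boundary and action value
# are illustrative):
#     --X\r\n
#     Content-Disposition: form-data; name="action"\r\n
#     \r\n
#     OTA_Ping:Handshaking\r\n
#     --X--\r\n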
@api_router.post("/alpinebits/server-2024-10")
@limiter.limit("60/minute")
async def alpinebits_server_handshake(
request: Request,
    credentials_tuple: tuple = Depends(validate_basic_auth),
dbsession=Depends(get_async_session),
):
"""AlpineBits server endpoint implementing the handshake protocol.
This endpoint handles:
- Protocol version negotiation via X-AlpineBits-ClientProtocolVersion header
- Client identification via X-AlpineBits-ClientID header (optional)
- Multipart/form-data parsing for action and request parameters
- Gzip compression support
- Proper error handling with HTTP status codes
- Handshaking action processing
Authentication: HTTP Basic Auth required
Content-Type: multipart/form-data
Compression: gzip supported (check X-AlpineBits-Server-Accept-Encoding)
"""
try:
# Check required headers
client_protocol_version = request.headers.get(
"X-AlpineBits-ClientProtocolVersion"
)
if not client_protocol_version:
# Server concludes client speaks a protocol version preceding 2013-04
client_protocol_version = "pre-2013-04"
_LOGGER.info(
"No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04"
)
else:
_LOGGER.info("Client protocol version: %s", client_protocol_version)
# Optional client ID
client_id = request.headers.get("X-AlpineBits-ClientID")
if client_id:
_LOGGER.info("Client ID: %s", client_id)
# Check content encoding
content_encoding = request.headers.get("Content-Encoding")
is_compressed = content_encoding == "gzip"
if is_compressed:
_LOGGER.info("Request is gzip compressed")
# Get content type before processing
content_type = request.headers.get("Content-Type", "")
_LOGGER.info("Content-Type: %s", content_type)
_LOGGER.info("Content-Encoding: %s", content_encoding)
# Get request body
body = await request.body()
# Decompress if needed
form_data = validate_alpinebits_body(is_compressed, content_type, body)
# Check for required action parameter
action = form_data.get("action")
if not action:
raise HTTPException(
status_code=400, detail="ERROR: Missing required 'action' parameter"
)
_LOGGER.info("AlpineBits action: %s", action)
# Get optional request XML
request_xml = form_data.get("request")
server: AlpineBitsServer = app.state.alpine_bits_server
version = Version.V2024_10
        username, password = credentials_tuple
client_info = AlpineBitsClientInfo(
username=username, password=password, client_id=client_id
)
# Create successful handshake response
response = await server.handle_request(
action,
request_xml,
client_info=client_info,
version=version,
dbsession=dbsession,
)
response_xml = response.xml_content
# Set response headers indicating server capabilities
headers = {
"Content-Type": "application/xml; charset=utf-8",
"X-AlpineBits-Server-Accept-Encoding": "gzip", # Indicate gzip support
"X-AlpineBits-Server-Version": "2024-10",
}
if is_compressed:
# Compress response if client sent compressed request
response_xml = gzip.compress(response_xml.encode("utf-8"))
headers["Content-Encoding"] = "gzip"
return Response(
content=response_xml, status_code=response.status_code, headers=headers
)
except HTTPException:
# Re-raise HTTP exceptions (auth errors, etc.)
raise
    except Exception as e:
        _LOGGER.exception("Error in AlpineBits handshake: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
def validate_alpinebits_body(is_compressed, content_type, body):
"""Check if the body conforms to AlpineBits expectations."""
    if is_compressed:
        try:
            body = gzip.decompress(body)
        except Exception as e:
            raise HTTPException(
                status_code=400,
                detail="ERROR: Failed to decompress gzip content",
            ) from e
    # Dispatch on content type (after decompression)
    if "multipart/form-data" in content_type:
        try:
            form_data = parse_multipart_data(content_type, body)
        except Exception as e:
            raise HTTPException(
                status_code=400,
                detail="ERROR: Failed to parse multipart/form-data",
            ) from e
    elif "application/x-www-form-urlencoded" in content_type:
        # Parse as urlencoded
        form_data = dict(urllib.parse.parse_qsl(body.decode("utf-8")))
    else:
        raise HTTPException(
            status_code=400,
            detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
        )
return form_data
@api_router.get("/admin/stats")
@limiter.limit("10/minute")
async def get_api_stats(request: Request, admin_key: str = Depends(validate_api_key)):
"""Admin endpoint to get API usage statistics.
Requires admin API key.
"""
if admin_key != "admin-key":
raise HTTPException(status_code=403, detail="Admin access required")
# In a real application, you'd fetch this from your database/monitoring system
return {
"status": "success",
"stats": {
"uptime": "Available in production deployment",
"total_requests": "Available with monitoring setup",
"active_api_keys": len([k for k in ["wix-webhook-key", "admin-key"] if k]),
"rate_limit_backend": "redis" if os.getenv("REDIS_URL") else "memory",
},
"timestamp": datetime.now().isoformat(),
}
# Include the API router in the main app
app.include_router(api_router)
@app.get("/", response_class=HTMLResponse)
async def landing_page():
"""Serve the under construction landing page at the root route."""
try:
# Get the path to the HTML file
html_path = os.path.join(os.path.dirname(__file__), "templates", "index.html")
with open(html_path, encoding="utf-8") as f:
html_content = f.read()
return HTMLResponse(content=html_content, status_code=200)
except FileNotFoundError:
# Fallback if HTML file is not found
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>99tales - Under Construction</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; }
h1 { color: #333; }
</style>
</head>
<body>
<h1>🏗️ 99tales</h1>
<h2>Under Construction</h2>
<p>We're working hard to bring you something amazing!</p>
<p><a href="/api">API Documentation</a></p>
</body>
</html>
"""
return HTMLResponse(content=html_content, status_code=200)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)