1047 lines
35 KiB
Python
1047 lines
35 KiB
Python
import asyncio
import gzip
import json
import os
import secrets
import urllib.parse
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import date, datetime
from functools import partial
from pathlib import Path
from typing import Any

import httpx
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from slowapi.errors import RateLimitExceeded
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from alpine_bits_python.schemas import ReservationData

from .alpinebits_server import (
    AlpineBitsActionName,
    AlpineBitsClientInfo,
    AlpineBitsServer,
    Version,
)
from .auth import generate_unique_id, validate_api_key
from .config_loader import load_config
from .customer_service import CustomerService
from .db import Base, get_database_url
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .logging_config import get_logger, setup_logging
from .rate_limit import (
    BURST_RATE_LIMIT,
    DEFAULT_RATE_LIMIT,
    WEBHOOK_RATE_LIMIT,
    custom_rate_limit_handler,
    limiter,
    webhook_limiter,
)
|
|
|
|
# Module-level logger. Logging itself is (re)configured inside lifespan() via
# setup_logging() once the application config has been loaded.
_LOGGER = get_logger(__name__)

# HTTP Basic auth scheme; used as the dependency of validate_basic_auth(),
# which guards the AlpineBits endpoints.
security_basic = HTTPBasic()
|
|
|
|
|
|
# --- Enhanced event dispatcher with hotel-specific routing ---
|
|
class EventDispatcher:
    """In-process async event dispatcher with optional per-hotel routing.

    Listeners are awaitable callables. Global listeners are stored per event
    name; hotel listeners are stored under the key
    ``"<event_name>:<hotel_code>"``. Listeners are awaited one after another
    in registration order; an exception raised by a listener propagates to
    the caller of ``dispatch``/``dispatch_for_hotel``.
    """

    def __init__(self):
        self.listeners = defaultdict(list)
        self.hotel_listeners = defaultdict(list)  # "event:hotel_code" -> listeners

    def register(self, event_name, func):
        """Register ``func`` to be awaited whenever ``event_name`` is dispatched."""
        self.listeners[event_name].append(func)

    def register_hotel_listener(self, event_name, hotel_code, func):
        """Register a listener for a specific hotel."""
        self.hotel_listeners[f"{event_name}:{hotel_code}"].append(func)

    async def dispatch(self, event_name, *args, **kwargs):
        """Await every global listener of ``event_name`` with the given arguments."""
        # .get() instead of indexing: indexing a defaultdict would insert an
        # empty list for every event name that was never registered.
        for func in self.listeners.get(event_name, ()):
            await func(*args, **kwargs)

    async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs):
        """Dispatch event only to listeners registered for specific hotel."""
        key = f"{event_name}:{hotel_code}"
        # hotel_code may originate from request data, so unknown codes must not
        # leave permanent empty entries behind in the registry.
        for func in self.hotel_listeners.get(key, ()):
            await func(*args, **kwargs)


event_dispatcher = EventDispatcher()
|
|
|
|
# NOTE: the configuration is loaded inside lifespan() at application startup.
|
|
|
|
|
|
def _safe_filename_part(value: Any) -> str:
    """Return ``value`` as text that is safe to embed in a file name.

    Every character other than ASCII letters, digits, ``-`` and ``_`` is
    replaced by ``_`` so that externally supplied ids cannot introduce path
    separators or ``..`` segments.
    """
    return "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "-_" else "_"
        for ch in str(value)
    )


def _archive_push_request(logs_dir: Path, log_path: Path, xml_content: str) -> None:
    """Write ``xml_content`` to ``log_path``, creating ``logs_dir`` on first use.

    Synchronous on purpose: it is meant to be run via ``asyncio.to_thread``.
    """
    if not logs_dir.exists():
        logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
        stat_info = os.stat(logs_dir)
        _LOGGER.info(
            "Created directory owner: uid:%s, gid:%s",
            stat_info.st_uid,
            stat_info.st_gid,
        )
        _LOGGER.info("Directory mode: %s", oct(stat_info.st_mode)[-3:])
    log_path.write_text(xml_content, encoding="utf-8")


async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel):
    """Generate the AlpineBits push request for a reservation and archive it.

    Only called for reservations that match this hotel's hotel_id.

    The request is produced by the application's ``AlpineBitsServer`` and is
    written to ``logs/push_requests``. It is NOT yet sent to the hotel's push
    endpoint (see the TODO at the end of the function).

    Args:
        customer: Customer the reservation belongs to.
        reservation: Reservation to build the push request for.
        hotel: Hotel config entry; read via ``hotel.get("push_endpoint")`` and
            ``hotel["hotel_id"]``.
    """
    push_endpoint = hotel.get("push_endpoint")
    if not push_endpoint:
        _LOGGER.warning(
            "No push endpoint configured for hotel %s", hotel.get("hotel_id")
        )
        return

    server: AlpineBitsServer = app.state.alpine_bits_server
    hotel_id = hotel["hotel_id"]
    reservation_hotel_id = reservation.hotel_code

    # Double-check hotel matching (should be guaranteed by dispatcher)
    if hotel_id != reservation_hotel_id:
        _LOGGER.warning(
            "Hotel ID mismatch: listener for %s, reservation for %s",
            hotel_id,
            reservation_hotel_id,
        )
        return

    _LOGGER.info(
        "Processing push notification for hotel %s, reservation %s",
        hotel_id,
        reservation.unique_id,
    )

    # Prepare payload for push notification
    request = await server.handle_request(
        request_action_name=AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS.request_name,
        request_xml=(reservation, customer),
        client_info=None,
        version=Version.V2024_10,
    )

    if request.status_code != 200:
        _LOGGER.error(
            "Failed to generate push request for hotel %s, reservation %s: %s",
            hotel_id,
            reservation.unique_id,
            request.xml_content,
        )
        return

    # Save push request to file. The ids are sanitised because the
    # reservation id can come from an incoming form submission, and the write
    # runs in a worker thread so the event loop is not blocked.
    logs_dir = Path("logs/push_requests")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_path = logs_dir / (
        f"alpinebits_push_{_safe_filename_part(hotel_id)}"
        f"_{_safe_filename_part(reservation.unique_id)}_{timestamp}.xml"
    )
    await asyncio.to_thread(
        _archive_push_request, logs_dir, log_path, request.xml_content
    )

    # TODO: actually deliver the request to push_endpoint["url"] (optionally
    # with the "Bearer <token>" header from push_endpoint["token"]) and add
    # retry logic. The earlier draft of that HTTP call sat behind an
    # unconditional ``return`` and used an undefined ``payload`` variable, so
    # it never ran; until delivery is implemented the request is only archived.
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: set up shared state on startup, release it on shutdown.

    Startup (before ``yield``):
      * load the config (falls back to ``{}`` if loading fails) and configure logging,
      * create the async DB engine and session factory and publish them, the
        config, the ``AlpineBitsServer`` and the event dispatcher on ``app.state``,
      * register a ``push_listener`` for every hotel in ``alpine_bits_auth``
        that has both a ``hotel_id`` and a ``push_endpoint``,
      * create missing tables and backfill hashed customer data.

    Shutdown (after ``yield``): dispose the engine. Disposal sits in a
    ``finally`` so the connection pool is also released when startup or the
    application itself fails.
    """
    try:
        config = load_config()
    except Exception:
        _LOGGER.exception("Failed to load config: ")
        config = {}

    # Setup logging from config
    setup_logging(config)
    _LOGGER.info("Application startup initiated")

    # Setup DB
    database_url = get_database_url(config)
    engine = create_async_engine(database_url, echo=False)
    session_factory = async_sessionmaker(engine, expire_on_commit=False)

    try:
        app.state.engine = engine
        app.state.async_sessionmaker = session_factory
        app.state.config = config
        app.state.alpine_bits_server = AlpineBitsServer(config)
        app.state.event_dispatcher = event_dispatcher

        # Register push listeners for hotels with push_endpoint
        for hotel in config.get("alpine_bits_auth", []):
            push_endpoint = hotel.get("push_endpoint")
            hotel_id = hotel.get("hotel_id")

            if push_endpoint and hotel_id:
                # Register hotel-specific listener
                event_dispatcher.register_hotel_listener(
                    "form_processed", hotel_id, partial(push_listener, hotel=hotel)
                )
                _LOGGER.info(
                    "Registered push listener for hotel %s with endpoint %s",
                    hotel_id,
                    push_endpoint.get("url"),
                )
            elif push_endpoint and not hotel_id:
                _LOGGER.warning("Hotel has push_endpoint but no hotel_id: %s", hotel)
            elif hotel_id and not push_endpoint:
                _LOGGER.info("Hotel %s has no push_endpoint configured", hotel_id)

        # Create tables
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
            _LOGGER.info("Database tables checked/created at startup.")

        # Hash any existing customers that don't have hashed versions yet
        async with session_factory() as session:
            customer_service = CustomerService(session)
            hashed_count = await customer_service.hash_existing_customers()
            if hashed_count > 0:
                _LOGGER.info(
                    "Backfilled hashed data for %d existing customers", hashed_count
                )
            else:
                _LOGGER.info("All existing customers already have hashed data")

        yield
    finally:
        # Dispose engine on shutdown (and on failed startup)
        await engine.dispose()
|
|
|
|
|
|
async def get_async_session(request: Request):
    """FastAPI dependency that yields one database session per request.

    The session factory is taken from ``request.app.state.async_sessionmaker``
    (published there by ``lifespan``); the ``async with`` block closes the
    session once the dependant has finished with it.
    """
    session_factory = request.app.state.async_sessionmaker
    async with session_factory() as db_session:
        yield db_session
|
|
|
|
|
|
# The ASGI application. Startup/shutdown work is done in lifespan().
app = FastAPI(
    title="Wix Form Handler API",
    description="Secure API endpoint to receive and process Wix form submissions with authentication and rate limiting",
    version="1.0.0",
    lifespan=lifespan,
)

# Create API router with /api prefix
api_router = APIRouter(prefix="/api", tags=["api"])

# Add rate limiting
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)

# Add CORS middleware to allow requests from Wix.
# CORSMiddleware compares ``allow_origins`` entries literally, so wildcard
# entries such as "https://*.wix.com" never match a real Origin header.
# Sub-domain matching therefore goes through ``allow_origin_regex``, while the
# exact local origins stay in ``allow_origins``.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",  # For development
        "http://localhost:8000",  # For local testing
    ],
    # Any sub-domain (one or more labels) of wix.com / wixstatic.com over HTTPS.
    allow_origin_regex=r"https://([A-Za-z0-9-]+\.)+(wix|wixstatic)\.com",
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
|
|
|
|
|
|
@api_router.get("/")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def root(request: Request):
    """Health check endpoint.

    Returns a static status document together with the current server time
    and the configured rate-limit strings. ``request`` is not read in the
    body; it is part of the signature of this rate-limited route.
    """
    configured_limits = {
        "default": DEFAULT_RATE_LIMIT,
        "webhook": WEBHOOK_RATE_LIMIT,
        "burst": BURST_RATE_LIMIT,
    }
    now_iso = datetime.now().isoformat()
    return dict(
        message="Wix Form Handler API is running",
        timestamp=now_iso,
        status="healthy",
        authentication="required",
        rate_limits=configured_limits,
    )
|
|
|
|
|
|
@api_router.get("/health")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def health_check(request: Request):
    """Detailed health check.

    Returns static service metadata plus the current server time. No
    dependency (database, AlpineBits server) is actually probed here.
    """
    report = {"status": "healthy", "timestamp": datetime.now().isoformat()}
    report.update(
        service="wix-form-handler",
        version="1.0.0",
        authentication="enabled",
        rate_limiting="enabled",
    )
    return report
|
|
|
|
|
|
def create_db_reservation_from_data(
    reservation_model: ReservationData, db_customer_id: int
) -> DBReservation:
    """Build a ``DBReservation`` row from a validated ``ReservationData``.

    The model is dumped without ``None`` fields, ``children_ages`` is turned
    into a comma-separated string of integers (empty string when there are no
    ages) and ``customer_id`` is set to ``db_customer_id``.
    """
    fields = reservation_model.model_dump(exclude_none=True)

    ages = fields.get("children_ages") or []
    # Joining an empty sequence yields "", which is the stored "no children" value.
    fields["children_ages"] = ",".join(map(str, map(int, ages)))

    # Inject FK
    fields["customer_id"] = db_customer_id

    return DBReservation(**fields)
|
|
|
|
|
|
# Extracted business logic for handling Wix form submissions
|
|
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be garbage
# collected before it has finished.
_BACKGROUND_TASKS: set = set()

# Newsletter checkbox values that count as "checked" (presumably the Italian,
# German and English renderings of the form - confirm against the Wix form).
_NEWSLETTER_YES_VALUES = frozenset({"Selezionato", "Angekreuzt", "Checked"})


def _submission_value(data: dict[str, Any], index: int) -> Any:
    """Return ``data["submissions"][index]["value"]`` or ``None`` if it does not exist."""
    submissions = data.get("submissions") or []
    if index < len(submissions) and isinstance(submissions[index], dict):
        return submissions[index].get("value")
    return None


def _extract_customer_data(data: dict[str, Any]) -> dict[str, Any]:
    """Map the contact part of a Wix submission to ``CustomerService`` fields."""
    contact_info = data.get("contact") or {}
    name_info = contact_info.get("name") or {}
    # An empty or missing phone list must not raise; fall back to "no phone".
    phones = contact_info.get("phones") or [{}]
    newsletter_answer = data.get("field:form_field_5a7b", "")

    return {
        "given_name": name_info.get("first"),
        "surname": name_info.get("last"),
        "contact_id": contact_info.get("contactId"),
        "name_prefix": data.get("field:anrede"),
        "email_address": contact_info.get("email"),
        "phone": phones[0].get("e164Phone"),
        "email_newsletter": newsletter_answer in _NEWSLETTER_YES_VALUES,
        # The fields below are not read from the submission.
        "address_line": None,
        "city_name": None,
        "postal_code": None,
        "country_code": None,
        "gender": None,
        "birth_date": None,
        # Two-letter language code taken from the contact locale, default "en".
        "language": (contact_info.get("locale") or "en")[:2],
        "address_catalog": False,
        "name_title": None,
    }


def _extract_children_ages(data: dict[str, Any]) -> list[int]:
    """Collect the integer values of all ``field:alter_kind_*`` entries."""
    ages = []
    for key, raw_age in data.items():
        if not key.startswith("field:alter_kind_"):
            continue
        try:
            ages.append(int(raw_age))
        except (TypeError, ValueError):
            _LOGGER.warning("Invalid age value for %s: %s", key, raw_age)
    return ages


def _on_background_task_done(task) -> None:
    """Release the task reference and log an exception raised by the task."""
    _BACKGROUND_TASKS.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        _LOGGER.error("Background push event failed", exc_info=error)


# Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
    """Shared business logic for handling Wix form submissions (test and production).

    Steps: extract contact, stay and tracking fields from the payload,
    create/update the customer through ``CustomerService``, store the
    reservation, then dispatch the ``form_processed`` event for the
    reservation's hotel in a background task.

    Args:
        request: Incoming request; used to reach ``request.app.state``.
        data: Decoded JSON payload. The form fields may be nested under a
            top-level ``"data"`` key or be the payload itself.
        db: Async database session used for the customer and the reservation.

    Returns:
        A dict with status, message, the received field names and a timestamp.

    Raises:
        HTTPException: 400 if the arrival/departure date is missing or not an
            ISO date, or if no ``md5_unique_id`` could be generated.
    """
    timestamp = datetime.now().isoformat()

    _LOGGER.info("Received Wix form data at %s", timestamp)

    # Handle nested "data" key if present; otherwise use the payload as-is
    # (previously a flat payload turned ``data`` into None).
    nested = data.get("data")
    if isinstance(nested, dict):
        data = nested

    # Dates: explicit form fields first, then positional submission entries.
    start_date = (
        data.get("field:date_picker_a7c8")
        or data.get("Anreisedatum")
        or _submission_value(data, 1)
    )
    end_date = (
        data.get("field:date_picker_7e65")
        or data.get("Abreisedatum")
        or _submission_value(data, 2)
    )

    # Room/guest info
    num_adults = int(data.get("field:number_7cf5") or 2)
    num_children = int(data.get("field:anzahl_kinder") or 0)
    children_ages = _extract_children_ages(data) if num_children > 0 else []

    offer = data.get("field:angebot_auswaehlen")

    # Use the submissionId when present, otherwise generate an id.
    # NOTE(review): an earlier comment asked for a maximum length of 35, but no
    # limit is enforced here - confirm whether ReservationData validates it.
    unique_id = data.get("submissionId") or generate_unique_id()

    # save customer and reservation to DB
    # Use CustomerService to handle customer creation/update with hashing
    customer_service = CustomerService(db)

    # This automatically creates/updates both Customer and HashedCustomer
    db_customer = await customer_service.get_or_create_customer(
        _extract_customer_data(data)
    )

    # Determine hotel_code and hotel_name
    # Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
    app_config = request.app.state.config
    hotel_code = data.get("field:hotelid", None)
    if hotel_code is None:
        _LOGGER.warning("No hotel_code provided in form data, using default")
        hotel_code = app_config.get("default_hotel_code", "123")

    hotel_name = (
        data.get("field:hotelname")
        or data.get("hotelname")
        or app_config.get("default_hotel_name")
        or "Frangart Inn"  # fallback
    )

    submission_time = data.get("submissionTime")  # 2025-10-07T05:48:41.855Z
    try:
        if submission_time:
            # Only strip a trailing "Z"; other formats are passed on unchanged.
            submission_time = datetime.fromisoformat(
                submission_time.removesuffix("Z")
            )
    except (AttributeError, TypeError, ValueError) as e:
        _LOGGER.exception("Error parsing submissionTime: %s", e)
        submission_time = None

    try:
        arrival = date.fromisoformat(start_date)
        departure = date.fromisoformat(end_date)
    except (TypeError, ValueError) as e:
        # TypeError covers a missing date (None), ValueError a malformed one.
        raise HTTPException(
            status_code=400, detail="Missing or invalid arrival/departure date"
        ) from e

    reservation = ReservationData(
        unique_id=unique_id,
        start_date=arrival,
        end_date=departure,
        num_adults=num_adults,
        num_children=num_children,
        children_ages=children_ages,
        hotel_code=hotel_code,
        hotel_name=hotel_name,
        offer=offer,
        created_at=submission_time,
        utm_source=data.get("field:utm_source"),
        utm_medium=data.get("field:utm_medium"),
        utm_campaign=data.get("field:utm_campaign"),
        utm_term=data.get("field:utm_term"),
        utm_content=data.get("field:utm_content"),
        user_comment=data.get("field:long_answer_3524", ""),
        fbclid=data.get("field:fbclid"),
        gclid=data.get("field:gclid"),
    )

    if reservation.md5_unique_id is None:
        raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")

    db_reservation = create_db_reservation_from_data(reservation, db_customer.id)
    db.add(db_reservation)
    await db.commit()
    await db.refresh(db_reservation)

    async def push_event():
        # Fire event for listeners (push, etc.) - hotel-specific dispatch
        dispatcher = getattr(request.app.state, "event_dispatcher", None)
        if dispatcher:
            # Get hotel_code from reservation to target the right listeners
            reservation_hotel = getattr(db_reservation, "hotel_code", None)
            if reservation_hotel and reservation_hotel.strip():
                await dispatcher.dispatch_for_hotel(
                    "form_processed", reservation_hotel, db_customer, db_reservation
                )
                _LOGGER.info(
                    "Dispatched form_processed event for hotel %s", reservation_hotel
                )
            else:
                _LOGGER.warning(
                    "No hotel_code in reservation, skipping push notifications"
                )

    # The task runs independently of the response. It is kept in
    # _BACKGROUND_TASKS until done so it cannot be garbage collected early, and
    # the done-callback logs any exception it raised.
    task = asyncio.create_task(push_event())
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_on_background_task_done)

    return {
        "status": "success",
        "message": "Wix form data received successfully",
        "received_keys": list(data.keys()),
        "timestamp": timestamp,
        "note": "No authentication required for this endpoint",
    }
|
|
|
|
|
|
def _credential_matches(supplied: str, expected: Any) -> bool:
    """Compare a supplied credential with a configured one in constant time.

    Non-string configured values never match (as with the previous ``==``
    comparison). Both sides are UTF-8 encoded because
    ``secrets.compare_digest`` only accepts ASCII-only ``str`` or bytes.
    """
    if not isinstance(expected, str):
        return False
    return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


async def validate_basic_auth(
    credentials: HTTPBasicCredentials = Depends(security_basic),
) -> tuple[str, str]:
    """Validate basic authentication for AlpineBits protocol.

    Accepts any username/password pair present in
    ``config['alpine_bits_auth']``.

    Returns:
        ``(username, password)`` of the authenticated client.

    Raises:
        HTTPException: 401 if credentials are missing or match no config entry.
    """
    if not credentials.username or not credentials.password:
        raise HTTPException(
            status_code=401,
            detail="ERROR: Authentication required",
            headers={"WWW-Authenticate": "Basic"},
        )

    valid = False
    # .get(): the config is an empty dict when loading it failed at startup.
    for entry in app.state.config.get("alpine_bits_auth", []):
        # Evaluate both comparisons before combining them so the time taken
        # does not reveal which of the two fields mismatched.
        username_ok = _credential_matches(credentials.username, entry.get("username"))
        password_ok = _credential_matches(credentials.password, entry.get("password"))
        if username_ok and password_ok:
            valid = True
            break

    if not valid:
        raise HTTPException(
            status_code=401,
            detail="ERROR: Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )

    _LOGGER.info(
        "AlpineBits authentication successful for user: %s (from config)",
        credentials.username,
    )
    return credentials.username, credentials.password
|
|
|
|
|
|
async def _archive_failed_submission(
    request: Request, data: dict[str, Any], error: Exception, filename_prefix: str
) -> None:
    """Best-effort dump of a failed webhook payload to ``logs/errors``.

    Writes ``<filename_prefix>_<timestamp>.json`` containing the client IP,
    request headers, payload, error text and traceback. File I/O runs in a
    worker thread to avoid blocking the event loop. A failure to write the
    file is logged and swallowed so it cannot mask the original error.
    """
    import traceback

    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "client_ip": request.client.host if request.client else "unknown",
        "headers": dict(request.headers),
        "data": data,
        "error": str(error),
        "traceback": "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        ),
    }

    logs_dir = Path("logs/errors")
    # Microseconds keep two failures within the same second from overwriting
    # each other's file.
    log_filename = (
        logs_dir
        / f"{filename_prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.json"
    )
    try:
        await asyncio.to_thread(
            logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
        )
        await asyncio.to_thread(
            log_filename.write_text,
            json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
            encoding="utf-8",
        )
    except OSError:
        _LOGGER.exception("Could not write error data to: %s", log_filename)
    else:
        _LOGGER.error("Error data logged to: %s", log_filename)


@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(
    request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
    """Unified endpoint to handle Wix form submissions (test and production).

    No authentication required for this endpoint.

    On failure the payload is archived under ``logs/errors``. An
    ``HTTPException`` raised by the processing code is passed through
    unchanged; every other error becomes a 500.
    """
    try:
        return await process_wix_form_submission(request, data, db_session)
    except Exception as e:
        _LOGGER.exception("Error in handle_wix_form: %s", e)
        await _archive_failed_submission(request, data, e, "wix_error")
        if isinstance(e, HTTPException):
            # Keep the intended status code (e.g. 400) instead of masking it.
            raise
        raise HTTPException(
            status_code=500, detail="Error processing Wix form data"
        ) from e


@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_wix_form_test(
    request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
    """Test endpoint to verify the API is working with raw JSON data.

    No authentication required for testing purposes.

    Runs the same processing as the production endpoint; failures are
    archived under ``logs/errors`` with the ``wix_test_error`` prefix.
    """
    try:
        return await process_wix_form_submission(request, data, db_session)
    except Exception as e:
        _LOGGER.exception("Error in handle_wix_form_test: %s", e)
        await _archive_failed_submission(request, data, e, "wix_test_error")
        if isinstance(e, HTTPException):
            # Keep the intended status code (e.g. 400) instead of masking it.
            raise
        raise HTTPException(
            status_code=500, detail="Error processing test data"
        ) from e
|
|
|
|
|
|
def _write_generic_webhook_log(log_path: Path, log_entry: dict[str, Any]) -> None:
    """Write ``log_entry`` as JSON to ``log_path``, creating its directory if needed.

    Synchronous on purpose: it is meant to be run via ``asyncio.to_thread``.
    """
    logs_dir = log_path.parent
    if not logs_dir.exists():
        logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
        _LOGGER.info("Created directory: %s", logs_dir)

    with log_path.open("w", encoding="utf-8") as f:
        json.dump(log_entry, f, indent=2, default=str, ensure_ascii=False)


@api_router.post("/webhook/generic")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_generic_webhook(request: Request, data: dict[str, Any]):
    """Handle generic webhook endpoint for receiving JSON payloads.

    Logs the data to file for later analysis. Does not process the data
    or save to database since the structure is not yet known.

    No authentication required for this endpoint.

    Returns:
        A dict with status, message, the path of the written log file and
        the receive timestamp.

    Raises:
        HTTPException: 500 if the payload could not be logged.
    """
    try:
        timestamp = datetime.now().isoformat()
        _LOGGER.info("Received generic webhook data at %s", timestamp)

        # Create log entry with metadata
        log_entry = {
            "timestamp": timestamp,
            "client_ip": request.client.host if request.client else "unknown",
            "headers": dict(request.headers),
            "data": data,
            "origin_header": request.headers.get("origin"),
        }

        # Generate log filename with timestamp. Microseconds are included so
        # two webhooks arriving within the same second do not overwrite each
        # other's file.
        log_filename = (
            Path("logs/generic_webhooks")
            / f"webhook_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.json"
        )

        # Write log file in a worker thread so the event loop is not blocked.
        await asyncio.to_thread(_write_generic_webhook_log, log_filename, log_entry)

        _LOGGER.info("Generic webhook data logged to: %s", log_filename)

    except Exception as e:
        _LOGGER.exception("Error in handle_generic_webhook")
        raise HTTPException(
            status_code=500, detail="Error processing generic webhook data"
        ) from e
    else:
        return {
            "status": "success",
            "message": "Generic webhook data received successfully",
            "data_logged_to": str(log_filename),
            "timestamp": timestamp,
            "note": "Data logged for later analysis",
        }
|
|
|
|
|
|
@api_router.put("/hoteldata/conversions_import/{filename:path}")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_xml_upload(
    request: Request,
    filename: str,
    credentials_tupel: tuple = Depends(validate_basic_auth),
):
    """Endpoint for receiving XML files for conversion processing via PUT.

    Requires basic authentication and saves XML files to log directory.
    Supports gzip compression via Content-Encoding header.

    Example: PUT /api/hoteldata/conversions_import/Reservierungen.xml

    The body is stored as
    ``logs/conversions_import/<stem>_<username>_<timestamp><suffix>``; it is
    not parsed or processed further here.

    Raises:
        HTTPException: 400 for an invalid filename, an empty body, a body
            that cannot be gunzipped or that does not start with ``<``;
            500 for any other failure.
    """
    try:
        # Validate filename to prevent path traversal
        if ".." in filename or filename.startswith("/"):
            raise HTTPException(status_code=400, detail="ERROR: Invalid filename")

        # Get the raw body content
        body = await request.body()

        if not body:
            raise HTTPException(
                status_code=400, detail="ERROR: No XML content provided"
            )

        # Check if content is gzip compressed
        content_encoding = request.headers.get("content-encoding", "").lower()
        is_gzipped = content_encoding == "gzip"

        # Decompress if gzipped
        if is_gzipped:
            try:
                body = gzip.decompress(body)
            except Exception as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"ERROR: Failed to decompress gzip content: {e}",
                ) from e

        # Try to decode as UTF-8
        try:
            xml_content = body.decode("utf-8")
        except UnicodeDecodeError:
            # If UTF-8 fails, try with latin-1 as fallback (latin-1 can decode
            # any byte sequence, so this cannot fail).
            xml_content = body.decode("latin-1")

        # Basic validation that it's XML-like
        if not xml_content.strip().startswith("<"):
            raise HTTPException(
                status_code=400, detail="ERROR: Content does not appear to be XML"
            )

        # Create logs directory for XML conversions
        logs_dir = Path("logs/conversions_import")
        if not logs_dir.exists():
            await asyncio.to_thread(
                logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
            )
            _LOGGER.info("Created directory: %s", logs_dir)

        # Generate filename with timestamp and authenticated user
        username, _ = credentials_tupel
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Use the filename from the path, but add timestamp and username for
        # uniqueness. Only the final path component (stem + suffix) is used, so
        # directories in the request path never reach the file system.
        requested = Path(filename)
        base_filename = requested.stem
        extension = requested.suffix or ".xml"
        log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"

        # Save XML content to file in a worker thread so the event loop is
        # not blocked while the upload is written.
        await asyncio.to_thread(
            log_filename.write_text, xml_content, encoding="utf-8"
        )

        _LOGGER.info(
            "XML file saved to %s by user %s (original: %s)",
            log_filename,
            username,
            filename,
        )

        response_headers = {
            "Content-Type": "application/xml; charset=utf-8",
            "X-AlpineBits-Server-Accept-Encoding": "gzip",
        }

        return Response(
            content="Xml received", headers=response_headers, status_code=200
        )

    except HTTPException:
        raise
    except Exception as e:
        _LOGGER.exception("Error in handle_xml_upload")
        raise HTTPException(
            status_code=500, detail="Error processing XML upload"
        ) from e
|
|
|
|
|
|
# TODO Bit sketchy. May need requests-toolkit in the future
|
|
def _multipart_field_name(headers: str) -> str:
    """Return the ``name`` parameter of a part's Content-Disposition header.

    Header and parameter names are matched case-insensitively. Returns ``""``
    when the part has no Content-Disposition header with a ``name`` parameter.
    """
    for line in headers.splitlines():
        header_name, _, header_value = line.partition(":")
        if header_name.strip().lower() != "content-disposition":
            continue
        for param in header_value.split(";"):
            key, sep, value = param.strip().partition("=")
            if sep and key.strip().lower() == "name":
                return value.strip().strip('"')
    return ""


def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
    """Parse multipart/form-data from raw request body.

    This is a simplified parser for the AlpineBits use case: the body is split
    on the boundary, each part is split into headers and content at the first
    blank line, and the content is decoded as UTF-8 (undecodable bytes are
    dropped). Parts without a field name are skipped.

    Args:
        content_type: Value of the request's Content-Type header.
        body: Raw (already decompressed) request body.

    Returns:
        Mapping of form field name to decoded text content.

    Raises:
        HTTPException: 400 if the content type is not multipart/form-data or
            carries no boundary.
    """
    if "multipart/form-data" not in content_type:
        raise HTTPException(
            status_code=400, detail="ERROR: Content-Type must be multipart/form-data"
        )

    # Extract boundary (parameter name compared case-insensitively)
    boundary = None
    for segment in content_type.split(";"):
        key, sep, value = segment.strip().partition("=")
        if sep and key.strip().lower() == "boundary":
            boundary = value.strip().strip('"')
            break

    if not boundary:
        raise HTTPException(
            status_code=400, detail="ERROR: Missing boundary in multipart/form-data"
        )

    # Simple multipart parsing
    data: dict[str, Any] = {}
    for part in body.split(f"--{boundary}".encode()):
        stripped = part.strip()
        # Skip the preamble and the closing "--" marker.
        if not stripped or stripped == b"--":
            continue

        # Split headers and content; parts without a blank line are ignored.
        if b"\r\n\r\n" not in part:
            continue
        headers_section, content = part.split(b"\r\n\r\n", 1)
        content = content.rstrip(b"\r\n")

        name = _multipart_field_name(headers_section.decode("utf-8", errors="ignore"))
        if name:
            # File uploads (XML) and plain text fields are decoded the same way.
            data[name] = content.decode("utf-8", errors="ignore")

    return data
|
|
|
|
|
|
@api_router.post("/alpinebits/server-2024-10")
@limiter.limit("60/minute")
async def alpinebits_server_handshake(
    request: Request,
    credentials_tupel: tuple = Depends(validate_basic_auth),
    dbsession=Depends(get_async_session),
):
    """AlpineBits server endpoint implementing the handshake protocol.

    This endpoint handles:
    - Protocol version negotiation via X-AlpineBits-ClientProtocolVersion header
    - Client identification via X-AlpineBits-ClientID header (optional)
    - Multipart/form-data parsing for action and request parameters
    - Gzip compression support
    - Proper error handling with HTTP status codes
    - Handshaking action processing

    Authentication: HTTP Basic Auth required
    Content-Type: multipart/form-data
    Compression: gzip supported (check X-AlpineBits-Server-Accept-Encoding)

    The parsed ``action`` and optional ``request`` form fields are handed to
    the application's ``AlpineBitsServer``; its XML and status code become the
    response. The response is gzip-compressed when the request was.

    Raises:
        HTTPException: 400 for a malformed body or a missing ``action``,
            500 for unexpected errors.
    """
    try:
        # Check required headers
        client_protocol_version = request.headers.get(
            "X-AlpineBits-ClientProtocolVersion"
        )
        if not client_protocol_version:
            # Server concludes client speaks a protocol version preceding 2013-04
            client_protocol_version = "pre-2013-04"
            _LOGGER.info(
                "No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04"
            )
        else:
            _LOGGER.info("Client protocol version: %s", client_protocol_version)

        # Optional client ID
        client_id = request.headers.get("X-AlpineBits-ClientID")
        if client_id:
            _LOGGER.info("Client ID: %s", client_id)

        # Check content encoding. Compared case-insensitively (as in
        # handle_xml_upload) so "GZIP"/"Gzip" bodies are decompressed too.
        content_encoding = request.headers.get("Content-Encoding")
        is_compressed = (content_encoding or "").strip().lower() == "gzip"

        if is_compressed:
            _LOGGER.info("Request is gzip compressed")

        # Get content type before processing
        content_type = request.headers.get("Content-Type", "")

        _LOGGER.info("Content-Type: %s", content_type)
        _LOGGER.info("Content-Encoding: %s", content_encoding)

        # Get request body
        body = await request.body()

        # Decompress if needed and parse the form fields
        form_data = validate_alpinebits_body(is_compressed, content_type, body)

        # Check for required action parameter
        action = form_data.get("action")
        if not action:
            raise HTTPException(
                status_code=400, detail="ERROR: Missing required 'action' parameter"
            )

        _LOGGER.info("AlpineBits action: %s", action)

        # Get optional request XML
        request_xml = form_data.get("request")

        server: AlpineBitsServer = app.state.alpine_bits_server

        version = Version.V2024_10

        username, password = credentials_tupel

        client_info = AlpineBitsClientInfo(
            username=username, password=password, client_id=client_id
        )

        # Let the AlpineBits server produce the response for this action
        response = await server.handle_request(
            action,
            request_xml,
            client_info=client_info,
            version=version,
            dbsession=dbsession,
        )

        response_xml = response.xml_content

        # Set response headers indicating server capabilities
        headers = {
            "Content-Type": "application/xml; charset=utf-8",
            "X-AlpineBits-Server-Accept-Encoding": "gzip",  # Indicate gzip support
            "X-AlpineBits-Server-Version": "2024-10",
        }

        if is_compressed:
            # Compress response if client sent compressed request
            response_xml = gzip.compress(response_xml.encode("utf-8"))
            headers["Content-Encoding"] = "gzip"

        return Response(
            content=response_xml, status_code=response.status_code, headers=headers
        )

    except HTTPException:
        # Re-raise HTTP exceptions (auth errors, etc.)
        raise
    except Exception as e:
        _LOGGER.exception("Error in AlpineBits handshake: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
|
|
|
|
|
|
def validate_alpinebits_body(is_compressed, content_type, body):
    """Check if the body conforms to AlpineBits expectations and parse it.

    Args:
        is_compressed: Whether ``body`` is gzip-compressed.
        content_type: Value of the request's Content-Type header.
        body: Raw request body (bytes).

    Returns:
        Dict of form fields parsed from a ``multipart/form-data`` or
        ``application/x-www-form-urlencoded`` body.

    Raises:
        HTTPException: 400 if the body cannot be decompressed, the content
            type is unsupported, or the body cannot be parsed/decoded.
    """
    if is_compressed:
        try:
            body = gzip.decompress(body)
        except Exception as e:
            raise HTTPException(
                status_code=400,
                detail="ERROR: Failed to decompress gzip content",
            ) from e

    # Content type is evaluated after decompression.
    if "multipart/form-data" in content_type:
        try:
            return parse_multipart_data(content_type, body)
        except HTTPException:
            # Keep the parser's specific 400 (e.g. missing boundary).
            raise
        except Exception as e:
            raise HTTPException(
                status_code=400,
                detail="ERROR: Failed to parse multipart/form-data",
            ) from e

    if "application/x-www-form-urlencoded" in content_type:
        # Parse as urlencoded; a body that is not valid UTF-8 is a client
        # error, not a server error.
        try:
            return dict(urllib.parse.parse_qsl(body.decode("utf-8")))
        except UnicodeDecodeError as e:
            raise HTTPException(
                status_code=400,
                detail="ERROR: Request body is not valid UTF-8",
            ) from e

    raise HTTPException(
        status_code=400,
        detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
    )
|
|
|
|
|
|
@api_router.get("/admin/stats")
@limiter.limit("10/minute")
async def get_api_stats(request: Request, admin_key: str = Depends(validate_api_key)):
    """Admin endpoint to get API usage statistics.

    Requires admin API key: the value produced by ``validate_api_key`` must
    equal ``"admin-key"``, otherwise a 403 is raised. The returned numbers are
    placeholders apart from the rate-limit backend, which is reported as
    ``"redis"`` when ``REDIS_URL`` is set in the environment.
    """
    if admin_key != "admin-key":
        raise HTTPException(status_code=403, detail="Admin access required")

    # In a real application, you'd fetch this from your database/monitoring system
    known_keys = ("wix-webhook-key", "admin-key")
    backend = "memory"
    if os.getenv("REDIS_URL"):
        backend = "redis"

    stats = {
        "uptime": "Available in production deployment",
        "total_requests": "Available with monitoring setup",
        "active_api_keys": sum(1 for key in known_keys if key),
        "rate_limit_backend": backend,
    }
    return {
        "status": "success",
        "stats": stats,
        "timestamp": datetime.now().isoformat(),
    }
|
|
|
|
|
|
# Include the API router in the main app. This is done here, after every
# @api_router route above has been declared, so all of them are mounted under
# the router's "/api" prefix.
app.include_router(api_router)
|
|
|
|
|
|
# Served by landing_page() when templates/index.html does not exist.
_FALLBACK_LANDING_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>99tales - Under Construction</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; }
h1 { color: #333; }
</style>
</head>
<body>
<h1>🏗️ 99tales</h1>
<h2>Under Construction</h2>
<p>We're working hard to bring you something amazing!</p>
<p><a href="/api">API Documentation</a></p>
</body>
</html>
"""


@app.get("/", response_class=HTMLResponse)
async def landing_page():
    """Serve the under construction landing page at the root route.

    Reads ``templates/index.html`` next to this module; if that file is
    missing, a built-in fallback page is returned instead. The file is read in
    a worker thread so the event loop is not blocked.
    """
    # Get the path to the HTML file
    html_path = Path(__file__).parent / "templates" / "index.html"
    try:
        html_content = await asyncio.to_thread(html_path.read_text, encoding="utf-8")
    except FileNotFoundError:
        # Fallback if HTML file is not found
        html_content = _FALLBACK_LANDING_HTML
    return HTMLResponse(content=html_content, status_code=200)
|
|
|
|
|
|
# Development entry point: running this module directly starts uvicorn with
# the app bound to all interfaces (0.0.0.0) on port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the module is run as a script.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
|