Files
alpinebits_python/src/alpine_bits_python/api.py
2025-10-09 10:04:34 +02:00

944 lines
32 KiB
Python

import asyncio
import gzip
import json
import logging
import os
import urllib.parse
from collections import defaultdict
from datetime import date, datetime
from functools import partial
from pathlib import Path
from typing import Any
import httpx
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from slowapi.errors import RateLimitExceeded
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from alpine_bits_python.schemas import ReservationData
from .alpinebits_server import (
AlpineBitsActionName,
AlpineBitsClientInfo,
AlpineBitsServer,
Version,
)
from .auth import generate_api_key, generate_unique_id, validate_api_key
from .config_loader import load_config
from .db import Base, get_database_url
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .rate_limit import (
BURST_RATE_LIMIT,
DEFAULT_RATE_LIMIT,
WEBHOOK_RATE_LIMIT,
custom_rate_limit_handler,
limiter,
webhook_limiter,
)
# Configure logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
# HTTP Basic auth for AlpineBits
security_basic = HTTPBasic()
# --- Enhanced event dispatcher with hotel-specific routing ---
class EventDispatcher:
def __init__(self):
self.listeners = defaultdict(list)
self.hotel_listeners = defaultdict(list) # hotel_code -> list of listeners
def register(self, event_name, func):
self.listeners[event_name].append(func)
def register_hotel_listener(self, event_name, hotel_code, func):
"""Register a listener for a specific hotel"""
self.hotel_listeners[f"{event_name}:{hotel_code}"].append(func)
async def dispatch(self, event_name, *args, **kwargs):
for func in self.listeners[event_name]:
await func(*args, **kwargs)
async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs):
"""Dispatch event only to listeners registered for specific hotel"""
key = f"{event_name}:{hotel_code}"
for func in self.hotel_listeners[key]:
await func(*args, **kwargs)
event_dispatcher = EventDispatcher()
# Load config at startup
async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel):
"""Push listener that sends reservation data to hotel's push endpoint.
Only called for reservations that match this hotel's hotel_id.
"""
push_endpoint = hotel.get("push_endpoint")
if not push_endpoint:
_LOGGER.warning(
f"No push endpoint configured for hotel {hotel.get('hotel_id')}"
)
return
server: AlpineBitsServer = app.state.alpine_bits_server
hotel_id = hotel["hotel_id"]
reservation_hotel_id = reservation.hotel_code
# Double-check hotel matching (should be guaranteed by dispatcher)
if hotel_id != reservation_hotel_id:
_LOGGER.warning(
f"Hotel ID mismatch: listener for {hotel_id}, reservation for {reservation_hotel_id}"
)
return
_LOGGER.info(
f"Processing push notification for hotel {hotel_id}, reservation {reservation.unique_id}"
)
# Prepare payload for push notification
request = await server.handle_request(
request_action_name=AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS.request_name,
request_xml=(reservation, customer),
client_info=None,
version=Version.V2024_10,
)
if request.status_code != 200:
_LOGGER.error(
f"Failed to generate push request for hotel {hotel_id}, reservation {reservation.unique_id}: {request.xml_content}"
)
return
# save push request to file
logs_dir = "logs/push_requests"
if not os.path.exists(logs_dir):
os.makedirs(logs_dir, mode=0o755, exist_ok=True)
stat_info = os.stat(logs_dir)
_LOGGER.info(
f"Created directory owner: uid:{stat_info.st_uid}, gid:{stat_info.st_gid}"
)
_LOGGER.info(f"Directory mode: {oct(stat_info.st_mode)[-3:]}")
log_filename = f"{logs_dir}/alpinebits_push_{hotel_id}_{reservation.unique_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml"
with open(log_filename, "w", encoding="utf-8") as f:
f.write(request.xml_content)
return
headers = (
{"Authorization": f"Bearer {push_endpoint.get('token', '')}"}
if push_endpoint.get("token")
else {}
)
""
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
push_endpoint["url"], json=payload, headers=headers, timeout=10
)
_LOGGER.info(
f"Push event fired to {push_endpoint['url']} for hotel {hotel['hotel_id']}, status: {resp.status_code}"
)
if resp.status_code not in [200, 201, 202]:
_LOGGER.warning(
f"Push endpoint returned non-success status {resp.status_code}: {resp.text}"
)
except Exception as e:
_LOGGER.error(f"Push event failed for hotel {hotel['hotel_id']}: {e}")
# Optionally implement retry logic here@asynccontextmanager
async def lifespan(app: FastAPI):
# Setup DB
try:
config = load_config()
except Exception as e:
_LOGGER.error(f"Failed to load config: {e!s}")
config = {}
DATABASE_URL = get_database_url(config)
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
app.state.engine = engine
app.state.async_sessionmaker = AsyncSessionLocal
app.state.config = config
app.state.alpine_bits_server = AlpineBitsServer(config)
app.state.event_dispatcher = event_dispatcher
# Register push listeners for hotels with push_endpoint
for hotel in config.get("alpine_bits_auth", []):
push_endpoint = hotel.get("push_endpoint")
hotel_id = hotel.get("hotel_id")
if push_endpoint and hotel_id:
# Register hotel-specific listener
event_dispatcher.register_hotel_listener(
"form_processed", hotel_id, partial(push_listener, hotel=hotel)
)
_LOGGER.info(
f"Registered push listener for hotel {hotel_id} with endpoint {push_endpoint.get('url')}"
)
elif push_endpoint and not hotel_id:
_LOGGER.warning(f"Hotel has push_endpoint but no hotel_id: {hotel}")
elif hotel_id and not push_endpoint:
_LOGGER.info(f"Hotel {hotel_id} has no push_endpoint configured")
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
_LOGGER.info("Database tables checked/created at startup.")
yield
# Optional: Dispose engine on shutdown
await engine.dispose()
async def get_async_session(request: Request):
async_sessionmaker = request.app.state.async_sessionmaker
async with async_sessionmaker() as session:
yield session
app = FastAPI(
title="Wix Form Handler API",
description="Secure API endpoint to receive and process Wix form submissions with authentication and rate limiting",
version="1.0.0",
lifespan=lifespan,
)
# Create API router with /api prefix
api_router = APIRouter(prefix="/api", tags=["api"])
# Add rate limiting
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)
# Add CORS middleware to allow requests from Wix
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://*.wix.com",
"https://*.wixstatic.com",
"http://localhost:3000", # For development
"http://localhost:8000", # For local testing
],
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
@api_router.get("/")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def root(request: Request):
"""Health check endpoint"""
return {
"message": "Wix Form Handler API is running",
"timestamp": datetime.now().isoformat(),
"status": "healthy",
"authentication": "required",
"rate_limits": {
"default": DEFAULT_RATE_LIMIT,
"webhook": WEBHOOK_RATE_LIMIT,
"burst": BURST_RATE_LIMIT,
},
}
@api_router.get("/health")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def health_check(request: Request):
"""Detailed health check"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"service": "wix-form-handler",
"version": "1.0.0",
"authentication": "enabled",
"rate_limiting": "enabled",
}
def create_db_reservation_from_data(
reservation_model: ReservationData, db_customer_id: int
) -> DBReservation:
"""Convert ReservationData to DBReservation, handling children_ages conversion."""
data = reservation_model.model_dump(exclude_none=True)
children_list = data.pop("children_ages", [])
children_csv = ",".join(str(int(a)) for a in children_list) if children_list else ""
data["children_ages"] = children_csv
# Inject FK
data["customer_id"] = db_customer_id
return DBReservation(**data)
# Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production)."""
timestamp = datetime.now().isoformat()
_LOGGER.info(f"Received Wix form data at {timestamp}")
# _LOGGER.info(f"Data keys: {list(data.keys())}")
# _LOGGER.info(f"Full data: {json.dumps(data, indent=2)}")
log_entry = {
"timestamp": timestamp,
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"origin_header": request.headers.get("origin"),
"all_headers": dict(request.headers),
}
logs_dir = "logs"
if not os.path.exists(logs_dir):
os.makedirs(logs_dir, mode=0o755, exist_ok=True)
stat_info = os.stat(logs_dir)
_LOGGER.info(
f"Created directory owner: uid:{stat_info.st_uid}, gid:{stat_info.st_gid}"
)
_LOGGER.info(f"Directory mode: {oct(stat_info.st_mode)[-3:]}")
log_filename = (
f"{logs_dir}/wix_test_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
with open(log_filename, "w", encoding="utf-8") as f:
json.dump(log_entry, f, indent=2, default=str, ensure_ascii=False)
file_stat = os.stat(log_filename)
_LOGGER.info(f"Created file owner: uid:{file_stat.st_uid}, gid:{file_stat.st_gid}")
_LOGGER.info(f"File mode: {oct(file_stat.st_mode)[-3:]}")
_LOGGER.info(f"Data logged to: {log_filename}")
data = data.get("data") # Handle nested "data" key if present
# save customer and reservation to DB
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
locale = contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
email_newsletter_string = data.get("field:form_field_5a7b", "")
yes_values = {"Selezionato", "Angekreuzt", "Checked"}
email_newsletter = email_newsletter_string in yes_values
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
# Dates
start_date = (
data.get("field:date_picker_a7c8")
or data.get("Anreisedatum")
or data.get("submissions", [{}])[1].get("value")
)
end_date = (
data.get("field:date_picker_7e65")
or data.get("Abreisedatum")
or data.get("submissions", [{}])[2].get("value")
)
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 2)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
for k in data.keys():
if k.startswith("field:alter_kind_"):
try:
age = int(data[k])
children_ages.append(age)
except ValueError:
_LOGGER.warning(f"Invalid age value for {k}: {data[k]}")
offer = data.get("field:angebot_auswaehlen")
# get submissionId and ensure max length 35. Generate one if not present
unique_id = data.get("submissionId", generate_unique_id())
# use database session
# Save all relevant data to DB (including new fields)
db_customer = DBCustomer(
given_name=first_name,
surname=last_name,
contact_id=contact_id,
name_prefix=name_prefix,
email_address=email,
phone=phone_number,
email_newsletter=email_newsletter,
address_line=address_line,
city_name=city_name,
postal_code=postal_code,
country_code=country_code,
gender=gender,
birth_date=birth_date,
language=language,
address_catalog=False,
name_title=None,
)
db.add(db_customer)
await db.flush() # This assigns db_customer.id without committing
# await db.refresh(db_customer)
# Determine hotel_code and hotel_name
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
hotel_code = (
data.get("field:hotelid")
or data.get("hotelid")
or request.app.state.config.get("default_hotel_code")
or "123" # fallback
)
hotel_name = (
data.get("field:hotelname")
or data.get("hotelname")
or request.app.state.config.get("default_hotel_name")
or "Frangart Inn" # fallback
)
submissionTime = data.get("submissionTime") # 2025-10-07T05:48:41.855Z
try:
if submissionTime:
submissionTime = datetime.fromisoformat(
submissionTime[:-1]
) # Remove Z and convert
except Exception as e:
_LOGGER.error("Error parsing submissionTime: %s", e)
submissionTime = None
reservation = ReservationData(
unique_id=unique_id,
start_date=date.fromisoformat(start_date),
end_date=date.fromisoformat(end_date),
num_adults=num_adults,
num_children=num_children,
children_ages=children_ages,
hotel_code=hotel_code,
hotel_name=hotel_name,
offer=offer,
created_at=submissionTime,
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=data.get("field:fbclid"),
gclid=data.get("field:gclid"),
)
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
db_reservation = create_db_reservation_from_data(reservation, db_customer.id)
db.add(db_reservation)
await db.commit()
await db.refresh(db_reservation)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
dispatcher = getattr(request.app.state, "event_dispatcher", None)
if dispatcher:
# Get hotel_code from reservation to target the right listeners
hotel_code = getattr(db_reservation, "hotel_code", None)
if hotel_code and hotel_code.strip():
await dispatcher.dispatch_for_hotel(
"form_processed", hotel_code, db_customer, db_reservation
)
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
else:
_LOGGER.warning(
"No hotel_code in reservation, skipping push notifications"
)
asyncio.create_task(push_event())
return {
"status": "success",
"message": "Wix form data received successfully",
"received_keys": list(data.keys()),
"data_logged_to": log_filename,
"timestamp": timestamp,
"note": "No authentication required for this endpoint",
}
async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
) -> str:
"""Validate basic authentication for AlpineBits protocol.
Returns username if valid, raises HTTPException if not.
"""
# Accept any username/password pair present in config['alpine_bits_auth']
if not credentials.username or not credentials.password:
raise HTTPException(
status_code=401,
detail="ERROR: Authentication required",
headers={"WWW-Authenticate": "Basic"},
)
valid = False
config = app.state.config
for entry in config["alpine_bits_auth"]:
if (
credentials.username == entry["username"]
and credentials.password == entry["password"]
):
valid = True
break
if not valid:
raise HTTPException(
status_code=401,
detail="ERROR: Invalid credentials",
headers={"WWW-Authenticate": "Basic"},
)
_LOGGER.info(
"AlpineBits authentication successful for user: %s (from config)",
credentials.username,
)
return credentials.username, credentials.password
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Unified endpoint to handle Wix form submissions (test and production).
No authentication required for this endpoint.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form: {e!s}")
# log stacktrace
import traceback
traceback_str = traceback.format_exc()
_LOGGER.error(f"Stack trace for handle_wix_form: {traceback_str}")
raise HTTPException(status_code=500, detail="Error processing Wix form data")
@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_wix_form_test(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Test endpoint to verify the API is working with raw JSON data.
No authentication required for testing purposes.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form_test: {e!s}")
raise HTTPException(status_code=500, detail="Error processing test data")
@api_router.post("/hoteldata/conversions_import")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_xml_upload(
request: Request, credentials_tupel: tuple = Depends(validate_basic_auth)
):
"""Endpoint for receiving XML files for conversion processing.
Requires basic authentication and saves XML files to log directory.
Supports gzip compression via Content-Encoding header.
"""
try:
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(
status_code=400, detail="ERROR: No XML content provided"
)
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Try to decode as UTF-8
try:
xml_content = body.decode("utf-8")
except UnicodeDecodeError:
# If UTF-8 fails, try with latin-1 as fallback
xml_content = body.decode("latin-1")
# Basic validation that it's XML-like
if not xml_content.strip().startswith("<"):
raise HTTPException(
status_code=400, detail="ERROR: Content does not appear to be XML"
)
# Create logs directory for XML conversions
logs_dir = Path("logs/conversions_import")
if not logs_dir.exists():
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
_LOGGER.info("Created directory: %s", logs_dir)
# Generate filename with timestamp and authenticated user
username, _ = credentials_tupel
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_filename = logs_dir / f"xml_import_{username}_{timestamp}.xml"
# Save XML content to file
log_filename.write_text(xml_content, encoding="utf-8")
_LOGGER.info("XML file saved to %s by user %s", log_filename, username)
response_headers = {
"Content-Type": "application/xml; charset=utf-8",
"X-AlpineBits-Server-Accept-Encoding": "gzip",
}
return Response(
content="Xml received", headers=response_headers, status_code=200
)
except HTTPException:
raise
except Exception:
_LOGGER.exception("Error in handle_xml_upload")
raise HTTPException(status_code=500, detail="Error processing XML upload")
# UNUSED
@api_router.post("/admin/generate-api-key")
@limiter.limit("5/hour") # Very restrictive for admin operations
async def generate_new_api_key(
request: Request, admin_key: str = Depends(validate_api_key)
):
"""Admin endpoint to generate new API keys.
Requires admin API key and is heavily rate limited.
"""
if admin_key != "admin-key":
raise HTTPException(status_code=403, detail="Admin access required")
new_key = generate_api_key()
_LOGGER.info(f"Generated new API key (requested by: {admin_key})")
return {
"status": "success",
"message": "New API key generated",
"api_key": new_key,
"timestamp": datetime.now().isoformat(),
"note": "Store this key securely - it won't be shown again",
}
# TODO Bit sketchy. May need requests-toolkit in the future
def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
"""Parse multipart/form-data from raw request body.
This is a simplified parser for the AlpineBits use case.
"""
if "multipart/form-data" not in content_type:
raise HTTPException(
status_code=400, detail="ERROR: Content-Type must be multipart/form-data"
)
# Extract boundary
boundary = None
for part in content_type.split(";"):
part = part.strip()
if part.startswith("boundary="):
boundary = part.split("=", 1)[1].strip('"')
break
if not boundary:
raise HTTPException(
status_code=400, detail="ERROR: Missing boundary in multipart/form-data"
)
# Simple multipart parsing
parts = body.split(f"--{boundary}".encode())
data = {}
for part in parts:
if not part.strip() or part.strip() == b"--":
continue
# Split headers and content
if b"\r\n\r\n" in part:
headers_section, content = part.split(b"\r\n\r\n", 1)
content = content.rstrip(b"\r\n")
# Parse Content-Disposition header
headers = headers_section.decode("utf-8", errors="ignore")
name = None
for line in headers.split("\n"):
if "Content-Disposition" in line and "name=" in line:
# Extract name parameter
for param in line.split(";"):
param = param.strip()
if param.startswith("name="):
name = param.split("=", 1)[1].strip('"')
break
if name:
# Handle file uploads or text content
if content.startswith(b"<"):
# Likely XML content
data[name] = content.decode("utf-8", errors="ignore")
else:
data[name] = content.decode("utf-8", errors="ignore")
return data
@api_router.post("/alpinebits/server-2024-10")
@limiter.limit("60/minute")
async def alpinebits_server_handshake(
request: Request,
credentials_tupel: tuple = Depends(validate_basic_auth),
dbsession=Depends(get_async_session),
):
"""AlpineBits server endpoint implementing the handshake protocol.
This endpoint handles:
- Protocol version negotiation via X-AlpineBits-ClientProtocolVersion header
- Client identification via X-AlpineBits-ClientID header (optional)
- Multipart/form-data parsing for action and request parameters
- Gzip compression support
- Proper error handling with HTTP status codes
- Handshaking action processing
Authentication: HTTP Basic Auth required
Content-Type: multipart/form-data
Compression: gzip supported (check X-AlpineBits-Server-Accept-Encoding)
"""
try:
# Check required headers
client_protocol_version = request.headers.get(
"X-AlpineBits-ClientProtocolVersion"
)
if not client_protocol_version:
# Server concludes client speaks a protocol version preceding 2013-04
client_protocol_version = "pre-2013-04"
_LOGGER.info(
"No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04"
)
else:
_LOGGER.info("Client protocol version: %s", client_protocol_version)
# Optional client ID
client_id = request.headers.get("X-AlpineBits-ClientID")
if client_id:
_LOGGER.info("Client ID: %s", client_id)
# Check content encoding
content_encoding = request.headers.get("Content-Encoding")
is_compressed = content_encoding == "gzip"
if is_compressed:
_LOGGER.info("Request is gzip compressed")
# Get content type before processing
content_type = request.headers.get("Content-Type", "")
_LOGGER.info("Content-Type: %s", content_type)
_LOGGER.info("Content-Encoding: %s", content_encoding)
# Get request body
body = await request.body()
# Decompress if needed
form_data = validate_alpinebits_body(is_compressed, content_type, body)
# Check for required action parameter
action = form_data.get("action")
if not action:
raise HTTPException(
status_code=400, detail="ERROR: Missing required 'action' parameter"
)
_LOGGER.info(f"AlpineBits action: {action}")
# Get optional request XML
request_xml = form_data.get("request")
server: AlpineBitsServer = app.state.alpine_bits_server
version = Version.V2024_10
username, password = credentials_tupel
client_info = AlpineBitsClientInfo(
username=username, password=password, client_id=client_id
)
# Create successful handshake response
response = await server.handle_request(
action,
request_xml,
client_info=client_info,
version=version,
dbsession=dbsession,
)
response_xml = response.xml_content
# Set response headers indicating server capabilities
headers = {
"Content-Type": "application/xml; charset=utf-8",
"X-AlpineBits-Server-Accept-Encoding": "gzip", # Indicate gzip support
"X-AlpineBits-Server-Version": "2024-10",
}
if is_compressed:
# Compress response if client sent compressed request
response_xml = gzip.compress(response_xml.encode("utf-8"))
headers["Content-Encoding"] = "gzip"
return Response(
content=response_xml, status_code=response.status_code, headers=headers
)
except HTTPException:
# Re-raise HTTP exceptions (auth errors, etc.)
raise
except Exception as e:
_LOGGER.error(f"Error in AlpineBits handshake: {e!s}")
raise HTTPException(status_code=500, detail="Internal server error")
def validate_alpinebits_body(is_compressed, content_type, body):
"""Check if the body conforms to AlpineBits expectations."""
if is_compressed:
try:
body = gzip.decompress(body)
except Exception:
raise HTTPException(
status_code=400,
detail="ERROR: Failed to decompress gzip content",
)
# Check content type (after decompression)
if (
"multipart/form-data" not in content_type
and "application/x-www-form-urlencoded" not in content_type
):
raise HTTPException(
status_code=400,
detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
)
# Parse multipart data
if "multipart/form-data" in content_type:
try:
form_data = parse_multipart_data(content_type, body)
except Exception:
raise HTTPException(
status_code=400,
detail="ERROR: Failed to parse multipart/form-data",
)
elif "application/x-www-form-urlencoded" in content_type:
# Parse as urlencoded
form_data = dict(urllib.parse.parse_qsl(body.decode("utf-8")))
else:
raise HTTPException(
status_code=400,
detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
)
return form_data
@api_router.get("/admin/stats")
@limiter.limit("10/minute")
async def get_api_stats(request: Request, admin_key: str = Depends(validate_api_key)):
"""Admin endpoint to get API usage statistics.
Requires admin API key.
"""
if admin_key != "admin-key":
raise HTTPException(status_code=403, detail="Admin access required")
# In a real application, you'd fetch this from your database/monitoring system
return {
"status": "success",
"stats": {
"uptime": "Available in production deployment",
"total_requests": "Available with monitoring setup",
"active_api_keys": len([k for k in ["wix-webhook-key", "admin-key"] if k]),
"rate_limit_backend": "redis" if os.getenv("REDIS_URL") else "memory",
},
"timestamp": datetime.now().isoformat(),
}
# Include the API router in the main app
app.include_router(api_router)
@app.get("/", response_class=HTMLResponse)
async def landing_page():
"""Serve the under construction landing page at the root route."""
try:
# Get the path to the HTML file
html_path = os.path.join(os.path.dirname(__file__), "templates", "index.html")
with open(html_path, encoding="utf-8") as f:
html_content = f.read()
return HTMLResponse(content=html_content, status_code=200)
except FileNotFoundError:
# Fallback if HTML file is not found
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>99tales - Under Construction</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; }
h1 { color: #333; }
</style>
</head>
<body>
<h1>🏗️ 99tales</h1>
<h2>Under Construction</h2>
<p>We're working hard to bring you something amazing!</p>
<p><a href="/api">API Documentation</a></p>
</body>
</html>
"""
return HTMLResponse(content=html_content, status_code=200)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)