# Files
# alpinebits_python/src/alpine_bits_python/api.py
# 2025-10-07 09:04:10 +02:00
#
# 877 lines
# 29 KiB
# Python

from fastapi import (
FastAPI,
HTTPException,
BackgroundTasks,
Request,
Depends,
APIRouter,
Form,
File,
UploadFile,
)
from fastapi.concurrency import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPBasicCredentials, HTTPBasic
from .config_loader import load_config
from fastapi.responses import HTMLResponse, PlainTextResponse, Response
from .models import WixFormSubmission
from datetime import datetime, date, timezone
from .auth import generate_unique_id, validate_api_key, validate_wix_signature, generate_api_key
from .rate_limit import (
limiter,
webhook_limiter,
custom_rate_limit_handler,
DEFAULT_RATE_LIMIT,
WEBHOOK_RATE_LIMIT,
BURST_RATE_LIMIT,
)
from slowapi.errors import RateLimitExceeded
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List
import json
import os
import asyncio
import gzip
import xml.etree.ElementTree as ET
from .alpinebits_server import AlpineBitsClientInfo, AlpineBitsServer, Version, AlpineBitsActionName
import urllib.parse
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from functools import partial
import httpx
from .db import (
Base,
Customer as DBCustomer,
Reservation as DBReservation,
get_database_url,
)
# Configure logging once at import time; INFO is the level passed to basicConfig.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every handler in this file.
_LOGGER = logging.getLogger(__name__)
# HTTP Basic auth scheme for AlpineBits; consumed as a FastAPI dependency by
# validate_basic_auth().
security_basic = HTTPBasic()
from collections import defaultdict
# --- Enhanced event dispatcher with hotel-specific routing ---
class EventDispatcher:
    """In-process async event bus with global and hotel-specific listeners.

    Listeners are coroutine functions. Global listeners are keyed by event
    name; hotel listeners are keyed by ``"<event_name>:<hotel_code>"``.
    Listeners are awaited one after another in registration order, and an
    exception raised by a listener propagates to the caller of ``dispatch*``.
    """

    def __init__(self):
        self.listeners = defaultdict(list)  # event_name -> list of listeners
        self.hotel_listeners = defaultdict(list)  # "event:hotel_code" -> list of listeners

    @staticmethod
    def _hotel_key(event_name, hotel_code):
        """Build the lookup key used for hotel-specific listeners."""
        return f"{event_name}:{hotel_code}"

    def register(self, event_name, func):
        """Register ``func`` to be awaited on every ``event_name`` dispatch."""
        self.listeners[event_name].append(func)

    def register_hotel_listener(self, event_name, hotel_code, func):
        """Register a listener for a specific hotel"""
        self.hotel_listeners[self._hotel_key(event_name, hotel_code)].append(func)

    async def dispatch(self, event_name, *args, **kwargs):
        """Await every global listener of ``event_name`` with the given arguments."""
        # Use .get(): indexing a defaultdict would insert an empty list for
        # every unknown event name and grow the mapping on each miss.
        for func in self.listeners.get(event_name, ()):
            await func(*args, **kwargs)

    async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs):
        """Dispatch event only to listeners registered for specific hotel"""
        # Same reasoning as dispatch(): hotel codes may come from request data,
        # so a lookup must never create new keys.
        key = self._hotel_key(event_name, hotel_code)
        for func in self.hotel_listeners.get(key, ()):
            await func(*args, **kwargs)


event_dispatcher = EventDispatcher()
# NOTE(review): this comment used to read "Load config at startup", but the
# config is loaded inside lifespan(); what follows is the per-hotel push listener.
async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel) -> None:
    """
    Push listener that prepares reservation data for a hotel's push endpoint.
    Only called for reservations that match this hotel's hotel_id.

    The listener asks the shared AlpineBitsServer (app.state.alpine_bits_server)
    to build the guest-request notification for (reservation, customer) and
    writes the resulting XML to logs/push_requests/.

    NOTE(review): the function returns immediately after writing that file, so
    the HTTP POST further down is currently unreachable. That dead code also
    references `payload`, which is not defined anywhere in this function.
    Confirm whether pushing is intentionally disabled before removing the
    early return.

    Args:
        customer: Customer row belonging to the reservation.
        reservation: Reservation row; `hotel_code` and `unique_id` are read.
        hotel: Config entry for the hotel; "push_endpoint" and "hotel_id" are read.
    """
    push_endpoint = hotel.get("push_endpoint")
    if not push_endpoint:
        _LOGGER.warning(f"No push endpoint configured for hotel {hotel.get('hotel_id')}")
        return
    server: AlpineBitsServer = app.state.alpine_bits_server
    hotel_id = hotel['hotel_id']
    reservation_hotel_id = reservation.hotel_code
    # Double-check hotel matching (should be guaranteed by dispatcher)
    if hotel_id != reservation_hotel_id:
        _LOGGER.warning(f"Hotel ID mismatch: listener for {hotel_id}, reservation for {reservation_hotel_id}")
        return
    _LOGGER.info(f"Processing push notification for hotel {hotel_id}, reservation {reservation.unique_id}")
    # Prepare payload for push notification: the (reservation, customer) pair is
    # handed to the server in place of request XML.
    request = await server.handle_request(request_action_name=AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS.request_name, request_xml=(reservation, customer), client_info=None, version=Version.V2024_10)
    if request.status_code != 200:
        _LOGGER.error(f"Failed to generate push request for hotel {hotel_id}, reservation {reservation.unique_id}: {request.xml_content}")
        return
    # save push request to file
    logs_dir = "logs/push_requests"
    if not os.path.exists(logs_dir):
        os.makedirs(logs_dir, mode=0o755, exist_ok=True)
        # Log ownership/mode of the freshly created directory.
        stat_info = os.stat(logs_dir)
        _LOGGER.info(
            f"Created directory owner: uid:{stat_info.st_uid}, gid:{stat_info.st_gid}"
        )
        _LOGGER.info(f"Directory mode: {oct(stat_info.st_mode)[-3:]}")
    log_filename = (
        f"{logs_dir}/alpinebits_push_{hotel_id}_{reservation.unique_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml"
    )
    with open(log_filename, "w", encoding="utf-8") as f:
        f.write(request.xml_content)
    # NOTE(review): everything below this return is unreachable.
    return
    headers = {"Authorization": f"Bearer {push_endpoint.get('token','')}"} if push_endpoint.get('token') else {}
    ""
    try:
        async with httpx.AsyncClient() as client:
            # NOTE(review): `payload` is undefined in this scope.
            resp = await client.post(push_endpoint["url"], json=payload, headers=headers, timeout=10)
            _LOGGER.info(f"Push event fired to {push_endpoint['url']} for hotel {hotel['hotel_id']}, status: {resp.status_code}")
            if resp.status_code not in [200, 201, 202]:
                _LOGGER.warning(f"Push endpoint returned non-success status {resp.status_code}: {resp.text}")
    except Exception as e:
        _LOGGER.error(f"Push event failed for hotel {hotel['hotel_id']}: {e}")
        # Optionally implement retry logic here
        # NOTE(review): the text "@asynccontextmanager" was fused onto the end of
        # the comment above; it presumably belongs as a decorator on lifespan().
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: set up shared state on startup, release it on shutdown.

    The decorator is required: FastAPI's ``lifespan=`` argument expects a
    callable returning an async context manager, not a bare async generator.

    Startup:
      * load the configuration (falls back to ``{}`` if loading fails),
      * create the async SQLAlchemy engine and session factory,
      * store engine, session factory, config, AlpineBits server and event
        dispatcher on ``app.state``,
      * register one ``form_processed`` push listener per configured hotel
        that has both a ``hotel_id`` and a ``push_endpoint``,
      * create the database tables if they do not exist.

    Shutdown:
      * dispose the engine (also when shutdown is triggered by an exception).
    """
    # Setup DB
    try:
        config = load_config()
    except Exception as e:
        _LOGGER.error(f"Failed to load config: {str(e)}")
        config = {}
    database_url = get_database_url(config)
    engine = create_async_engine(database_url, echo=True)
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    app.state.engine = engine
    app.state.async_sessionmaker = session_factory
    app.state.config = config
    app.state.alpine_bits_server = AlpineBitsServer(config)
    app.state.event_dispatcher = event_dispatcher
    # Register push listeners for hotels with push_endpoint
    for hotel in config.get("alpine_bits_auth", []):
        push_endpoint = hotel.get("push_endpoint")
        hotel_id = hotel.get("hotel_id")
        if push_endpoint and hotel_id:
            # Register hotel-specific listener; partial() binds the hotel config
            # so the dispatcher only has to pass (customer, reservation).
            event_dispatcher.register_hotel_listener(
                "form_processed",
                hotel_id,
                partial(push_listener, hotel=hotel),
            )
            _LOGGER.info(f"Registered push listener for hotel {hotel_id} with endpoint {push_endpoint.get('url')}")
        elif push_endpoint and not hotel_id:
            _LOGGER.warning(f"Hotel has push_endpoint but no hotel_id: {hotel}")
        elif hotel_id and not push_endpoint:
            _LOGGER.info(f"Hotel {hotel_id} has no push_endpoint configured")
    # Create tables
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    _LOGGER.info("Database tables checked/created at startup.")
    try:
        yield
    finally:
        # Dispose engine on shutdown, even if the application exits with an error.
        await engine.dispose()
async def get_async_session(request: Request):
    """FastAPI dependency that yields a database session for one request.

    The session factory is read from ``request.app.state.async_sessionmaker``;
    the session is opened with ``async with`` and therefore closed once the
    dependent handler has finished.
    """
    session_factory = request.app.state.async_sessionmaker
    async with session_factory() as db_session:
        yield db_session
# Application object; startup and shutdown work is delegated to lifespan().
app = FastAPI(
    title="Wix Form Handler API",
    description="Secure API endpoint to receive and process Wix form submissions with authentication and rate limiting",
    version="1.0.0",
    lifespan=lifespan,
)
# Create API router with /api prefix; it is attached to the app further down
# via app.include_router(api_router).
api_router = APIRouter(prefix="/api", tags=["api"])
# Add rate limiting: expose the limiter on app.state and route
# RateLimitExceeded errors to the project's custom handler.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)
# Add CORS middleware to allow requests from Wix.
# Entries in allow_origins are compared literally, so glob patterns such as
# "https://*.wix.com" never match a real Origin header. Wildcard subdomains
# have to be expressed through allow_origin_regex, which is matched against
# the whole origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",  # For development
        "http://localhost:8000",  # For local testing
    ],
    # Any subdomain of wix.com / wixstatic.com over HTTPS.
    allow_origin_regex=r"https://([A-Za-z0-9-]+\.)+(wix\.com|wixstatic\.com)",
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
async def process_form_submission(submission_data: Dict[str, Any]) -> None:
    """
    Background task to process the form submission.

    Currently only logs a summary (form name, contact email and the number of
    "field:"-prefixed entries). Any exception is logged and swallowed, so the
    coroutine always returns None.
    """
    try:
        submission_id = submission_data.get('submissionId')
        _LOGGER.info(f"Processing form submission: {submission_id}")

        # Example processing - replace with the actual business logic
        name_of_form = submission_data.get("formName")
        contact = submission_data.get("contact")
        sender_email = contact.get("email") if contact else None

        # Collect the form fields (keys prefixed with "field:")
        field_entries = {}
        for key, value in submission_data.items():
            if key.startswith("field:"):
                field_entries[key] = value

        _LOGGER.info(
            f"Form: {name_of_form}, Contact: {sender_email}, Fields: {len(field_entries)}"
        )
        # Possible follow-ups: save to database, send emails, call external
        # APIs, process the data further.
    except Exception as exc:
        _LOGGER.error(f"Error processing form submission: {str(exc)}")
@api_router.get("/")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def root(request: Request):
    """Health check endpoint: static status plus the configured rate limits."""
    configured_limits = {
        "default": DEFAULT_RATE_LIMIT,
        "webhook": WEBHOOK_RATE_LIMIT,
        "burst": BURST_RATE_LIMIT,
    }
    payload = {"message": "Wix Form Handler API is running"}
    payload["timestamp"] = datetime.now().isoformat()
    payload["status"] = "healthy"
    payload["authentication"] = "required"
    payload["rate_limits"] = configured_limits
    return payload
@api_router.get("/health")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def health_check(request: Request):
    """Detailed health check: static service metadata and the current timestamp."""
    now_iso = datetime.now().isoformat()
    report = {"status": "healthy", "timestamp": now_iso}
    report["service"] = "wix-form-handler"
    report["version"] = "1.0.0"
    report["authentication"] = "enabled"
    report["rate_limiting"] = "enabled"
    return report
# Extracted business logic for handling Wix form submissions

# Strong references to in-flight push tasks. The event loop only keeps weak
# references to tasks, so a task nobody references may be garbage-collected
# before it finishes.
_PUSH_TASKS = set()


def _submission_value(data, index):
    """Return ``data["submissions"][index]["value"]`` or None if that entry is missing."""
    submissions = data.get("submissions") or []
    if index < len(submissions) and isinstance(submissions[index], dict):
        return submissions[index].get("value")
    return None


def _log_wix_request(request, data, timestamp):
    """Dump the raw request (headers and body) to ``logs/`` and return the file path."""
    # NOTE(review): all request headers are written to disk verbatim; confirm
    # that no sensitive header values end up in these files.
    log_entry = {
        "timestamp": timestamp,
        "client_ip": request.client.host if request.client else "unknown",
        "headers": dict(request.headers),
        "data": data,
        "origin_header": request.headers.get("origin"),
        "all_headers": dict(request.headers),
    }
    logs_dir = "logs"
    if not os.path.exists(logs_dir):
        os.makedirs(logs_dir, mode=0o755, exist_ok=True)
        stat_info = os.stat(logs_dir)
        _LOGGER.info(
            f"Created directory owner: uid:{stat_info.st_uid}, gid:{stat_info.st_gid}"
        )
        _LOGGER.info(f"Directory mode: {oct(stat_info.st_mode)[-3:]}")
    # NOTE(review): the name has one-second resolution, so two submissions in
    # the same second overwrite each other's log file.
    log_filename = (
        f"{logs_dir}/wix_test_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    )
    with open(log_filename, "w", encoding="utf-8") as f:
        json.dump(log_entry, f, indent=2, default=str, ensure_ascii=False)
    file_stat = os.stat(log_filename)
    _LOGGER.info(f"Created file owner: uid:{file_stat.st_uid}, gid:{file_stat.st_gid}")
    _LOGGER.info(f"File mode: {oct(file_stat.st_mode)[-3:]}")
    _LOGGER.info(f"Data logged to: {log_filename}")
    return log_filename


async def process_wix_form_submission(request: Request, data: Dict[str, Any], db):
    """
    Shared business logic for handling Wix form submissions (test and production).

    Steps:
      1. Write the raw request to a JSON file under ``logs/``.
      2. Extract contact, stay and tracking fields from the payload.
      3. Persist a customer row and a reservation row and commit.
      4. Schedule a background ``form_processed`` dispatch for the
         reservation's hotel.

    Args:
        request: Incoming request; headers, client address and ``app.state``
            (config, event dispatcher) are read.
        data: Parsed JSON body. If it contains a nested ``"data"`` dict, that
            dict is treated as the form payload; otherwise the body itself is.
        db: Async database session used for add/flush/commit/refresh.

    Returns:
        A dict with status, the payload keys, the log file path and timestamp.

    Raises:
        ValueError: if a date or guest-count field cannot be parsed.
    """
    timestamp = datetime.now().isoformat()
    _LOGGER.info(f"Received Wix form data at {timestamp}")
    log_filename = _log_wix_request(request, data, timestamp)

    # Handle nested "data" key if present; fall back to the body itself so a
    # flat payload no longer crashes on None.
    nested = data.get("data")
    if isinstance(nested, dict):
        data = nested

    # save customer and reservation to DB
    contact_info = data.get("contact") or {}
    name_info = contact_info.get("name") or {}
    first_name = name_info.get("first")
    last_name = name_info.get("last")
    email = contact_info.get("email")
    # An empty or missing phone list must not raise IndexError.
    phones = contact_info.get("phones") or [{}]
    phone_number = (phones[0] or {}).get("e164Phone")
    contact_id = contact_info.get("contactId")
    name_prefix = data.get("field:anrede")
    # The newsletter checkbox arrives as a localized label.
    yes_values = {"Selezionato", "Angekreuzt", "Checked"}
    email_newsletter = data.get("field:form_field_5a7b", "") in yes_values
    # Two-letter language code taken from the contact locale.
    language = (contact_info.get("locale") or "en")[:2]

    # Dates: form field, then German label, then positional submissions entry.
    start_date = (
        data.get("field:date_picker_a7c8")
        or data.get("Anreisedatum")
        or _submission_value(data, 1)
    )
    end_date = (
        data.get("field:date_picker_7e65")
        or data.get("Abreisedatum")
        or _submission_value(data, 2)
    )

    # Room/guest info
    num_adults = int(data.get("field:number_7cf5") or 2)
    num_children = int(data.get("field:anzahl_kinder") or 0)
    children_ages = []
    if num_children > 0:
        for key, raw_age in data.items():
            if not key.startswith("field:alter_kind_"):
                continue
            try:
                children_ages.append(int(raw_age))
            except (TypeError, ValueError):
                _LOGGER.warning(f"Invalid age value for {key}: {raw_age}")
    offer = data.get("field:angebot_auswaehlen")

    # Get submissionId and cap it at 32 characters; generate one if it is
    # missing or empty.
    unique_id = str(data.get("submissionId") or generate_unique_id())[:32]

    # Save all relevant data to DB (including new fields). Address, gender and
    # birth date are not collected by the form and are stored as None.
    db_customer = DBCustomer(
        given_name=first_name,
        surname=last_name,
        contact_id=contact_id,
        name_prefix=name_prefix,
        email_address=email,
        phone=phone_number,
        email_newsletter=email_newsletter,
        address_line=None,
        city_name=None,
        postal_code=None,
        country_code=None,
        gender=None,
        birth_date=None,
        language=language,
        address_catalog=False,
        name_title=None,
    )
    db.add(db_customer)
    await db.flush()  # This assigns db_customer.id without committing

    # Determine hotel_code and hotel_name
    # Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
    app_config = request.app.state.config
    hotel_code = (
        data.get("field:hotelid")
        or data.get("hotelid")
        or app_config.get("default_hotel_code")
        or "123"  # fallback
    )
    hotel_name = (
        data.get("field:hotelname")
        or data.get("hotelname")
        or app_config.get("default_hotel_name")
        or "Frangart Inn"  # fallback
    )
    db_reservation = DBReservation(
        customer_id=db_customer.id,
        unique_id=unique_id,
        start_date=date.fromisoformat(start_date) if start_date else None,
        end_date=date.fromisoformat(end_date) if end_date else None,
        num_adults=num_adults,
        num_children=num_children,
        children_ages=",".join(str(a) for a in children_ages),
        offer=offer,
        created_at=datetime.now(timezone.utc),
        utm_source=data.get("field:utm_source"),
        utm_medium=data.get("field:utm_medium"),
        utm_campaign=data.get("field:utm_campaign"),
        utm_term=data.get("field:utm_term"),
        utm_content=data.get("field:utm_content"),
        user_comment=data.get("field:long_answer_3524", ""),
        fbclid=data.get("field:fbclid"),
        gclid=data.get("field:gclid"),
        hotel_code=hotel_code,
        hotel_name=hotel_name,
    )
    db.add(db_reservation)
    await db.commit()
    await db.refresh(db_reservation)

    async def push_event():
        # Fire event for listeners (push, etc.) - hotel-specific dispatch
        dispatcher = getattr(request.app.state, "event_dispatcher", None)
        if not dispatcher:
            return
        # Get hotel_code from reservation to target the right listeners
        target_hotel = getattr(db_reservation, "hotel_code", None)
        if not (target_hotel and str(target_hotel).strip()):
            _LOGGER.warning("No hotel_code in reservation, skipping push notifications")
            return
        try:
            await dispatcher.dispatch_for_hotel(
                "form_processed", target_hotel, db_customer, db_reservation
            )
        except Exception:
            # Runs detached from the request, so failures must be logged here
            # or they would only surface as "exception was never retrieved".
            _LOGGER.exception(f"form_processed dispatch failed for hotel {target_hotel}")
            return
        _LOGGER.info(f"Dispatched form_processed event for hotel {target_hotel}")

    # Fire-and-forget, but keep a reference until the task is done.
    push_task = asyncio.create_task(push_event())
    _PUSH_TASKS.add(push_task)
    push_task.add_done_callback(_PUSH_TASKS.discard)

    return {
        "status": "success",
        "message": "Wix form data received successfully",
        "received_keys": list(data.keys()),
        "data_logged_to": log_filename,
        "timestamp": timestamp,
        "note": "No authentication required for this endpoint",
    }
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(
    request: Request, data: Dict[str, Any], db_session=Depends(get_async_session)
):
    """
    Unified endpoint to handle Wix form submissions (test and production).
    No authentication required for this endpoint.

    Delegates to process_wix_form_submission(); any exception is logged with
    its stack trace and converted into a generic HTTP 500.
    """
    try:
        result = await process_wix_form_submission(request, data, db_session)
    except Exception as exc:
        _LOGGER.error(f"Error in handle_wix_form: {str(exc)}")
        # log stacktrace
        import traceback

        _LOGGER.error(f"Stack trace for handle_wix_form: {traceback.format_exc()}")
        raise HTTPException(
            status_code=500, detail="Error processing Wix form data"
        )
    return result
@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_wix_form_test(
    request: Request, data: Dict[str, Any], db_session=Depends(get_async_session)
):
    """
    Test endpoint to verify the API is working with raw JSON data.
    No authentication required for testing purposes.

    Runs the same processing as the production webhook; failures are logged
    and reported as a generic HTTP 500.
    """
    try:
        outcome = await process_wix_form_submission(request, data, db_session)
    except Exception as exc:
        _LOGGER.error(f"Error in handle_wix_form_test: {str(exc)}")
        raise HTTPException(
            status_code=500, detail="Error processing test data"
        )
    return outcome
@api_router.post("/admin/generate-api-key")
@limiter.limit("5/hour")  # Very restrictive for admin operations
async def generate_new_api_key(
    request: Request, admin_key: str = Depends(validate_api_key)
):
    """
    Admin endpoint to generate new API keys.
    Requires admin API key and is heavily rate limited.

    Responds with HTTP 403 unless the validated key equals "admin-key".
    """
    is_admin = admin_key == "admin-key"
    if not is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    fresh_key = generate_api_key()
    _LOGGER.info(f"Generated new API key (requested by: {admin_key})")

    response_body = {
        "status": "success",
        "message": "New API key generated",
        "api_key": fresh_key,
    }
    response_body["timestamp"] = datetime.now().isoformat()
    response_body["note"] = "Store this key securely - it won't be shown again"
    return response_body
async def validate_basic_auth(
    credentials: HTTPBasicCredentials = Depends(security_basic),
) -> tuple:
    """
    Validate basic authentication for AlpineBits protocol.

    Accepts any username/password pair present in config['alpine_bits_auth'].

    Returns:
        The ``(username, password)`` tuple of the authenticated client.

    Raises:
        HTTPException: 401 if credentials are missing or do not match any
            configured entry.
    """
    # Local import keeps this block self-contained; compare_digest gives a
    # comparison whose duration does not depend on where the strings differ.
    import secrets

    if not credentials.username or not credentials.password:
        raise HTTPException(
            status_code=401,
            detail="ERROR: Authentication required",
            headers={"WWW-Authenticate": "Basic"},
        )
    # compare_digest only accepts ASCII str, so compare UTF-8 bytes instead.
    supplied_user = credentials.username.encode("utf-8")
    supplied_password = credentials.password.encode("utf-8")
    valid = False
    # The config may be {} when loading failed at startup; treat a missing
    # auth section as "no valid users" instead of failing with KeyError.
    for entry in app.state.config.get("alpine_bits_auth", []):
        expected_user = entry.get("username")
        expected_password = entry.get("password")
        if not isinstance(expected_user, str) or not isinstance(expected_password, str):
            continue
        user_ok = secrets.compare_digest(supplied_user, expected_user.encode("utf-8"))
        password_ok = secrets.compare_digest(
            supplied_password, expected_password.encode("utf-8")
        )
        # No early exit and no short-circuit: every entry costs the same time.
        valid |= user_ok & password_ok
    if not valid:
        raise HTTPException(
            status_code=401,
            detail="ERROR: Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    _LOGGER.info(
        f"AlpineBits authentication successful for user: {credentials.username} (from config)"
    )
    return credentials.username, credentials.password
def parse_multipart_data(content_type: str, body: bytes) -> Dict[str, Any]:
    """
    Parse multipart/form-data from raw request body.
    This is a simplified parser for the AlpineBits use case.

    Args:
        content_type: Value of the Content-Type header, including the
            ``boundary=`` parameter.
        body: Raw (already decompressed) request body.

    Returns:
        Mapping of form field name to its content decoded as UTF-8
        (undecodable bytes are dropped). Parts without a CRLF CRLF header
        separator or without a ``name`` parameter are skipped.

    Raises:
        HTTPException: 400 if the content type is not multipart/form-data or
            carries no boundary.
    """
    if "multipart/form-data" not in content_type:
        raise HTTPException(
            status_code=400, detail="ERROR: Content-Type must be multipart/form-data"
        )
    # Extract boundary (parameter names are case-insensitive)
    boundary = None
    for part in content_type.split(";"):
        part = part.strip()
        if part.lower().startswith("boundary="):
            boundary = part.split("=", 1)[1].strip('"')
            break
    if not boundary:
        raise HTTPException(
            status_code=400, detail="ERROR: Missing boundary in multipart/form-data"
        )
    # Simple multipart parsing
    data = {}
    for part in body.split(f"--{boundary}".encode()):
        stripped = part.strip()
        # Skip the preamble and the closing "--" marker.
        if not stripped or stripped == b"--":
            continue
        # Split headers and content
        if b"\r\n\r\n" not in part:
            continue
        headers_section, content = part.split(b"\r\n\r\n", 1)
        content = content.rstrip(b"\r\n")
        # Parse Content-Disposition header. Header and parameter names are
        # case-insensitive, so match on a lowercased copy but take the value
        # from the original text.
        name = None
        for line in headers_section.decode("utf-8", errors="ignore").split("\n"):
            lowered = line.lower()
            if "content-disposition" not in lowered or "name=" not in lowered:
                continue
            for param in line.split(";"):
                param = param.strip()
                # "filename=" does not start with "name=", so it is ignored.
                if param.lower().startswith("name="):
                    name = param.split("=", 1)[1].strip('"')
                    break
        if name:
            # XML and plain-text parts are decoded the same way.
            data[name] = content.decode("utf-8", errors="ignore")
    return data
@api_router.post("/alpinebits/server-2024-10")
@limiter.limit("60/minute")
async def alpinebits_server_handshake(
    request: Request,
    credentials_tupel: tuple = Depends(validate_basic_auth),
    dbsession=Depends(get_async_session),
):
    """
    AlpineBits server endpoint implementing the handshake protocol.

    Handles, in order:
    - protocol version detection via X-AlpineBits-ClientProtocolVersion
      (a missing header is treated as "pre-2013-04"),
    - optional client identification via X-AlpineBits-ClientID,
    - gzip decompression when Content-Encoding is exactly "gzip",
    - multipart/form-data or application/x-www-form-urlencoded parsing,
    - dispatch of the `action` (and optional `request` XML) to the
      AlpineBits server stored on app.state.

    Authentication: HTTP Basic Auth required
    Errors: HTTP 400 for malformed requests, HTTP 500 for anything unexpected.
    """
    unsupported_type_detail = (
        "ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded"
    )
    try:
        incoming = request.headers

        # Check required headers
        client_protocol_version = incoming.get("X-AlpineBits-ClientProtocolVersion")
        if client_protocol_version:
            _LOGGER.info(f"Client protocol version: {client_protocol_version}")
        else:
            # Server concludes client speaks a protocol version preceding 2013-04
            client_protocol_version = "pre-2013-04"
            _LOGGER.info(
                "No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04"
            )

        # Optional client ID
        client_id = incoming.get("X-AlpineBits-ClientID")
        if client_id:
            _LOGGER.info(f"Client ID: {client_id}")

        # Check content encoding
        content_encoding = incoming.get("Content-Encoding")
        is_compressed = content_encoding == "gzip"
        if is_compressed:
            _LOGGER.info("Request is gzip compressed")

        # Get content type before processing
        content_type = incoming.get("Content-Type", "")
        _LOGGER.info(f"Content-Type: {content_type}")
        _LOGGER.info(f"Content-Encoding: {content_encoding}")

        # Get request body and decompress if needed
        body = await request.body()
        if is_compressed:
            try:
                body = gzip.decompress(body)
            except Exception:
                raise HTTPException(
                    status_code=400,
                    detail="ERROR: Failed to decompress gzip content",
                )

        # Check content type (after decompression)
        is_multipart = "multipart/form-data" in content_type
        is_urlencoded = "application/x-www-form-urlencoded" in content_type
        if not (is_multipart or is_urlencoded):
            raise HTTPException(status_code=400, detail=unsupported_type_detail)

        # Multipart takes precedence; otherwise the body is urlencoded.
        if is_multipart:
            try:
                form_data = parse_multipart_data(content_type, body)
            except Exception:
                raise HTTPException(
                    status_code=400,
                    detail="ERROR: Failed to parse multipart/form-data",
                )
        else:
            form_data = dict(urllib.parse.parse_qsl(body.decode("utf-8")))

        # Check for required action parameter
        action = form_data.get("action")
        if not action:
            raise HTTPException(
                status_code=400, detail="ERROR: Missing required 'action' parameter"
            )
        _LOGGER.info(f"AlpineBits action: {action}")

        # Get optional request XML
        request_xml = form_data.get("request")

        username, password = credentials_tupel
        client_info = AlpineBitsClientInfo(
            username=username, password=password, client_id=client_id
        )
        server: AlpineBitsServer = app.state.alpine_bits_server
        response = await server.handle_request(
            action,
            request_xml,
            client_info=client_info,
            version=Version.V2024_10,
            dbsession=dbsession,
        )

        # Set response headers indicating server capabilities
        response_headers = {
            "Content-Type": "application/xml; charset=utf-8",
            "X-AlpineBits-Server-Accept-Encoding": "gzip",  # Indicate gzip support
            "X-AlpineBits-Server-Version": "2024-10",
        }
        return Response(
            content=response.xml_content,
            status_code=response.status_code,
            headers=response_headers,
        )
    except HTTPException:
        # Re-raise HTTP exceptions (auth errors, etc.)
        raise
    except Exception as exc:
        _LOGGER.error(f"Error in AlpineBits handshake: {str(exc)}")
        raise HTTPException(status_code=500, detail="Internal server error")
@api_router.get("/admin/stats")
@limiter.limit("10/minute")
async def get_api_stats(request: Request, admin_key: str = Depends(validate_api_key)):
    """
    Admin endpoint to get API usage statistics.
    Requires admin API key.

    The figures are placeholders; only the rate-limit backend reflects the
    environment (REDIS_URL set or not).
    """
    if not admin_key == "admin-key":
        raise HTTPException(status_code=403, detail="Admin access required")

    # In a real application, you'd fetch this from your database/monitoring system
    known_keys = ("wix-webhook-key", "admin-key")
    backend = "memory"
    if os.getenv("REDIS_URL"):
        backend = "redis"
    stats = {
        "uptime": "Available in production deployment",
        "total_requests": "Available with monitoring setup",
        "active_api_keys": sum(1 for key_name in known_keys if key_name),
        "rate_limit_backend": backend,
    }
    return {
        "status": "success",
        "stats": stats,
        "timestamp": datetime.now().isoformat(),
    }
# Include the API router in the main app. This runs after all @api_router
# routes above have been declared, so they are all mounted under /api.
app.include_router(api_router)
@app.get("/", response_class=HTMLResponse)
async def landing_page():
    """
    Serve the under construction landing page at the root route.

    Reads templates/index.html next to this module; if that file does not
    exist, a built-in fallback page is returned instead. Always HTTP 200.
    """
    import os

    # Path to the HTML template shipped alongside this module
    template_file = os.path.join(os.path.dirname(__file__), "templates", "index.html")
    try:
        with open(template_file, "r", encoding="utf-8") as handle:
            page = handle.read()
    except FileNotFoundError:
        # Fallback if HTML file is not found
        page = """
<!DOCTYPE html>
<html>
<head>
<title>99tales - Under Construction</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; }
h1 { color: #333; }
</style>
</head>
<body>
<h1>🏗️ 99tales</h1>
<h2>Under Construction</h2>
<p>We're working hard to bring you something amazing!</p>
<p><a href="/api">API Documentation</a></p>
</body>
</html>
"""
    return HTMLResponse(content=page, status_code=200)
if __name__ == "__main__":
    # Script entry point: uvicorn is imported only when the module is run directly.
    import uvicorn

    # Serve the app on all interfaces (0.0.0.0), port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)