"""API endpoints for the form-data and the alpinebits server."""
|
|
|
|
import asyncio
|
|
import gzip
|
|
import json
|
|
import multiprocessing
|
|
import os
|
|
import urllib.parse
|
|
import xml.dom.minidom
|
|
from collections import defaultdict
|
|
from contextlib import asynccontextmanager
|
|
from datetime import UTC, datetime, timedelta
|
|
from functools import partial
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from fast_langdetect import detect
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, FastAPI, HTTPException, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import HTMLResponse, Response
|
|
from fastapi.security import (
|
|
HTTPAuthorizationCredentials,
|
|
HTTPBasic,
|
|
HTTPBasicCredentials,
|
|
HTTPBearer,
|
|
)
|
|
from pydantic import BaseModel
|
|
from slowapi.errors import RateLimitExceeded
|
|
from sqlalchemy import and_, select, update
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from alpine_bits_python.hotel_service import HotelService
|
|
from alpine_bits_python.schemas import WebhookRequestData
|
|
|
|
from .alpinebits_server import (
|
|
AlpineBitsActionName,
|
|
AlpineBitsClientInfo,
|
|
AlpineBitsServer,
|
|
Version,
|
|
)
|
|
from .auth import validate_api_key
|
|
from .config_loader import get_username_for_hotel, load_config
|
|
from .const import HttpStatusCode, WebhookStatus
|
|
from .conversion_service import ConversionService
|
|
from .csv_import import CSVImporter
|
|
from .db import Customer as DBCustomer
|
|
from .db import (
|
|
Hotel,
|
|
ResilientAsyncSession,
|
|
SessionMaker,
|
|
WebhookEndpoint,
|
|
WebhookRequest,
|
|
create_database_engine,
|
|
)
|
|
from .db import Reservation as DBReservation
|
|
from .db_setup import run_startup_tasks
|
|
from .email_monitoring import ReservationStatsCollector
|
|
from .email_service import create_email_service
|
|
from .logging_config import get_logger, setup_logging
|
|
from .pushover_service import create_pushover_service
|
|
from .rate_limit import (
|
|
BURST_RATE_LIMIT,
|
|
DEFAULT_RATE_LIMIT,
|
|
WEBHOOK_RATE_LIMIT,
|
|
custom_rate_limit_handler,
|
|
limiter,
|
|
webhook_limiter,
|
|
)
|
|
from .webhook_processor import webhook_registry
|
|
from .worker_coordination import is_primary_worker
|
|
|
|

# Configure logging - will be reconfigured during lifespan with actual config
_LOGGER = get_logger(__name__)

# HTTP Basic auth for AlpineBits
security_basic = HTTPBasic()
# HTTP Bearer auth for API endpoints
security_bearer = HTTPBearer()

# Constants for token sanitization
TOKEN_LOG_LENGTH = 10


# Pydantic models for language detection
class LanguageDetectionRequest(BaseModel):
    """Request payload for the language detection endpoint."""

    text: str


class LanguageDetectionResponse(BaseModel):
    """Response payload with the detected language and confidence score."""

    language_code: str
    score: float


# --- Enhanced event dispatcher with hotel-specific routing ---
class EventDispatcher:
    """Simple event dispatcher for AlpineBits push requests."""

    def __init__(self):
        self.listeners = defaultdict(list)
        self.hotel_listeners = defaultdict(list)  # hotel_code -> list of listeners

    def register(self, event_name, func):
        """Register a listener for all events with the given name."""
        self.listeners[event_name].append(func)

    def register_hotel_listener(self, event_name, hotel_code, func):
        """Register a listener for a specific hotel."""
        self.hotel_listeners[f"{event_name}:{hotel_code}"].append(func)

    async def dispatch(self, event_name, *args, **kwargs):
        """Dispatch an event to every listener registered for it."""
        for func in self.listeners[event_name]:
            await func(*args, **kwargs)

    async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs):
        """Dispatch event only to listeners registered for specific hotel."""
        key = f"{event_name}:{hotel_code}"
        for func in self.hotel_listeners[key]:
            await func(*args, **kwargs)


event_dispatcher = EventDispatcher()
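
# Illustrative usage sketch (not executed at import time): the listener name and
# hotel code below are hypothetical; the dispatcher API is as defined above.
#
#     async def my_listener(customer, reservation, hotel):
#         ...  # push, log, or notify
#
#     event_dispatcher.register_hotel_listener("form_processed", "HOTEL_A", my_listener)
#     # Later, only listeners registered for "HOTEL_A" are awaited:
#     await event_dispatcher.dispatch_for_hotel(
#         "form_processed", "HOTEL_A", customer, reservation
#     )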

# Load config at startup


async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel):
    """Push listener that sends reservation data to the hotel's push endpoint.

    Only called for reservations that match this hotel's hotel_id.
    """
    push_endpoint = hotel.get("push_endpoint")
    if not push_endpoint:
        _LOGGER.warning(
            "No push endpoint configured for hotel %s", hotel.get("hotel_id")
        )
        return

    server: AlpineBitsServer = app.state.alpine_bits_server
    hotel_id = hotel["hotel_id"]
    reservation_hotel_id = reservation.hotel_id

    # Double-check hotel matching (should be guaranteed by dispatcher)
    if hotel_id != reservation_hotel_id:
        _LOGGER.warning(
            "Hotel ID mismatch: listener for %s, reservation for %s",
            hotel_id,
            reservation_hotel_id,
        )
        return

    _LOGGER.info(
        "Processing push notification for hotel %s, reservation %s",
        hotel_id,
        reservation.unique_id,
    )

    # Prepare payload for push notification
    request = await server.handle_request(
        request_action_name=AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS.request_name,
        request_xml=(reservation, customer),
        client_info=None,
        version=Version.V2024_10,
    )

    if request.status_code != HttpStatusCode.OK:
        _LOGGER.error(
            "Failed to generate push request for hotel %s, reservation %s: %s",
            hotel_id,
            reservation.unique_id,
            request.xml_content,
        )
        return

    # Save push request to file
    logs_dir = Path("logs/push_requests")
    if not logs_dir.exists():
        logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
        stat_info = os.stat(logs_dir)
        _LOGGER.info(
            "Created directory owner: uid:%s, gid:%s",
            stat_info.st_uid,
            stat_info.st_gid,
        )
        _LOGGER.info("Directory mode: %s", oct(stat_info.st_mode)[-3:])
    log_filename = f"{logs_dir}/alpinebits_push_{hotel_id}_{reservation.unique_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml"

    with open(log_filename, "w", encoding="utf-8") as f:
        f.write(request.xml_content)
    # NOTE: the HTTP push below is currently disabled; the generated request is
    # only written to disk. Remove this early return to actually call the
    # configured push endpoint.
    return

    headers = (
        {"Authorization": f"Bearer {push_endpoint.get('token', '')}"}
        if push_endpoint.get("token")
        else {}
    )
    # The payload shape below is an assumption; adjust it to whatever contract
    # the hotel's push endpoint expects.
    payload = {"xml": request.xml_content}
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                push_endpoint["url"], json=payload, headers=headers, timeout=10
            )
            _LOGGER.info(
                "Push event fired to %s for hotel %s, status: %s",
                push_endpoint["url"],
                hotel["hotel_id"],
                resp.status_code,
            )

            if resp.status_code not in [200, 201, 202]:
                _LOGGER.warning(
                    "Push endpoint returned non-success status %s: %s",
                    resp.status_code,
                    resp.text,
                )

    except Exception as e:
        _LOGGER.exception("Push event failed for hotel %s: %s", hotel["hotel_id"], e)
        # Optionally implement retry logic here


async def cleanup_stale_webhooks(
    async_sessionmaker: async_sessionmaker, timeout_minutes: int = 10
) -> int:
    """Reset webhooks stuck in 'processing' (worker crashed).

    Args:
        async_sessionmaker: SQLAlchemy async sessionmaker
        timeout_minutes: Timeout threshold in minutes

    Returns:
        Number of stale webhooks reset

    """
    timeout_threshold = datetime.now(UTC) - timedelta(minutes=timeout_minutes)

    async with async_sessionmaker() as session:
        result = await session.execute(
            update(WebhookRequest)
            .where(
                and_(
                    WebhookRequest.status == WebhookStatus.PROCESSING,
                    WebhookRequest.processing_started_at < timeout_threshold,
                )
            )
            .values(
                status=WebhookStatus.FAILED,
                last_error="Processing timeout - worker may have crashed",
            )
        )
        await session.commit()
        count = result.rowcount

    if count > 0:
        _LOGGER.warning("Reset %d stale webhooks to 'failed'", count)

    return count


async def purge_old_webhook_payloads(
    async_sessionmaker: async_sessionmaker, retention_days: int = 7
) -> int:
    """Purge payload_json from old completed webhooks.

    Keeps metadata for history but removes large JSON payload.

    Args:
        async_sessionmaker: SQLAlchemy async sessionmaker
        retention_days: Days to retain payloads before purging

    Returns:
        Number of payloads purged

    """
    cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)

    async with async_sessionmaker() as session:
        result = await session.execute(
            update(WebhookRequest)
            .where(
                and_(
                    WebhookRequest.status == WebhookStatus.COMPLETED,
                    WebhookRequest.created_at < cutoff_date,
                    WebhookRequest.purged_at.is_(None),  # Not already purged
                )
            )
            .values(payload_json=None, purged_at=datetime.now(UTC))
        )
        await session.commit()
        count = result.rowcount

    if count > 0:
        _LOGGER.info("Purged payloads from %d old webhook requests", count)

    return count


async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
    """Run periodic cleanup tasks for webhooks.

    This should be scheduled to run every 5-10 minutes.

    Args:
        async_sessionmaker: SQLAlchemy async sessionmaker

    """
    try:
        # Clean up stale webhooks (stuck in 'processing')
        stale_count = await cleanup_stale_webhooks(async_sessionmaker)

        # Purge old webhook payloads (older than 7 days)
        purged_count = await purge_old_webhook_payloads(async_sessionmaker)

        _LOGGER.debug(
            "Webhook cleanup: %d stale reset, %d payloads purged",
            stale_count,
            purged_count,
        )
    except Exception as e:
        _LOGGER.exception("Error during periodic webhook cleanup: %s", e)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Set up application services on startup and tear them down on shutdown."""
    # Setup DB

    # Determine if this is the primary worker using file-based locking
    # Only primary runs schedulers/background tasks
    # In multi-worker setups, only one worker should run singleton services
    is_primary, worker_lock = is_primary_worker()

    _LOGGER.info(
        "Worker startup: process=%s, pid=%d, primary=%s",
        multiprocessing.current_process().name,
        os.getpid(),
        is_primary,
    )

    try:
        config = load_config()
    except Exception:
        _LOGGER.exception("Failed to load config")
        config = {}

    # Get event loop for email monitoring
    loop = asyncio.get_running_loop()

    # Initialize email service (before logging setup so it can be used by handlers)
    email_service = create_email_service(config)

    # Initialize pushover service
    pushover_service = create_pushover_service(config)

    # Setup logging from config with unified notification monitoring
    # Only primary worker should have the report scheduler running
    alert_handler, report_scheduler = setup_logging(
        config, email_service, pushover_service, loop, enable_scheduler=is_primary
    )
    _LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary)

    # Create database engine with schema support
    engine = create_database_engine(config=config, echo=False)
    AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

    # Create resilient session wrapper for automatic connection recovery
    resilient_session = ResilientAsyncSession(AsyncSessionLocal, engine)

    # Create SessionMaker for concurrent processing
    session_maker = SessionMaker(AsyncSessionLocal)

    app.state.engine = engine
    app.state.async_sessionmaker = AsyncSessionLocal
    app.state.resilient_session = resilient_session
    app.state.session_maker = session_maker
    app.state.config = config
    app.state.alpine_bits_server = AlpineBitsServer(config)
    app.state.event_dispatcher = event_dispatcher
    app.state.email_service = email_service
    app.state.pushover_service = pushover_service
    app.state.alert_handler = alert_handler
    app.state.report_scheduler = report_scheduler

    # Initialize webhook processors
    from .webhook_processor import initialize_webhook_processors

    initialize_webhook_processors()
    _LOGGER.info("Webhook processors initialized")

    # Register push listeners for hotels with push_endpoint
    for hotel in config.get("alpine_bits_auth", []):
        push_endpoint = hotel.get("push_endpoint")
        hotel_id = hotel.get("hotel_id")

        if push_endpoint and hotel_id:
            # Register hotel-specific listener
            event_dispatcher.register_hotel_listener(
                "form_processed", hotel_id, partial(push_listener, hotel=hotel)
            )
            _LOGGER.info(
                "Registered push listener for hotel %s with endpoint %s",
                hotel_id,
                push_endpoint.get("url"),
            )
        elif push_endpoint and not hotel_id:
            _LOGGER.warning("Hotel has push_endpoint but no hotel_id: %s", hotel)
        elif hotel_id and not push_endpoint:
            _LOGGER.info("Hotel %s has no push_endpoint configured", hotel_id)

    # Run startup tasks (only in primary worker to avoid race conditions)
    # NOTE: Database migrations should already have been run before the app started
    # via run_migrations.py or `uv run alembic upgrade head`
    if is_primary:
        _LOGGER.info("Running startup tasks (primary worker)...")
        await run_startup_tasks(AsyncSessionLocal, config, engine)
        _LOGGER.info("Startup tasks completed")
    else:
        _LOGGER.info("Skipping startup tasks (non-primary worker)")

    # Initialize and hook up stats collector for daily reports
    # Note: report_scheduler will only exist on the primary worker
    if report_scheduler:
        stats_collector = ReservationStatsCollector(
            async_sessionmaker=AsyncSessionLocal,
            config=config,
        )
        # Hook up the stats collector to the report scheduler
        report_scheduler.set_stats_collector(stats_collector.collect_stats)
        _LOGGER.info("Stats collector initialized and hooked up to report scheduler")

        # Start daily report scheduler
        report_scheduler.start()
        _LOGGER.info("Daily report scheduler started")

    # Start periodic webhook cleanup (only on primary worker)
    cleanup_task = None
    if is_primary:

        async def run_periodic_cleanup():
            """Run cleanup tasks every 5 minutes."""
            while True:
                try:
                    await asyncio.sleep(300)  # 5 minutes
                    await periodic_webhook_cleanup(AsyncSessionLocal)
                except asyncio.CancelledError:
                    _LOGGER.info("Webhook cleanup task cancelled")
                    break
                except Exception as e:
                    _LOGGER.exception("Error in periodic webhook cleanup: %s", e)

        cleanup_task = asyncio.create_task(run_periodic_cleanup())
        _LOGGER.info("Webhook periodic cleanup task started")

    _LOGGER.info("Application startup complete")

    yield

    # Cleanup on shutdown
    _LOGGER.info("Application shutdown initiated")

    # Stop webhook cleanup task
    if cleanup_task:
        cleanup_task.cancel()
        try:
            await cleanup_task
        except asyncio.CancelledError:
            pass
        _LOGGER.info("Webhook cleanup task stopped")

    # Stop daily report scheduler
    if report_scheduler:
        report_scheduler.stop()
        _LOGGER.info("Daily report scheduler stopped")

    # Close alert handler (flush any remaining errors)
    if alert_handler:
        alert_handler.close()
        _LOGGER.info("Alert handler closed")

    # Shutdown email service thread pool
    if email_service:
        email_service.shutdown()
        _LOGGER.info("Email service shut down")

    # Dispose engine
    await engine.dispose()
    _LOGGER.info("Application shutdown complete")

    # Release worker lock if this was the primary worker
    if worker_lock:
        worker_lock.release()


async def get_async_session(request: Request):
    """Get a database session with automatic connection recovery.

    This dependency provides an async session that will automatically
    retry on connection errors, disposing the pool and reconnecting.
    """
    async_sessionmaker = request.app.state.async_sessionmaker
    async with async_sessionmaker() as session:
        yield session


def get_session_maker(request: Request) -> SessionMaker:
    """Get the SessionMaker for creating independent database sessions.

    This dependency provides a SessionMaker that can be used to create
    multiple independent sessions for concurrent processing tasks.
    Useful for processing large datasets concurrently where each task
    needs its own database transaction context.
    """
    return request.app.state.session_maker


def get_resilient_session(request: Request) -> ResilientAsyncSession:
    """Get the resilient session manager from app state.

    This provides access to the ResilientAsyncSession for use in handlers
    that need retry capability on connection errors.
    """
    return request.app.state.resilient_session


app = FastAPI(
    title="Wix Form Handler API",
    description="Secure API endpoint to receive and process Wix form submissions with authentication and rate limiting",
    version="1.0.0",
    lifespan=lifespan,
)

# Create API router with /api prefix
api_router = APIRouter(prefix="/api", tags=["api"])

# Add rate limiting
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)

# Add CORS middleware to allow requests from Wix
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://*.wix.com",
        "https://*.wixstatic.com",
        "http://localhost:3000",  # For development
        "http://localhost:8000",  # For local testing
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)


@api_router.get("/")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def root(request: Request):
    """Health check endpoint."""
    return {
        "message": "Wix Form Handler API is running",
        "timestamp": datetime.now().isoformat(),
        "status": "healthy",
        "authentication": "required",
        "rate_limits": {
            "default": DEFAULT_RATE_LIMIT,
            "webhook": WEBHOOK_RATE_LIMIT,
            "burst": BURST_RATE_LIMIT,
        },
    }


@api_router.get("/health")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def health_check(request: Request):
    """Detailed health check."""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "service": "wix-form-handler",
        "version": "1.0.0",
        "authentication": "enabled",
        "rate_limiting": "enabled",
    }
@api_router.post("/detect-language", response_model=LanguageDetectionResponse)
|
|
@limiter.limit(DEFAULT_RATE_LIMIT)
|
|
async def detect_language(
|
|
request: Request,
|
|
data: LanguageDetectionRequest,
|
|
credentials: HTTPAuthorizationCredentials = Depends(security_bearer),
|
|
):
|
|
"""Detect language of text, restricted to Italian or German.
|
|
|
|
Requires Bearer token authentication.
|
|
Returns the most likely language (it or de) with confidence score.
|
|
"""
|
|
# Validate bearer token
|
|
token = credentials.credentials
|
|
config = request.app.state.config
|
|
|
|
# Check if token is valid
|
|
valid_tokens = config.get("api_tokens", [])
|
|
|
|
# If no tokens configured, reject authentication
|
|
if not valid_tokens:
|
|
_LOGGER.error("No api_tokens configured in config.yaml")
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Authentication token not configured on server",
|
|
)
|
|
|
|
if token not in valid_tokens:
|
|
# Log sanitized token (first TOKEN_LOG_LENGTH chars) for security
|
|
sanitized_token = (
|
|
token[:TOKEN_LOG_LENGTH] + "..." if len(token) > TOKEN_LOG_LENGTH else token
|
|
)
|
|
_LOGGER.warning("Invalid token attempt: %s", sanitized_token)
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Invalid authentication token",
|
|
)
|
|
|
|
try:
|
|
# Detect language with k=2 to get top 2 candidates
|
|
results = detect(data.text, k=2)
|
|
|
|
_LOGGER.info("Language detection results: %s", results)
|
|
|
|
# Filter for Italian (it) or German (de)
|
|
italian_german_results = [r for r in results if r.get("lang") in ["it", "de"]]
|
|
|
|
if italian_german_results:
|
|
# Return the best match between Italian and German
|
|
best_match = italian_german_results[0]
|
|
|
|
return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch"
|
|
return LanguageDetectionResponse(
|
|
language_code=return_value, score=best_match.get("score", 0.0)
|
|
)
|
|
# If neither Italian nor German detected in top 2, check all results
|
|
all_results = detect(data.text, k=10)
|
|
italian_german_all = [r for r in all_results if r.get("lang") in ["it", "de"]]
|
|
|
|
if italian_german_all:
|
|
best_match = italian_german_all[0]
|
|
|
|
return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch"
|
|
return LanguageDetectionResponse(
|
|
language_code=return_value, score=best_match.get("score", 0.0)
|
|
)
|
|
|
|
# Default to German if no clear detection
|
|
_LOGGER.warning(
|
|
"Could not detect Italian or German in text: %s, defaulting to 'de'",
|
|
data.text[:100],
|
|
)
|
|
return LanguageDetectionResponse(language_code="Deutsch", score=0.0)
|
|
|
|
except Exception as e:
|
|
_LOGGER.exception("Error detecting language: %s", e)
|
|
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
|
|
|
|
|
|
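
# Illustrative client call (assumes a token listed under `api_tokens` in
# config.yaml; URL and token value are placeholders):
#
#     import httpx
#
#     resp = httpx.post(
#         "http://localhost:8000/api/detect-language",
#         json={"text": "Vielen Dank für Ihre Anfrage"},
#         headers={"Authorization": "Bearer <api-token>"},
#     )
#     # -> {"language_code": "Deutsch", "score": ...}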


async def validate_basic_auth(
    credentials: HTTPBasicCredentials = Depends(security_basic),
    db_session=Depends(get_async_session),
) -> tuple[str, str]:
    """Validate basic authentication for AlpineBits protocol.

    Returns a (username, password) tuple if valid, raises HTTPException if not.
    """
    # Authenticate against the hotel database first, then fall back to any
    # username/password pair present in config['alpine_bits_auth']
    if not credentials.username or not credentials.password:
        raise HTTPException(
            status_code=401,
            detail="ERROR: Authentication required",
            headers={"WWW-Authenticate": "Basic"},
        )
    hotel_service = HotelService(db_session)
    hotel = await hotel_service.authenticate_hotel(
        credentials.username, credentials.password
    )

    if hotel:
        _LOGGER.info(
            "AlpineBits authentication successful for user: %s (from database)",
            credentials.username,
        )
        return credentials.username, credentials.password

    # Fallback to config-defined credentials for legacy scenarios
    config = app.state.config
    valid = False
    for entry in config.get("alpine_bits_auth", []):
        if (
            credentials.username == entry.get("username")
            and credentials.password == entry.get("password")
        ):
            valid = True
            _LOGGER.warning(
                "AlpineBits authentication for user %s matched legacy config entry",
                credentials.username,
            )
            break

    if not valid:
        raise HTTPException(
            status_code=401,
            detail="ERROR: Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )

    return credentials.username, credentials.password
@api_router.post("/webhook/{webhook_secret}")
|
|
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
|
|
async def handle_webhook_unified(
|
|
request: Request,
|
|
webhook_secret: str,
|
|
db_session: AsyncSession = Depends(get_async_session),
|
|
):
|
|
"""Unified webhook handler with deduplication and routing.
|
|
|
|
Supports both new secure webhook URLs and legacy endpoints:
|
|
- /webhook/{64-char-secret} - New secure endpoints
|
|
- /webhook/wix-form - Legacy Wix form endpoint (extracts hotel from payload)
|
|
- /webhook/generic - Legacy generic webhook endpoint (extracts hotel from payload)
|
|
|
|
Flow:
|
|
1. Look up webhook_endpoint by webhook_secret (or detect legacy endpoint)
|
|
2. Parse and hash payload (SHA256)
|
|
3. Check for duplicate using SELECT FOR UPDATE SKIP LOCKED
|
|
4. If duplicate and completed: return success (idempotent)
|
|
5. If duplicate and processing: return success (concurrent request)
|
|
6. Create or update webhook_request with status='processing'
|
|
7. Route to appropriate processor based on webhook_endpoint.webhook_type
|
|
8. Update status to 'completed' or 'failed'
|
|
9. Return response
|
|
"""
|
|
timestamp = datetime.now(UTC)
|
|
|
|
# 2. Parse payload first (needed for legacy endpoint detection)
|
|
body = await request.body()
|
|
|
|
# Handle gzip compression
|
|
if request.headers.get("content-encoding", "").lower() == "gzip":
|
|
try:
|
|
body = gzip.decompress(body)
|
|
except Exception as e:
|
|
_LOGGER.error("Failed to decompress gzip payload: %s", e)
|
|
raise HTTPException(status_code=400, detail="Invalid gzip compression")
|
|
|
|
try:
|
|
payload = json.loads(body.decode("utf-8"))
|
|
except Exception as e:
|
|
_LOGGER.error("Failed to parse JSON payload: %s", e)
|
|
raise HTTPException(status_code=400, detail="Invalid JSON payload")
|
|
|
|

    # 1. Detect if this is a legacy endpoint or look up webhook_endpoint
    webhook_endpoint: WebhookEndpoint | None = None
    is_legacy = False
    webhook_type = None
    hotel_id_from_payload = None

    # Check if webhook_secret looks like a legacy endpoint name
    if webhook_secret in ("wix-form", "generic"):
        is_legacy = True
        webhook_type = "wix_form" if webhook_secret == "wix-form" else "generic"

        # Extract hotel_id from payload based on webhook type
        if webhook_type == "wix_form":
            # Wix forms: field:hotelid or use default
            hotel_id_from_payload = (
                payload.get("data", {}).get("field:hotelid")
                if isinstance(payload.get("data"), dict)
                else payload.get("field:hotelid")
            )
            if not hotel_id_from_payload:
                hotel_id_from_payload = request.app.state.config.get(
                    "default_hotel_code", "123"
                )
                _LOGGER.info(
                    "Legacy wix-form endpoint: using default hotel_code=%s",
                    hotel_id_from_payload,
                )
        elif webhook_type == "generic":
            # Generic webhooks: hotel_data.hotelcode or use default
            hotel_data = payload.get("hotel_data", {})
            hotel_id_from_payload = hotel_data.get("hotelcode")
            if not hotel_id_from_payload:
                hotel_id_from_payload = request.app.state.config.get(
                    "default_hotel_code", "123"
                )
                _LOGGER.info(
                    "Legacy generic endpoint: using default hotel_code=%s",
                    hotel_id_from_payload,
                )

        _LOGGER.info(
            "Legacy endpoint detected: %s, webhook_type=%s, hotel_id=%s",
            webhook_secret,
            webhook_type,
            hotel_id_from_payload,
        )

        # Look up the webhook endpoint for this hotel and type
        result = await db_session.execute(
            select(WebhookEndpoint)
            .where(
                and_(
                    WebhookEndpoint.hotel_id == hotel_id_from_payload,
                    WebhookEndpoint.webhook_type == webhook_type,
                    WebhookEndpoint.is_enabled == True,
                )
            )
            .options(selectinload(WebhookEndpoint.hotel))
        )
        webhook_endpoint = result.scalar_one_or_none()

        if not webhook_endpoint:
            _LOGGER.error(
                "No webhook endpoint found for legacy endpoint: hotel_id=%s, type=%s",
                hotel_id_from_payload,
                webhook_type,
            )
            raise HTTPException(
                status_code=404,
                detail=f"No webhook configuration found for hotel {hotel_id_from_payload}",
            )
    else:
        # New secure endpoint - look up by webhook_secret
        result = await db_session.execute(
            select(WebhookEndpoint)
            .where(
                and_(
                    WebhookEndpoint.webhook_secret == webhook_secret,
                    WebhookEndpoint.is_enabled == True,
                )
            )
            .options(selectinload(WebhookEndpoint.hotel))
        )
        webhook_endpoint = result.scalar_one_or_none()

        if not webhook_endpoint:
            raise HTTPException(status_code=404, detail="Webhook not found")

    webhook_endpoint_id = webhook_endpoint.id
    webhook_hotel_id = webhook_endpoint.hotel_id

    # Verify hotel is active
    if not webhook_endpoint.hotel.is_active:
        raise HTTPException(status_code=404, detail="Hotel is not active")

    # 3. Track payload metadata with canonical hashing handled by WebhookRequestData
    payload_size = len(body)

    # Check payload size limit (10MB)
    if payload_size > 10 * 1024 * 1024:
        _LOGGER.error("Payload too large: %d bytes", payload_size)
        raise HTTPException(status_code=413, detail="Payload too large (max 10MB)")

    webhook_request_data = WebhookRequestData(
        payload_json=payload,
        webhook_endpoint_id=webhook_endpoint_id,
        hotel_id=webhook_hotel_id,
        status=WebhookStatus.PROCESSING,
        processing_started_at=timestamp,
        created_at=timestamp,
        source_ip=request.client.host if request.client else None,
        user_agent=request.headers.get("user-agent"),
    )

    payload_hash = webhook_request_data.payload_hash

    # 4. Check for duplicate with row-level locking
    duplicate = await db_session.execute(
        select(WebhookRequest)
        .where(WebhookRequest.payload_hash == payload_hash)
        .with_for_update(skip_locked=True)
    )
    existing = duplicate.scalar_one_or_none()

    if existing:
        if existing.status == WebhookStatus.COMPLETED:
            # Already processed successfully
            _LOGGER.info(
                "Webhook already processed (webhook_id=%d, hotel=%s)",
                existing.id,
                webhook_endpoint.hotel_id,
            )
            return {
                "status": "success",
                "message": "Webhook already processed",
                "webhook_id": existing.id,
                "duplicate": True,
            }
        if existing.status == WebhookStatus.PROCESSING:
            # Another worker is processing right now
            _LOGGER.info(
                "Webhook is being processed by another worker (webhook_id=%d)",
                existing.id,
            )
            return {
                "status": "success",
                "message": "Webhook is being processed",
                "webhook_id": existing.id,
                "duplicate": True,
            }
        if existing.status == WebhookStatus.FAILED:
            # Retry failed webhook
            _LOGGER.info(
                "Retrying failed webhook (webhook_id=%d, retry_count=%d)",
                existing.id,
                existing.retry_count,
            )
            webhook_request = existing
            webhook_request.retry_count += 1
            webhook_request.status = WebhookStatus.PROCESSING
            webhook_request.processing_started_at = timestamp
    else:
        # 5. Create new webhook_request from validated data
        webhook_request = WebhookRequest(**webhook_request_data.model_dump())

        db_session.add(webhook_request)
        await db_session.flush()

    webhook_request_id = webhook_request.id

    try:
        # 6. Get processor for webhook_type
        processor = webhook_registry.get_processor(webhook_endpoint.webhook_type)
        if not processor:
            raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}")

        # Persist the webhook row before handing off to processors
        await db_session.commit()

        # 7. Process webhook with simplified interface
        result = await processor.process(
            webhook_request=webhook_request,
            db_session=db_session,
            config=request.app.state.config,
            event_dispatcher=request.app.state.event_dispatcher,
        )

        if not db_session.in_transaction():
            await db_session.begin()

        completion_values = {
            "status": WebhookStatus.COMPLETED,
            "processing_completed_at": datetime.now(UTC),
        }

        if isinstance(result, dict):
            created_customer_id = result.get("customer_id")
            created_reservation_id = result.get("reservation_id")
            if created_customer_id:
                completion_values["created_customer_id"] = created_customer_id
            if created_reservation_id:
                completion_values["created_reservation_id"] = created_reservation_id

        await db_session.execute(
            update(WebhookRequest)
            .where(WebhookRequest.id == webhook_request_id)
            .values(**completion_values)
        )
        await db_session.commit()

        return {
            **result,
            "webhook_id": webhook_request_id,
            "hotel_id": webhook_hotel_id,
        }

    except Exception as e:
        _LOGGER.exception("Error processing webhook: %s", e)

        await db_session.rollback()
        if not db_session.in_transaction():
            await db_session.begin()
        await db_session.execute(
            update(WebhookRequest)
            .where(WebhookRequest.id == webhook_request_id)
            .values(
                status=WebhookStatus.FAILED,
                last_error=str(e)[:2000],
                processing_completed_at=datetime.now(UTC),
            )
        )
        await db_session.commit()

        raise HTTPException(status_code=500, detail="Error processing webhook") from e
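
# Illustrative client call for the webhook endpoint (the secret and field names
# are placeholders; the gzip step is optional and mirrors the Content-Encoding
# handling above):
#
#     import gzip
#     import json
#
#     import httpx
#
#     payload = {"data": {"field:hotelid": "39054_001", "field:email": "guest@example.com"}}
#     body = gzip.compress(json.dumps(payload).encode("utf-8"))
#     resp = httpx.post(
#         "http://localhost:8000/api/webhook/<64-char-secret>",
#         content=body,
#         headers={"Content-Type": "application/json", "Content-Encoding": "gzip"},
#     )
#     # Replaying the same body returns {"duplicate": True, ...} instead of reprocessing.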


async def _process_csv_import_background(
    csv_content: str,
    filename: str,
    hotel_code: str,
    session_maker: SessionMaker,
    config: dict[str, Any],
    log_filename: Path,
):
    """Background task to process CSV import with automatic acknowledgement.

    This runs in a separate asyncio task after the HTTP response is sent.
    Handles both file saving and database processing.
    All imported reservations are automatically acknowledged using the username
    associated with the hotel_code from the config.

    Args:
        csv_content: CSV content as string
        filename: Original filename
        hotel_code: Hotel code (mandatory) - used to get username for acknowledgements
        session_maker: SessionMaker for creating database sessions
        config: Application configuration
        log_filename: Path to save the CSV file

    """
    try:
        # First, save the CSV file (in background)
        await asyncio.to_thread(log_filename.write_text, csv_content, encoding="utf-8")
        _LOGGER.debug("CSV file saved to %s", log_filename)

        # Now process the CSV import
        _LOGGER.info("Starting database processing of %s", filename)

        # Get username for acknowledgements from config
        username = get_username_for_hotel(config, hotel_code)

        # Create a new session for this background task
        db_session = await session_maker.create_session()
        try:
            importer = CSVImporter(db_session, config)
            # Import with pre-acknowledgement enabled
            stats = await importer.import_csv_file(
                str(log_filename),
                hotel_code,
                dryrun=False,
                pre_acknowledge=True,
                client_id=hotel_code,
                username=username,
            )

            _LOGGER.info("CSV import complete for %s: %s", filename, stats)
        finally:
            await db_session.close()
    except Exception:
        _LOGGER.exception("Error processing CSV import in background for %s", filename)
@api_router.put("/admin/import-csv/{hotel_code}/{filename:path}")
|
|
@limiter.limit(BURST_RATE_LIMIT)
|
|
async def import_csv_endpoint(
|
|
request: Request,
|
|
background_tasks: BackgroundTasks,
|
|
hotel_code: str,
|
|
filename: str,
|
|
credentials: tuple = Depends(validate_basic_auth),
|
|
db_session=Depends(get_async_session),
|
|
session_maker: SessionMaker = Depends(get_session_maker),
|
|
):
|
|
"""Import reservations from CSV data sent via PUT request.
|
|
|
|
This endpoint allows importing historical form data into the system.
|
|
It creates customers and reservations, avoiding duplicates based on:
|
|
- Name, email, reservation dates
|
|
- fbclid/gclid tracking IDs
|
|
|
|
Returns immediately with 202 Accepted while processing continues in background.
|
|
All imported reservations are automatically acknowledged using the username
|
|
associated with the hotel_code in the config.
|
|
|
|
Requires basic authentication and saves CSV files to log directory.
|
|
Supports gzip compression via Content-Encoding header.
|
|
|
|
Args:
|
|
hotel_code: Hotel code (mandatory) - used to get username for acknowledgements
|
|
filename: Name for the CSV file (used for logging)
|
|
credentials: Basic auth credentials
|
|
|
|
Example: PUT /api/admin/import-csv/39054_001/reservations.csv
|
|
|
|
"""
|
|
try:
|
|
# Validate filename to prevent path traversal
|
|
if ".." in filename or filename.startswith("/"):
|
|
raise HTTPException(status_code=400, detail="ERROR: Invalid filename")
|
|
|
|
# Get the raw body content
|
|
body = await request.body()
|
|
|
|
if not body:
|
|
raise HTTPException(
|
|
status_code=400, detail="ERROR: No CSV content provided"
|
|
)
|
|
|
|
# Check if content is gzip compressed
|
|
content_encoding = request.headers.get("content-encoding", "").lower()
|
|
is_gzipped = content_encoding == "gzip"
|
|
|
|
# Decompress if gzipped
|
|
if is_gzipped:
|
|
try:
|
|
body = gzip.decompress(body)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"ERROR: Failed to decompress gzip content: {e}",
|
|
) from e
|
|
|
|
# Try to decode as UTF-8
|
|
try:
|
|
csv_content = body.decode("utf-8")
|
|
except UnicodeDecodeError:
|
|
# If UTF-8 fails, try with latin-1 as fallback
|
|
csv_content = body.decode("latin-1")
|
|
|
|
# Basic validation that it looks like CSV
|
|
if not csv_content.strip():
|
|
raise HTTPException(status_code=400, detail="ERROR: CSV content is empty")
|
|
|
|
# Create logs directory for CSV imports (blocking, but fast)
|
|
logs_dir = Path("logs/csv_imports")
|
|
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
|
|
|
|
# Generate filename with timestamp and authenticated user
|
|
username, _ = credentials
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
base_filename = Path(filename).stem
|
|
extension = Path(filename).suffix or ".csv"
|
|
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
|
|
|
|
_LOGGER.info(
|
|
"CSV file queued for processing: %s by user %s (original: %s)",
|
|
log_filename,
|
|
username,
|
|
filename,
|
|
)
|
|
|
|
# Schedule background processing using FastAPI's BackgroundTasks
|
|
# This handles both file saving AND database processing
|
|
# This ensures the response is sent immediately
|
|
background_tasks.add_task(
|
|
_process_csv_import_background,
|
|
csv_content,
|
|
filename,
|
|
hotel_code,
|
|
session_maker,
|
|
request.app.state.config,
|
|
log_filename,
|
|
)
|
|
|
|
response_headers = {
|
|
"Content-Type": "application/json; charset=utf-8",
|
|
}
|
|
|
|
# Return immediate acknowledgment
|
|
return Response(
|
|
content=json.dumps(
|
|
{
|
|
"status": "accepted",
|
|
"message": "CSV file received and queued for processing",
|
|
"filename": filename,
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
),
|
|
headers=response_headers,
|
|
status_code=202,
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception:
|
|
_LOGGER.exception("Error in import_csv_endpoint")
|
|
raise HTTPException(status_code=500, detail="Error processing CSV upload")
|
|
|
|
|
|
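
# Illustrative upload (credentials, host, and file path are placeholders; gzip
# is optional, as handled above):
#
#     import gzip
#     from pathlib import Path
#
#     import httpx
#
#     data = gzip.compress(Path("reservations.csv").read_bytes())
#     resp = httpx.put(
#         "http://localhost:8000/api/admin/import-csv/39054_001/reservations.csv",
#         content=data,
#         headers={"Content-Encoding": "gzip"},
#         auth=("<username>", "<password>"),
#     )
#     assert resp.status_code == 202  # accepted; the import continues in the background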


async def _process_conversion_xml_background(
    xml_content: str,
    filename: str,
    session_maker: SessionMaker,
    log_filename: Path,
    hotel: Hotel,
):
    """Background task to process conversion XML.

    This runs in a separate asyncio task after the HTTP response is sent.
    Handles both file prettification and database processing.
    """
    try:
        # First, prettify and save the XML file (in background)
        try:
            dom = xml.dom.minidom.parseString(xml_content)
            pretty_xml = dom.toprettyxml(indent=" ")
            # Remove extra blank lines that toprettyxml adds
            pretty_xml = "\n".join(
                [line for line in pretty_xml.split("\n") if line.strip()]
            )
            await asyncio.to_thread(
                log_filename.write_text, pretty_xml, encoding="utf-8"
            )
            _LOGGER.debug("XML file prettified and saved to %s", log_filename)
        except Exception as e:
            # If formatting fails, save the original content
            _LOGGER.warning("Failed to format XML: %s. Saving unformatted.", str(e))
            await asyncio.to_thread(
                log_filename.write_text, xml_content, encoding="utf-8"
            )

        # Now process the conversion XML
        _LOGGER.info("Starting database processing of %s", filename)
        conversion_service = ConversionService(session_maker, hotel.hotel_id)
        processing_stats = await conversion_service.process_conversion_xml(
            xml_content, run_full_guest_matching=True
        )

        await conversion_service.classify_regular_guests(24)

        _LOGGER.info(
            "Conversion processing complete for %s: %s", filename, processing_stats
        )
    except Exception:
        _LOGGER.exception(
            "Error processing conversion XML in background for %s", filename
        )
@api_router.put("/hoteldata/conversions_import/{filename:path}")
|
|
@limiter.limit(DEFAULT_RATE_LIMIT)
|
|
async def handle_xml_upload(
|
|
request: Request,
|
|
background_tasks: BackgroundTasks,
|
|
filename: str,
|
|
credentials_tupel: tuple = Depends(validate_basic_auth),
|
|
db_session=Depends(get_async_session),
|
|
session_maker: SessionMaker = Depends(get_session_maker),
|
|
):
|
|
"""Endpoint for receiving XML files for conversion processing via PUT.
|
|
|
|
Processes conversion data from hotel PMS:
|
|
- Parses reservation and daily sales XML data
|
|
- Matches to existing reservations using truncated tracking IDs (fbclid/gclid)
|
|
- Links conversions to customers and hashed_customers
|
|
- Stores daily sales revenue data
|
|
|
|
Returns immediately with 202 Accepted while processing continues in background.
|
|
|
|
Requires basic authentication and saves XML files to log directory.
|
|
Supports gzip compression via Content-Encoding header.
|
|
|
|
Example: PUT /api/hoteldata/conversions_import/Reservierungen.xml
|
|
"""
|
|
try:
|
|
# Validate filename to prevent path traversal
|
|
if ".." in filename or filename.startswith("/"):
|
|
raise HTTPException(status_code=400, detail="ERROR: Invalid filename")
|
|
|
|
# Get the raw body content
|
|
body = await request.body()
|
|
|
|
if not body:
|
|
raise HTTPException(
|
|
status_code=400, detail="ERROR: No XML content provided"
|
|
)
|
|
|
|
# Check if content is gzip compressed
|
|
content_encoding = request.headers.get("content-encoding", "").lower()
|
|
is_gzipped = content_encoding == "gzip"
|
|
|
|
# Decompress if gzipped
|
|
if is_gzipped:
|
|
try:
|
|
body = gzip.decompress(body)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"ERROR: Failed to decompress gzip content: {e}",
|
|
) from e
|
|
|
|
# Try to decode as UTF-8
|
|
try:
|
|
xml_content = body.decode("utf-8")
|
|
except UnicodeDecodeError:
|
|
# If UTF-8 fails, try with latin-1 as fallback
|
|
xml_content = body.decode("latin-1")
|
|
|
|
# Basic validation that it's XML-like
|
|
if not xml_content.strip().startswith("<"):
|
|
raise HTTPException(
|
|
status_code=400, detail="ERROR: Content does not appear to be XML"
|
|
)
|
|
|
|
# Create logs directory for XML conversions (blocking, but fast)
|
|
logs_dir = Path("logs/conversions_import")
|
|
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
|
|
|
|
# Generate filename with timestamp and authenticated user
|
|
username, _ = credentials_tupel
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
base_filename = Path(filename).stem
|
|
extension = Path(filename).suffix or ".xml"
|
|
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
|
|
|
|
hotel_service = HotelService(db_session)
|
|
|
|
hotel = await hotel_service.get_hotel_by_username(username)
|
|
|
|
_LOGGER.info(
|
|
"XML file queued for processing: %s by user %s (original: %s)",
|
|
log_filename,
|
|
username,
|
|
filename,
|
|
)
|
|
|
|
# Schedule background processing using FastAPI's BackgroundTasks
|
|
# This handles both file prettification/saving AND database processing
|
|
# This ensures the response is sent immediately
|
|
background_tasks.add_task(
|
|
_process_conversion_xml_background,
|
|
xml_content,
|
|
filename,
|
|
session_maker,
|
|
log_filename,
|
|
hotel,
|
|
)
|
|
|
|
response_headers = {
|
|
"Content-Type": "application/xml; charset=utf-8",
|
|
"X-AlpineBits-Server-Accept-Encoding": "gzip",
|
|
}
|
|
|
|
# Return immediate acknowledgment
|
|
response_content = f"""<?xml version="1.0" encoding="UTF-8"?>
|
|
<response>
|
|
<status>accepted</status>
|
|
<message>XML file received and queued for processing</message>
|
|
<filename>{filename}</filename>
|
|
<timestamp>{datetime.now().isoformat()}</timestamp>
|
|
</response>"""
|
|
|
|
return Response(
|
|
content=response_content, headers=response_headers, status_code=202
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception:
|
|
_LOGGER.exception("Error in handle_xml_upload")
|
|
raise HTTPException(status_code=500, detail="Error processing XML upload")
|
|
|
|
|
|


# TODO: A bit sketchy. May need requests-toolbelt in the future.
def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
    """Parse multipart/form-data from raw request body.

    This is a simplified parser for the AlpineBits use case.
    """
    if "multipart/form-data" not in content_type:
        raise HTTPException(
            status_code=400, detail="ERROR: Content-Type must be multipart/form-data"
        )

    # Extract boundary
    boundary = None
    for part in content_type.split(";"):
        part = part.strip()
        if part.startswith("boundary="):
            boundary = part.split("=", 1)[1].strip('"')
            break

    if not boundary:
        raise HTTPException(
            status_code=400, detail="ERROR: Missing boundary in multipart/form-data"
        )

    # Simple multipart parsing
    parts = body.split(f"--{boundary}".encode())
    data = {}

    for part in parts:
        if not part.strip() or part.strip() == b"--":
            continue

        # Split headers and content
        if b"\r\n\r\n" in part:
            headers_section, content = part.split(b"\r\n\r\n", 1)
            content = content.rstrip(b"\r\n")

            # Parse Content-Disposition header
            headers = headers_section.decode("utf-8", errors="ignore")
            name = None
            for line in headers.split("\n"):
                if "Content-Disposition" in line and "name=" in line:
                    # Extract name parameter
                    for param in line.split(";"):
                        param = param.strip()
                        if param.startswith("name="):
                            name = param.split("=", 1)[1].strip('"')
                            break

            if name:
                # XML uploads and plain text fields are both decoded as UTF-8 text
                data[name] = content.decode("utf-8", errors="ignore")

    return data
@api_router.post("/alpinebits/server-2024-10")
|
|
@limiter.limit("60/minute")
|
|
async def alpinebits_server_handshake(
|
|
request: Request,
|
|
credentials_tupel: tuple = Depends(validate_basic_auth),
|
|
dbsession=Depends(get_async_session),
|
|
):
|
|
"""AlpineBits server endpoint implementing the handshake protocol.
|
|
|
|
This endpoint handles:
|
|
- Protocol version negotiation via X-AlpineBits-ClientProtocolVersion header
|
|
- Client identification via X-AlpineBits-ClientID header (optional)
|
|
- Multipart/form-data parsing for action and request parameters
|
|
- Gzip compression support
|
|
- Proper error handling with HTTP status codes
|
|
- Handshaking action processing
|
|
|
|
Authentication: HTTP Basic Auth required
|
|
Content-Type: multipart/form-data
|
|
Compression: gzip supported (check X-AlpineBits-Server-Accept-Encoding)
|
|
"""
|
|
try:
|
|
# Check required headers
|
|
client_protocol_version = request.headers.get(
|
|
"X-AlpineBits-ClientProtocolVersion"
|
|
)
|
|
if not client_protocol_version:
|
|
# Server concludes client speaks a protocol version preceding 2013-04
|
|
client_protocol_version = "pre-2013-04"
|
|
_LOGGER.info(
|
|
"No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04"
|
|
)
|
|
else:
|
|
_LOGGER.info("Client protocol version: %s", client_protocol_version)
|
|
|
|
# Optional client ID
|
|
client_id = request.headers.get("X-AlpineBits-ClientID")
|
|
if client_id:
|
|
_LOGGER.info("Client ID: %s", client_id)
|
|
|
|
# Check content encoding
|
|
content_encoding = request.headers.get("Content-Encoding")
|
|
is_compressed = content_encoding == "gzip"
|
|
|
|
if is_compressed:
|
|
_LOGGER.info("Request is gzip compressed")
|
|
|
|
# Get content type before processing
|
|
content_type = request.headers.get("Content-Type", "")
|
|
|
|
_LOGGER.info("Content-Type: %s", content_type)
|
|
_LOGGER.info("Content-Encoding: %s", content_encoding)
|
|
|
|
# Get request body
|
|
body = await request.body()
|
|
|
|
# Decompress if needed
|
|
form_data = validate_alpinebits_body(is_compressed, content_type, body)
|
|
|
|
# Check for required action parameter
|
|
action = form_data.get("action")
|
|
if not action:
|
|
raise HTTPException(
|
|
status_code=400, detail="ERROR: Missing required 'action' parameter"
|
|
)
|
|
|
|
_LOGGER.info("AlpineBits action: %s", action)
|
|
|
|
# Get optional request XML
|
|
request_xml = form_data.get("request")
|
|
|
|
server: AlpineBitsServer = app.state.alpine_bits_server
|
|
|
|
version = Version.V2024_10
|
|
|
|
username, password = credentials_tupel
|
|
|
|
client_info = AlpineBitsClientInfo(
|
|
username=username, password=password, client_id=client_id
|
|
)
|
|
|
|
# Create successful handshake response
|
|
response = await server.handle_request(
|
|
action,
|
|
request_xml,
|
|
client_info=client_info,
|
|
version=version,
|
|
dbsession=dbsession,
|
|
)
|
|
|
|
response_xml = response.xml_content
|
|
|
|
# Set response headers indicating server capabilities
|
|
headers = {
|
|
"Content-Type": "application/xml; charset=utf-8",
|
|
"X-AlpineBits-Server-Accept-Encoding": "gzip", # Indicate gzip support
|
|
"X-AlpineBits-Server-Version": "2024-10",
|
|
}
|
|
|
|
if is_compressed:
|
|
# Compress response if client sent compressed request
|
|
response_xml = gzip.compress(response_xml.encode("utf-8"))
|
|
headers["Content-Encoding"] = "gzip"
|
|
|
|
return Response(
|
|
content=response_xml, status_code=response.status_code, headers=headers
|
|
)
|
|
|
|
except HTTPException:
|
|
# Re-raise HTTP exceptions (auth errors, etc.)
|
|
raise
|
|
except Exception as e:
|
|
_LOGGER.exception("Error in AlpineBits handshake: %s", e)
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
|
|
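
# Illustrative handshake request (credentials, host, and the request XML are
# placeholders; the action value follows the AlpineBits handshaking convention
# this endpoint processes, and form-urlencoded bodies are accepted as well as
# multipart):
#
#     import httpx
#
#     resp = httpx.post(
#         "http://localhost:8000/api/alpinebits/server-2024-10",
#         data={"action": "OTA_Ping:Handshaking", "request": "<OTA_PingRQ .../>"},
#         headers={"X-AlpineBits-ClientProtocolVersion": "2024-10"},
#         auth=("<username>", "<password>"),
#     )
#     print(resp.status_code, resp.headers.get("X-AlpineBits-Server-Version"))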


def validate_alpinebits_body(is_compressed, content_type, body):
    """Check if the body conforms to AlpineBits expectations."""
    if is_compressed:
        try:
            body = gzip.decompress(body)
        except Exception:
            raise HTTPException(
                status_code=400,
                detail="ERROR: Failed to decompress gzip content",
            )

    # Check content type (after decompression)
    if (
        "multipart/form-data" not in content_type
        and "application/x-www-form-urlencoded" not in content_type
    ):
        raise HTTPException(
            status_code=400,
            detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
        )

    # Parse multipart data
    if "multipart/form-data" in content_type:
        try:
            form_data = parse_multipart_data(content_type, body)
        except Exception:
            raise HTTPException(
                status_code=400,
                detail="ERROR: Failed to parse multipart/form-data",
            )
    elif "application/x-www-form-urlencoded" in content_type:
        # Parse as urlencoded
        form_data = dict(urllib.parse.parse_qsl(body.decode("utf-8")))
    else:
        raise HTTPException(
            status_code=400,
            detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded",
        )

    return form_data
@api_router.get("/admin/stats")
|
|
@limiter.limit("10/minute")
|
|
async def get_api_stats(request: Request, admin_key: str = Depends(validate_api_key)):
|
|
"""Admin endpoint to get API usage statistics.
|
|
Requires admin API key.
|
|
"""
|
|
if admin_key != "admin-key":
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
# In a real application, you'd fetch this from your database/monitoring system
|
|
return {
|
|
"status": "success",
|
|
"stats": {
|
|
"uptime": "Available in production deployment",
|
|
"total_requests": "Available with monitoring setup",
|
|
"active_api_keys": len([k for k in ["wix-webhook-key", "admin-key"] if k]),
|
|
"rate_limit_backend": "redis" if os.getenv("REDIS_URL") else "memory",
|
|
},
|
|
"timestamp": datetime.now().isoformat(),
|
|
}
|
|
|
|
|
|
# Include the API router in the main app
|
|
app.include_router(api_router)
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def landing_page():
|
|
"""Serve the under construction landing page at the root route."""
|
|
try:
|
|
# Get the path to the HTML file
|
|
html_path = os.path.join(os.path.dirname(__file__), "templates", "index.html")
|
|
|
|
with open(html_path, encoding="utf-8") as f:
|
|
html_content = f.read()
|
|
|
|
return HTMLResponse(content=html_content, status_code=200)
|
|
except FileNotFoundError:
|
|
# Fallback if HTML file is not found
|
|
html_content = """
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>99tales - Under Construction</title>
|
|
<style>
|
|
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; }
|
|
h1 { color: #333; }
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h1>🏗️ 99tales</h1>
|
|
<h2>Under Construction</h2>
|
|
<p>We're working hard to bring you something amazing!</p>
|
|
<p><a href="/api">API Documentation</a></p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
return HTMLResponse(content=html_content, status_code=200)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|