"""API endpoints for the form-data and the alpinebits server.""" import asyncio import gzip import json import multiprocessing import os import traceback import urllib.parse from collections import defaultdict from datetime import date, datetime from functools import partial from pathlib import Path from typing import Any import httpx from fast_langdetect import detect from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, Response from fastapi.security import ( HTTPAuthorizationCredentials, HTTPBasic, HTTPBasicCredentials, HTTPBearer, ) from pydantic import BaseModel from slowapi.errors import RateLimitExceeded from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import async_sessionmaker from alpine_bits_python.schemas import ReservationData from .alpinebits_server import ( AlpineBitsActionName, AlpineBitsClientInfo, AlpineBitsServer, Version, ) from .auth import generate_unique_id, validate_api_key from .config_loader import load_config from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode from .conversion_service import ConversionService from .customer_service import CustomerService from .db import Base, create_database_engine from .db import Customer as DBCustomer from .db import Reservation as DBReservation from .email_monitoring import ReservationStatsCollector from .email_service import create_email_service from .logging_config import get_logger, setup_logging from .migrations import run_all_migrations from .pushover_service import create_pushover_service from .rate_limit import ( BURST_RATE_LIMIT, DEFAULT_RATE_LIMIT, WEBHOOK_RATE_LIMIT, custom_rate_limit_handler, limiter, webhook_limiter, ) from .reservation_service import ReservationService from .worker_coordination import is_primary_worker # Configure logging - will be reconfigured during lifespan with actual config _LOGGER = get_logger(__name__) # HTTP Basic auth for AlpineBits security_basic = HTTPBasic() # HTTP Bearer auth for API endpoints security_bearer = HTTPBearer() # Constants for token sanitization TOKEN_LOG_LENGTH = 10 def get_advertising_account_ids( config: dict[str, Any], hotel_code: str, fbclid: str | None, gclid: str | None ) -> tuple[str | None, str | None]: """Get advertising account IDs based on hotel config and click IDs. Args: config: Application configuration dict hotel_code: Hotel identifier to look up in config fbclid: Facebook click ID (if present, meta_account_id will be returned) gclid: Google click ID (if present, google_account_id will be returned) Returns: Tuple of (meta_account_id, google_account_id) based on conditional logic: - meta_account_id is set only if fbclid is present AND hotel has meta_account configured - google_account_id is set only if gclid is present AND hotel has google_account configured """ meta_account_id = None google_account_id = None # Look up hotel in config alpine_bits_auth = config.get("alpine_bits_auth", []) for hotel in alpine_bits_auth: if hotel.get(CONF_HOTEL_ID) == hotel_code: # Conditionally set meta_account_id if fbclid is present if fbclid: meta_account_id = hotel.get(CONF_META_ACCOUNT) # Conditionally set google_account_id if gclid is present if gclid: google_account_id = hotel.get(CONF_GOOGLE_ACCOUNT) break return meta_account_id, google_account_id # Pydantic models for language detection class LanguageDetectionRequest(BaseModel): text: str class LanguageDetectionResponse(BaseModel): language_code: str score: float # --- Enhanced event dispatcher with hotel-specific routing --- class EventDispatcher: """Simple event dispatcher for AlpineBits push requests.""" def __init__(self): self.listeners = defaultdict(list) self.hotel_listeners = defaultdict(list) # hotel_code -> list of listeners def register(self, event_name, func): self.listeners[event_name].append(func) def register_hotel_listener(self, event_name, hotel_code, func): """Register a listener for a specific hotel.""" self.hotel_listeners[f"{event_name}:{hotel_code}"].append(func) async def dispatch(self, event_name, *args, **kwargs): for func in self.listeners[event_name]: await func(*args, **kwargs) async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs): """Dispatch event only to listeners registered for specific hotel.""" key = f"{event_name}:{hotel_code}" for func in self.hotel_listeners[key]: await func(*args, **kwargs) event_dispatcher = EventDispatcher() # Load config at startup async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel): """Push listener that sends reservation data to hotel's push endpoint. Only called for reservations that match this hotel's hotel_id. """ push_endpoint = hotel.get("push_endpoint") if not push_endpoint: _LOGGER.warning( "No push endpoint configured for hotel %s", hotel.get("hotel_id") ) return server: AlpineBitsServer = app.state.alpine_bits_server hotel_id = hotel["hotel_id"] reservation_hotel_id = reservation.hotel_code # Double-check hotel matching (should be guaranteed by dispatcher) if hotel_id != reservation_hotel_id: _LOGGER.warning( "Hotel ID mismatch: listener for %s, reservation for %s", hotel_id, reservation_hotel_id, ) return _LOGGER.info( "Processing push notification for hotel %s, reservation %s", hotel_id, reservation.unique_id, ) # Prepare payload for push notification request = await server.handle_request( request_action_name=AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS.request_name, request_xml=(reservation, customer), client_info=None, version=Version.V2024_10, ) if request.status_code != HttpStatusCode.OK: _LOGGER.error( "Failed to generate push request for hotel %s, reservation %s: %s", hotel_id, reservation.unique_id, request.xml_content, ) return # save push request to file logs_dir = Path("logs/push_requests") if not logs_dir.exists(): logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True) stat_info = os.stat(logs_dir) _LOGGER.info( "Created directory owner: uid:%s, gid:%s", stat_info.st_uid, stat_info.st_gid, ) _LOGGER.info("Directory mode: %s", oct(stat_info.st_mode)[-3:]) log_filename = f"{logs_dir}/alpinebits_push_{hotel_id}_{reservation.unique_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml" with open(log_filename, "w", encoding="utf-8") as f: f.write(request.xml_content) return headers = ( {"Authorization": f"Bearer {push_endpoint.get('token', '')}"} if push_endpoint.get("token") else {} ) "" try: async with httpx.AsyncClient() as client: resp = await client.post( push_endpoint["url"], json=payload, headers=headers, timeout=10 ) _LOGGER.info( "Push event fired to %s for hotel %s, status: %s", push_endpoint["url"], hotel["hotel_id"], resp.status_code, ) if resp.status_code not in [200, 201, 202]: _LOGGER.warning( "Push endpoint returned non-success status %s: %s", resp.status_code, resp.text, ) except Exception as e: _LOGGER.exception("Push event failed for hotel %s: %s", hotel["hotel_id"], e) # Optionally implement retry logic here@asynccontextmanager async def lifespan(app: FastAPI): # Setup DB # Determine if this is the primary worker using file-based locking # Only primary runs schedulers/background tasks # In multi-worker setups, only one worker should run singleton services is_primary, worker_lock = is_primary_worker() _LOGGER.info( "Worker startup: process=%s, pid=%d, primary=%s", multiprocessing.current_process().name, os.getpid(), is_primary, ) try: config = load_config() except Exception: _LOGGER.exception("Failed to load config: ") config = {} # Get event loop for email monitoring loop = asyncio.get_running_loop() # Initialize email service (before logging setup so it can be used by handlers) email_service = create_email_service(config) # Initialize pushover service pushover_service = create_pushover_service(config) # Setup logging from config with unified notification monitoring # Only primary worker should have the report scheduler running alert_handler, report_scheduler = setup_logging( config, email_service, pushover_service, loop, enable_scheduler=is_primary ) _LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary) # Create database engine with schema support engine = create_database_engine(config=config, echo=False) AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False) app.state.engine = engine app.state.async_sessionmaker = AsyncSessionLocal app.state.config = config app.state.alpine_bits_server = AlpineBitsServer(config) app.state.event_dispatcher = event_dispatcher app.state.email_service = email_service app.state.pushover_service = pushover_service app.state.alert_handler = alert_handler app.state.report_scheduler = report_scheduler # Register push listeners for hotels with push_endpoint for hotel in config.get("alpine_bits_auth", []): push_endpoint = hotel.get("push_endpoint") hotel_id = hotel.get("hotel_id") if push_endpoint and hotel_id: # Register hotel-specific listener event_dispatcher.register_hotel_listener( "form_processed", hotel_id, partial(push_listener, hotel=hotel) ) _LOGGER.info( "Registered push listener for hotel %s with endpoint %s", hotel_id, push_endpoint.get("url"), ) elif push_endpoint and not hotel_id: _LOGGER.warning("Hotel has push_endpoint but no hotel_id: %s", hotel) elif hotel_id and not push_endpoint: _LOGGER.info("Hotel %s has no push_endpoint configured", hotel_id) # Create tables first (all workers) # This ensures tables exist before migrations try to alter them async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) _LOGGER.info("Database tables checked/created at startup.") # Run migrations after tables exist (only primary worker for race conditions) if is_primary: await run_all_migrations(engine, config) else: _LOGGER.info("Skipping migrations (non-primary worker)") # Hash any existing customers (only in primary worker to avoid race conditions) if is_primary: async with AsyncSessionLocal() as session: customer_service = CustomerService(session) hashed_count = await customer_service.hash_existing_customers() if hashed_count > 0: _LOGGER.info( "Backfilled hashed data for %d existing customers", hashed_count ) else: _LOGGER.info("All existing customers already have hashed data") else: _LOGGER.info("Skipping customer hashing (non-primary worker)") # Initialize and hook up stats collector for daily reports # Note: report_scheduler will only exist on the primary worker if report_scheduler: stats_collector = ReservationStatsCollector( async_sessionmaker=AsyncSessionLocal, config=config, ) # Hook up the stats collector to the report scheduler report_scheduler.set_stats_collector(stats_collector.collect_stats) _LOGGER.info("Stats collector initialized and hooked up to report scheduler") # Start daily report scheduler report_scheduler.start() _LOGGER.info("Daily report scheduler started") _LOGGER.info("Application startup complete") yield # Cleanup on shutdown _LOGGER.info("Application shutdown initiated") # Stop daily report scheduler if report_scheduler: report_scheduler.stop() _LOGGER.info("Daily report scheduler stopped") # Close alert handler (flush any remaining errors) if alert_handler: alert_handler.close() _LOGGER.info("Alert handler closed") # Shutdown email service thread pool if email_service: email_service.shutdown() _LOGGER.info("Email service shut down") # Dispose engine await engine.dispose() _LOGGER.info("Application shutdown complete") # Release worker lock if this was the primary worker if worker_lock: worker_lock.release() async def get_async_session(request: Request): async_sessionmaker = request.app.state.async_sessionmaker async with async_sessionmaker() as session: yield session app = FastAPI( title="Wix Form Handler API", description="Secure API endpoint to receive and process Wix form submissions with authentication and rate limiting", version="1.0.0", lifespan=lifespan, ) # Create API router with /api prefix api_router = APIRouter(prefix="/api", tags=["api"]) # Add rate limiting app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler) # Add CORS middleware to allow requests from Wix app.add_middleware( CORSMiddleware, allow_origins=[ "https://*.wix.com", "https://*.wixstatic.com", "http://localhost:3000", # For development "http://localhost:8000", # For local testing ], allow_credentials=True, allow_methods=["GET", "POST"], allow_headers=["*"], ) @api_router.get("/") @limiter.limit(DEFAULT_RATE_LIMIT) async def root(request: Request): """Health check endpoint.""" return { "message": "Wix Form Handler API is running", "timestamp": datetime.now().isoformat(), "status": "healthy", "authentication": "required", "rate_limits": { "default": DEFAULT_RATE_LIMIT, "webhook": WEBHOOK_RATE_LIMIT, "burst": BURST_RATE_LIMIT, }, } @api_router.get("/health") @limiter.limit(DEFAULT_RATE_LIMIT) async def health_check(request: Request): """Detailed health check.""" return { "status": "healthy", "timestamp": datetime.now().isoformat(), "service": "wix-form-handler", "version": "1.0.0", "authentication": "enabled", "rate_limiting": "enabled", } @api_router.post("/detect-language", response_model=LanguageDetectionResponse) @limiter.limit(DEFAULT_RATE_LIMIT) async def detect_language( request: Request, data: LanguageDetectionRequest, credentials: HTTPAuthorizationCredentials = Depends(security_bearer), ): """Detect language of text, restricted to Italian or German. Requires Bearer token authentication. Returns the most likely language (it or de) with confidence score. """ # Validate bearer token token = credentials.credentials config = request.app.state.config # Check if token is valid valid_tokens = config.get("api_tokens", []) # If no tokens configured, reject authentication if not valid_tokens: _LOGGER.error("No api_tokens configured in config.yaml") raise HTTPException( status_code=401, detail="Authentication token not configured on server", ) if token not in valid_tokens: # Log sanitized token (first TOKEN_LOG_LENGTH chars) for security sanitized_token = ( token[:TOKEN_LOG_LENGTH] + "..." if len(token) > TOKEN_LOG_LENGTH else token ) _LOGGER.warning("Invalid token attempt: %s", sanitized_token) raise HTTPException( status_code=401, detail="Invalid authentication token", ) try: # Detect language with k=2 to get top 2 candidates results = detect(data.text, k=2) _LOGGER.info("Language detection results: %s", results) # Filter for Italian (it) or German (de) italian_german_results = [r for r in results if r.get("lang") in ["it", "de"]] if italian_german_results: # Return the best match between Italian and German best_match = italian_german_results[0] return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch" return LanguageDetectionResponse( language_code=return_value, score=best_match.get("score", 0.0) ) # If neither Italian nor German detected in top 2, check all results all_results = detect(data.text, k=10) italian_german_all = [r for r in all_results if r.get("lang") in ["it", "de"]] if italian_german_all: best_match = italian_german_all[0] return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch" return LanguageDetectionResponse( language_code=return_value, score=best_match.get("score", 0.0) ) # Default to German if no clear detection _LOGGER.warning( "Could not detect Italian or German in text: %s, defaulting to 'de'", data.text[:100], ) return LanguageDetectionResponse(language_code="Deutsch", score=0.0) except Exception as e: _LOGGER.exception("Error detecting language: %s", e) raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}") # Extracted business logic for handling Wix form submissions async def process_wix_form_submission(request: Request, data: dict[str, Any], db): """Shared business logic for handling Wix form submissions (test and production).""" timestamp = datetime.now().isoformat() _LOGGER.info("Received Wix form data at %s", timestamp) data = data.get("data") # Handle nested "data" key if present # save customer and reservation to DB contact_info = data.get("contact", {}) first_name = contact_info.get("name", {}).get("first") last_name = contact_info.get("name", {}).get("last") email = contact_info.get("email") phone_number = contact_info.get("phones", [{}])[0].get("e164Phone") contact_info.get("locale", "de-de") contact_id = contact_info.get("contactId") name_prefix = data.get("field:anrede") email_newsletter = data.get("field:form_field_5a7b", False) address_line = None city_name = None postal_code = None country_code = None gender = None birth_date = None language = data.get("contact", {}).get("locale", "en")[:2] # Dates start_date = ( data.get("field:date_picker_a7c8") or data.get("Anreisedatum") or data.get("submissions", [{}])[1].get("value") ) end_date = ( data.get("field:date_picker_7e65") or data.get("Abreisedatum") or data.get("submissions", [{}])[2].get("value") ) # Room/guest info num_adults = int(data.get("field:number_7cf5") or 2) num_children = int(data.get("field:anzahl_kinder") or 0) children_ages = [] if num_children > 0: for k in data: if k.startswith("field:alter_kind_"): try: age = int(data[k]) children_ages.append(age) except ValueError: _LOGGER.warning("Invalid age value for %s: %s", k, data[k]) offer = data.get("field:angebot_auswaehlen") # get submissionId and ensure max length 35. Generate one if not present unique_id = data.get("submissionId", generate_unique_id()) # Use CustomerService to handle customer creation/update with hashing customer_service = CustomerService(db) customer_data = { "given_name": first_name, "surname": last_name, "contact_id": contact_id, "name_prefix": name_prefix, "email_address": email, "phone": phone_number, "email_newsletter": email_newsletter, "address_line": address_line, "city_name": city_name, "postal_code": postal_code, "country_code": country_code, "gender": gender, "birth_date": birth_date, "language": language, "address_catalog": False, "name_title": None, } # This automatically creates/updates both Customer and HashedCustomer db_customer = await customer_service.get_or_create_customer(customer_data) # Determine hotel_code and hotel_name # Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback hotel_code = data.get("field:hotelid", None) if hotel_code is None: _LOGGER.warning("No hotel_code provided in form data, using default") hotel_code = request.app.state.config.get("default_hotel_code", "123") hotel_name = ( data.get("field:hotelname") or data.get("hotelname") or request.app.state.config.get("default_hotel_name") or "Frangart Inn" # fallback ) submissionTime = data.get("submissionTime") # 2025-10-07T05:48:41.855Z try: if submissionTime: submissionTime = datetime.fromisoformat( submissionTime[:-1] ) # Remove Z and convert except Exception as e: _LOGGER.exception("Error parsing submissionTime: %s", e) submissionTime = None # Extract fbclid and gclid for conditional account ID lookup fbclid = data.get("field:fbclid") gclid = data.get("field:gclid") # Get advertising account IDs conditionally based on fbclid/gclid presence meta_account_id, google_account_id = get_advertising_account_ids( request.app.state.config, hotel_code, fbclid, gclid ) reservation = ReservationData( unique_id=unique_id, start_date=date.fromisoformat(start_date), end_date=date.fromisoformat(end_date), num_adults=num_adults, num_children=num_children, children_ages=children_ages, hotel_code=hotel_code, hotel_name=hotel_name, offer=offer, created_at=submissionTime, utm_source=data.get("field:utm_source"), utm_medium=data.get("field:utm_medium"), utm_campaign=data.get("field:utm_campaign"), utm_term=data.get("field:utm_term"), utm_content=data.get("field:utm_content"), user_comment=data.get("field:long_answer_3524", ""), fbclid=fbclid, gclid=gclid, meta_account_id=meta_account_id, google_account_id=google_account_id, ) if reservation.md5_unique_id is None: raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id") # Use ReservationService to create reservation reservation_service = ReservationService(db) db_reservation = await reservation_service.create_reservation( reservation, db_customer.id ) async def push_event(): # Fire event for listeners (push, etc.) - hotel-specific dispatch dispatcher = getattr(request.app.state, "event_dispatcher", None) if dispatcher: # Get hotel_code from reservation to target the right listeners hotel_code = getattr(db_reservation, "hotel_code", None) if hotel_code and hotel_code.strip(): await dispatcher.dispatch_for_hotel( "form_processed", hotel_code, db_customer, db_reservation ) _LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code) else: _LOGGER.warning( "No hotel_code in reservation, skipping push notifications" ) # Create task and store reference to prevent it from being garbage collected # The task runs independently and we don't need to await it here task = asyncio.create_task(push_event()) # Add done callback to log any exceptions that occur in the background task task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None) return { "status": "success", "message": "Wix form data received successfully", "received_keys": list(data.keys()), "timestamp": timestamp, "note": "No authentication required for this endpoint", } async def process_generic_webhook_submission( request: Request, data: dict[str, Any], db ): """Process generic webhook submissions with nested structure. Expected structure: { "hotel_data": {"hotelname": "...", "hotelcode": "..."}, "form_data": { "sprache": "de/it/en", "anreise": "DD.MM.YYYY", "abreise": "DD.MM.YYYY", "erwachsene": "N", "kinder": "N", "alter": {"1": "age1", "2": "age2", ...}, "anrede": "...", "name": "...", "nachname": "...", "mail": "...", "tel": "...", "nachricht": "..." }, "tracking_data": { "utm_source": "...", "utm_medium": "...", "utm_campaign": "...", "utm_content": "...", "utm_term": "...", "fbclid": "...", "gclid": "..." }, "timestamp": "ISO8601" } """ timestamp = datetime.now().isoformat() _LOGGER.info("Processing generic webhook submission at %s", timestamp) # Extract nested data hotel_data = data.get("hotel_data", {}) form_data = data.get("form_data", {}) tracking_data = data.get("tracking_data", {}) # Extract hotel information hotel_code = hotel_data.get("hotelcode") hotel_name = hotel_data.get("hotelname") if not hotel_code: _LOGGER.warning("No hotel_code provided in webhook data, using default") hotel_code = request.app.state.config.get("default_hotel_code", "123") if not hotel_name: hotel_name = ( request.app.state.config.get("default_hotel_name") or "Frangart Inn" ) # Extract customer information first_name = form_data.get("name") last_name = form_data.get("nachname") email = form_data.get("mail") phone_number = form_data.get("tel") name_prefix = form_data.get("anrede") language = form_data.get("sprache", "de")[:2] user_comment = form_data.get("nachricht", "") plz = form_data.get("plz", "") city = form_data.get("stadt", "") country = form_data.get("land", "") # Parse dates - handle DD.MM.YYYY format start_date_str = form_data.get("anreise") end_date_str = form_data.get("abreise") if not start_date_str or not end_date_str: raise HTTPException( status_code=400, detail="Missing required dates (anreise/abreise)" ) try: # Parse DD.MM.YYYY format using strptime start_date = datetime.strptime(start_date_str, "%d.%m.%Y").date() end_date = datetime.strptime(end_date_str, "%d.%m.%Y").date() except ValueError as e: _LOGGER.error( "Error parsing dates: start=%s, end=%s, error=%s", start_date_str, end_date_str, e, ) raise HTTPException(status_code=400, detail=f"Invalid date format: {e}") from e # Extract room/guest info num_adults = int(form_data.get("erwachsene", 2)) num_children = int(form_data.get("kinder", 0)) # Extract children ages from nested structure children_ages = [] if num_children > 0: alter_data = form_data.get("alter", {}) for i in range(1, num_children + 1): age_str = alter_data.get(str(i)) if age_str: try: children_ages.append(int(age_str)) except ValueError: _LOGGER.warning("Invalid age value for child %d: %s", i, age_str) # Extract tracking information utm_source = None utm_medium = None utm_campaign = None utm_term = None utm_content = None fbclid = None gclid = None if tracking_data: utm_source = tracking_data.get("utm_source") utm_medium = tracking_data.get("utm_medium") utm_campaign = tracking_data.get("utm_campaign") utm_term = tracking_data.get("utm_term") utm_content = tracking_data.get("utm_content") fbclid = tracking_data.get("fbclid") gclid = tracking_data.get("gclid") # Parse submission timestamp submission_time = data.get("timestamp") try: if submission_time: # Handle ISO8601 format with timezone if submission_time.endswith("Z"): submission_time = datetime.fromisoformat(submission_time[:-1]) elif "+" in submission_time: # Remove timezone info (e.g., +02:00) submission_time = datetime.fromisoformat(submission_time.split("+")[0]) else: submission_time = datetime.fromisoformat(submission_time) except Exception as e: _LOGGER.exception("Error parsing submission timestamp: %s", e) submission_time = None # Generate unique ID unique_id = generate_unique_id() # Use CustomerService to handle customer creation/update with hashing customer_service = CustomerService(db) customer_data = { "given_name": first_name, "surname": last_name, "contact_id": None, "name_prefix": name_prefix if name_prefix != "--" else None, "email_address": email, "phone": phone_number if phone_number else None, "email_newsletter": False, "address_line": None, "city_name": city if city else None, "postal_code": plz if plz else None, "country_code": country if country else None, "gender": None, "birth_date": None, "language": language, "address_catalog": False, "name_title": None, } # Create/update customer db_customer = await customer_service.get_or_create_customer(customer_data) # Get advertising account IDs conditionally based on fbclid/gclid presence meta_account_id, google_account_id = get_advertising_account_ids( request.app.state.config, hotel_code, fbclid, gclid ) # Create reservation reservation_kwargs = { "unique_id": unique_id, "start_date": start_date, "end_date": end_date, "num_adults": num_adults, "num_children": num_children, "children_ages": children_ages, "hotel_code": hotel_code, "hotel_name": hotel_name, "offer": None, "utm_source": utm_source, "utm_medium": utm_medium, "utm_campaign": utm_campaign, "utm_term": utm_term, "utm_content": utm_content, "user_comment": user_comment, "fbclid": fbclid, "gclid": gclid, "meta_account_id": meta_account_id, "google_account_id": google_account_id, } # Only include created_at if we have a valid submission_time if submission_time: reservation_kwargs["created_at"] = submission_time reservation = ReservationData(**reservation_kwargs) if reservation.md5_unique_id is None: raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id") # Use ReservationService to create reservation reservation_service = ReservationService(db) db_reservation = await reservation_service.create_reservation( reservation, db_customer.id ) async def push_event(): # Fire event for listeners (push, etc.) - hotel-specific dispatch dispatcher = getattr(request.app.state, "event_dispatcher", None) if dispatcher: # Get hotel_code from reservation to target the right listeners hotel_code = getattr(db_reservation, "hotel_code", None) if hotel_code and hotel_code.strip(): await dispatcher.dispatch_for_hotel( "form_processed", hotel_code, db_customer, db_reservation ) _LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code) else: _LOGGER.warning( "No hotel_code in reservation, skipping push notifications" ) # Create task and store reference to prevent garbage collection task = asyncio.create_task(push_event()) # Add done callback to log any exceptions task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None) _LOGGER.info( "Successfully processed generic webhook: customer_id=%s, reservation_id=%s", db_customer.id, db_reservation.id, ) return { "status": "success", "message": "Generic webhook data processed successfully", "customer_id": db_customer.id, "reservation_id": db_reservation.id, "timestamp": timestamp, } async def validate_basic_auth( credentials: HTTPBasicCredentials = Depends(security_basic), ) -> str: """Validate basic authentication for AlpineBits protocol. Returns username if valid, raises HTTPException if not. """ # Accept any username/password pair present in config['alpine_bits_auth'] if not credentials.username or not credentials.password: raise HTTPException( status_code=401, detail="ERROR: Authentication required", headers={"WWW-Authenticate": "Basic"}, ) valid = False config = app.state.config for entry in config["alpine_bits_auth"]: if ( credentials.username == entry["username"] and credentials.password == entry["password"] ): valid = True break if not valid: raise HTTPException( status_code=401, detail="ERROR: Invalid credentials", headers={"WWW-Authenticate": "Basic"}, ) _LOGGER.info( "AlpineBits authentication successful for user: %s (from config)", credentials.username, ) return credentials.username, credentials.password @api_router.post("/webhook/wix-form") @webhook_limiter.limit(WEBHOOK_RATE_LIMIT) async def handle_wix_form( request: Request, data: dict[str, Any], db_session=Depends(get_async_session) ): """Unified endpoint to handle Wix form submissions (test and production). No authentication required for this endpoint. """ try: return await process_wix_form_submission(request, data, db_session) except IntegrityError as e: # Handle duplicate submissions gracefully - likely same form sent twice # or race condition between workers if "unique constraint" in str(e).lower() and "unique_id" in str(e).lower(): _LOGGER.warning( "Duplicate submission detected (unique_id already exists). " "Returning success to prevent retry. Error: %s", str(e), ) # Return success since the reservation already exists return {"status": "success", "message": "Reservation already processed"} # Re-raise if it's a different integrity error raise except Exception as e: _LOGGER.exception("Error in handle_wix_form") log_entry = { "timestamp": datetime.now().isoformat(), "client_ip": request.client.host if request.client else "unknown", "headers": dict(request.headers), "data": data, "error": str(e), } # Use asyncio to run file I/O in thread pool to avoid blocking logs_dir = Path("logs/errors") await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True) log_filename = ( logs_dir / f"wix_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" ) await asyncio.to_thread( log_filename.write_text, json.dumps(log_entry, indent=2, default=str, ensure_ascii=False), encoding="utf-8", ) _LOGGER.error("Error data logged to: %s", log_filename) raise HTTPException(status_code=500, detail="Error processing Wix form data") @api_router.post("/webhook/wix-form/test") @limiter.limit(DEFAULT_RATE_LIMIT) async def handle_wix_form_test( request: Request, data: dict[str, Any], db_session=Depends(get_async_session) ): """Test endpoint to verify the API is working with raw JSON data. No authentication required for testing purposes. """ try: return await process_wix_form_submission(request, data, db_session) except Exception as e: _LOGGER.exception("Error in handle_wix_form_test: %s", e) # Log error data to file asynchronously import traceback log_entry = { "timestamp": datetime.now().isoformat(), "client_ip": request.client.host if request.client else "unknown", "headers": dict(request.headers), "data": data, "error": str(e), "traceback": traceback.format_exc(), } # Use asyncio to run file I/O in thread pool to avoid blocking logs_dir = Path("logs/errors") await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True) log_filename = ( logs_dir / f"wix_test_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" ) await asyncio.to_thread( log_filename.write_text, json.dumps(log_entry, indent=2, default=str, ensure_ascii=False), encoding="utf-8", ) _LOGGER.error("Error data logged to: %s", log_filename) raise HTTPException(status_code=500, detail="Error processing test data") @api_router.post("/webhook/generic") @webhook_limiter.limit(WEBHOOK_RATE_LIMIT) async def handle_generic_webhook( request: Request, db_session=Depends(get_async_session) ): """Handle generic webhook endpoint for receiving JSON payloads. Supports gzip compression, extracts customer and reservation data, saves to database, and triggers push notifications. No authentication required for this endpoint. """ # Store data for error logging if needed data = None try: timestamp = datetime.now().isoformat() _LOGGER.info("Received generic webhook data at %s", timestamp) # Get the raw body content body = await request.body() if not body: raise HTTPException(status_code=400, detail="ERROR: No content provided") # Check if content is gzip compressed content_encoding = request.headers.get("content-encoding", "").lower() is_gzipped = content_encoding == "gzip" # Decompress if gzipped if is_gzipped: try: body = gzip.decompress(body) _LOGGER.info("Successfully decompressed gzip content") except Exception as e: raise HTTPException( status_code=400, detail=f"ERROR: Failed to decompress gzip content: {e}", ) from e # Parse JSON try: data = json.loads(body.decode("utf-8")) except json.JSONDecodeError as e: raise HTTPException( status_code=400, detail=f"ERROR: Invalid JSON content: {e}", ) from e if True: # log to file for now logs_dir = Path("logs/generic_webhooks") await asyncio.to_thread( logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True ) log_filename = ( logs_dir / f"generic_webhook_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" ) await asyncio.to_thread( log_filename.write_text, json.dumps(data, indent=2, default=str, ensure_ascii=False), encoding="utf-8", ) # Process the webhook data and save to database await process_generic_webhook_submission(request, data, db_session) return { "status": "success", "message": "Generic webhook data received and processed successfully", "timestamp": timestamp, } except HTTPException: raise except Exception as e: _LOGGER.exception("Error in handle_generic_webhook") # Log error data to file asynchronously (only on error) error_log_entry = { "timestamp": datetime.now().isoformat(), "client_ip": request.client.host if request.client else "unknown", "headers": dict(request.headers), "data": data, # Include the parsed data if available "error": str(e), "traceback": traceback.format_exc(), } # Use asyncio to run file I/O in thread pool to avoid blocking error_logs_dir = Path("logs/errors") await asyncio.to_thread( error_logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True ) error_log_filename = error_logs_dir / ( f"generic_webhook_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" ) await asyncio.to_thread( error_log_filename.write_text, json.dumps(error_log_entry, indent=2, default=str, ensure_ascii=False), encoding="utf-8", ) _LOGGER.error("Error data logged to: %s", error_log_filename) raise HTTPException( status_code=500, detail="Error processing generic webhook data" ) from e @api_router.put("/hoteldata/conversions_import/{filename:path}") @limiter.limit(DEFAULT_RATE_LIMIT) async def handle_xml_upload( request: Request, filename: str, credentials_tupel: tuple = Depends(validate_basic_auth), db_session=Depends(get_async_session), ): """Endpoint for receiving XML files for conversion processing via PUT. Processes conversion data from hotel PMS: - Parses reservation and daily sales XML data - Matches to existing reservations using truncated tracking IDs (fbclid/gclid) - Links conversions to customers and hashed_customers - Stores daily sales revenue data Requires basic authentication and saves XML files to log directory. Supports gzip compression via Content-Encoding header. Example: PUT /api/hoteldata/conversions_import/Reservierungen.xml """ try: # Validate filename to prevent path traversal if ".." in filename or filename.startswith("/"): raise HTTPException(status_code=400, detail="ERROR: Invalid filename") # Get the raw body content body = await request.body() if not body: raise HTTPException( status_code=400, detail="ERROR: No XML content provided" ) # Check if content is gzip compressed content_encoding = request.headers.get("content-encoding", "").lower() is_gzipped = content_encoding == "gzip" # Decompress if gzipped if is_gzipped: try: body = gzip.decompress(body) except Exception as e: raise HTTPException( status_code=400, detail=f"ERROR: Failed to decompress gzip content: {e}", ) from e # Try to decode as UTF-8 try: xml_content = body.decode("utf-8") except UnicodeDecodeError: # If UTF-8 fails, try with latin-1 as fallback xml_content = body.decode("latin-1") # Basic validation that it's XML-like if not xml_content.strip().startswith("<"): raise HTTPException( status_code=400, detail="ERROR: Content does not appear to be XML" ) # Create logs directory for XML conversions logs_dir = Path("logs/conversions_import") if not logs_dir.exists(): logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True) _LOGGER.info("Created directory: %s", logs_dir) # Generate filename with timestamp and authenticated user username, _ = credentials_tupel timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Use the filename from the path, but add timestamp and username for uniqueness base_filename = Path(filename).stem extension = Path(filename).suffix or ".xml" log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}" # Save XML content to file log_filename.write_text(xml_content, encoding="utf-8") _LOGGER.info( "XML file saved to %s by user %s (original: %s)", log_filename, username, filename, ) # Process the conversion XML and save to database conversion_service = ConversionService(db_session) processing_stats = await conversion_service.process_conversion_xml(xml_content) _LOGGER.info( "Conversion processing complete for %s: %s", filename, processing_stats ) response_headers = { "Content-Type": "application/xml; charset=utf-8", "X-AlpineBits-Server-Accept-Encoding": "gzip", } # Return processing stats in response response_content = f""" success Conversion data processed successfully {processing_stats["total_reservations"]} {processing_stats["deleted_reservations"]} {processing_stats["total_daily_sales"]} {processing_stats["matched_to_reservation"]} {processing_stats["matched_to_customer"]} {processing_stats["matched_to_hashed_customer"]} {processing_stats["unmatched"]} {processing_stats["errors"]} """ return Response( content=response_content, headers=response_headers, status_code=200 ) except HTTPException: raise except Exception: _LOGGER.exception("Error in handle_xml_upload") raise HTTPException(status_code=500, detail="Error processing XML upload") # TODO Bit sketchy. May need requests-toolkit in the future def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]: """Parse multipart/form-data from raw request body. This is a simplified parser for the AlpineBits use case. """ if "multipart/form-data" not in content_type: raise HTTPException( status_code=400, detail="ERROR: Content-Type must be multipart/form-data" ) # Extract boundary boundary = None for part in content_type.split(";"): part = part.strip() if part.startswith("boundary="): boundary = part.split("=", 1)[1].strip('"') break if not boundary: raise HTTPException( status_code=400, detail="ERROR: Missing boundary in multipart/form-data" ) # Simple multipart parsing parts = body.split(f"--{boundary}".encode()) data = {} for part in parts: if not part.strip() or part.strip() == b"--": continue # Split headers and content if b"\r\n\r\n" in part: headers_section, content = part.split(b"\r\n\r\n", 1) content = content.rstrip(b"\r\n") # Parse Content-Disposition header headers = headers_section.decode("utf-8", errors="ignore") name = None for line in headers.split("\n"): if "Content-Disposition" in line and "name=" in line: # Extract name parameter for param in line.split(";"): param = param.strip() if param.startswith("name="): name = param.split("=", 1)[1].strip('"') break if name: # Handle file uploads or text content if content.startswith(b"<"): # Likely XML content data[name] = content.decode("utf-8", errors="ignore") else: data[name] = content.decode("utf-8", errors="ignore") return data @api_router.post("/alpinebits/server-2024-10") @limiter.limit("60/minute") async def alpinebits_server_handshake( request: Request, credentials_tupel: tuple = Depends(validate_basic_auth), dbsession=Depends(get_async_session), ): """AlpineBits server endpoint implementing the handshake protocol. This endpoint handles: - Protocol version negotiation via X-AlpineBits-ClientProtocolVersion header - Client identification via X-AlpineBits-ClientID header (optional) - Multipart/form-data parsing for action and request parameters - Gzip compression support - Proper error handling with HTTP status codes - Handshaking action processing Authentication: HTTP Basic Auth required Content-Type: multipart/form-data Compression: gzip supported (check X-AlpineBits-Server-Accept-Encoding) """ try: # Check required headers client_protocol_version = request.headers.get( "X-AlpineBits-ClientProtocolVersion" ) if not client_protocol_version: # Server concludes client speaks a protocol version preceding 2013-04 client_protocol_version = "pre-2013-04" _LOGGER.info( "No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04" ) else: _LOGGER.info("Client protocol version: %s", client_protocol_version) # Optional client ID client_id = request.headers.get("X-AlpineBits-ClientID") if client_id: _LOGGER.info("Client ID: %s", client_id) # Check content encoding content_encoding = request.headers.get("Content-Encoding") is_compressed = content_encoding == "gzip" if is_compressed: _LOGGER.info("Request is gzip compressed") # Get content type before processing content_type = request.headers.get("Content-Type", "") _LOGGER.info("Content-Type: %s", content_type) _LOGGER.info("Content-Encoding: %s", content_encoding) # Get request body body = await request.body() # Decompress if needed form_data = validate_alpinebits_body(is_compressed, content_type, body) # Check for required action parameter action = form_data.get("action") if not action: raise HTTPException( status_code=400, detail="ERROR: Missing required 'action' parameter" ) _LOGGER.info("AlpineBits action: %s", action) # Get optional request XML request_xml = form_data.get("request") server: AlpineBitsServer = app.state.alpine_bits_server version = Version.V2024_10 username, password = credentials_tupel client_info = AlpineBitsClientInfo( username=username, password=password, client_id=client_id ) # Create successful handshake response response = await server.handle_request( action, request_xml, client_info=client_info, version=version, dbsession=dbsession, ) response_xml = response.xml_content # Set response headers indicating server capabilities headers = { "Content-Type": "application/xml; charset=utf-8", "X-AlpineBits-Server-Accept-Encoding": "gzip", # Indicate gzip support "X-AlpineBits-Server-Version": "2024-10", } if is_compressed: # Compress response if client sent compressed request response_xml = gzip.compress(response_xml.encode("utf-8")) headers["Content-Encoding"] = "gzip" return Response( content=response_xml, status_code=response.status_code, headers=headers ) except HTTPException: # Re-raise HTTP exceptions (auth errors, etc.) raise except Exception as e: _LOGGER.exception("Error in AlpineBits handshake: %s", e) raise HTTPException(status_code=500, detail="Internal server error") def validate_alpinebits_body(is_compressed, content_type, body): """Check if the body conforms to AlpineBits expectations.""" if is_compressed: try: body = gzip.decompress(body) except Exception: raise HTTPException( status_code=400, detail="ERROR: Failed to decompress gzip content", ) # Check content type (after decompression) if ( "multipart/form-data" not in content_type and "application/x-www-form-urlencoded" not in content_type ): raise HTTPException( status_code=400, detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded", ) # Parse multipart data if "multipart/form-data" in content_type: try: form_data = parse_multipart_data(content_type, body) except Exception: raise HTTPException( status_code=400, detail="ERROR: Failed to parse multipart/form-data", ) elif "application/x-www-form-urlencoded" in content_type: # Parse as urlencoded form_data = dict(urllib.parse.parse_qsl(body.decode("utf-8"))) else: raise HTTPException( status_code=400, detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded", ) return form_data @api_router.get("/admin/stats") @limiter.limit("10/minute") async def get_api_stats(request: Request, admin_key: str = Depends(validate_api_key)): """Admin endpoint to get API usage statistics. Requires admin API key. """ if admin_key != "admin-key": raise HTTPException(status_code=403, detail="Admin access required") # In a real application, you'd fetch this from your database/monitoring system return { "status": "success", "stats": { "uptime": "Available in production deployment", "total_requests": "Available with monitoring setup", "active_api_keys": len([k for k in ["wix-webhook-key", "admin-key"] if k]), "rate_limit_backend": "redis" if os.getenv("REDIS_URL") else "memory", }, "timestamp": datetime.now().isoformat(), } # Include the API router in the main app app.include_router(api_router) @app.get("/", response_class=HTMLResponse) async def landing_page(): """Serve the under construction landing page at the root route.""" try: # Get the path to the HTML file html_path = os.path.join(os.path.dirname(__file__), "templates", "index.html") with open(html_path, encoding="utf-8") as f: html_content = f.read() return HTMLResponse(content=html_content, status_code=200) except FileNotFoundError: # Fallback if HTML file is not found html_content = """ 99tales - Under Construction

🏗️ 99tales

Under Construction

We're working hard to bring you something amazing!

API Documentation

""" return HTMLResponse(content=html_content, status_code=200) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)