From 2b1215a43a3ee2c28bc6dd8e28e9cc7941b095df Mon Sep 17 00:00:00 2001 From: Jonas Linter Date: Thu, 27 Nov 2025 15:33:15 +0100 Subject: [PATCH] Some more refactoring. Push_events don't work at the moment --- src/alpine_bits_python/api.py | 324 ++++---------------- src/alpine_bits_python/db_setup.py | 129 +++++++- src/alpine_bits_python/webhook_processor.py | 144 ++++++--- 3 files changed, 283 insertions(+), 314 deletions(-) diff --git a/src/alpine_bits_python/api.py b/src/alpine_bits_python/api.py index c9dc88f..92819dc 100644 --- a/src/alpine_bits_python/api.py +++ b/src/alpine_bits_python/api.py @@ -6,7 +6,6 @@ import hashlib import json import multiprocessing import os -import traceback import urllib.parse import xml.dom.minidom from collections import defaultdict @@ -30,12 +29,9 @@ from fastapi.security import ( from pydantic import BaseModel from slowapi.errors import RateLimitExceeded from sqlalchemy import and_, select, update -from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import selectinload -from alpine_bits_python.webhook_processor import process_generic_webhook_submission, process_wix_form_submission - from .alpinebits_server import ( AlpineBitsActionName, AlpineBitsClientInfo, @@ -43,14 +39,19 @@ from .alpinebits_server import ( Version, ) from .auth import validate_api_key -from .config_loader import load_config, get_username_for_hotel +from .config_loader import get_username_for_hotel, load_config from .const import HttpStatusCode, WebhookStatus from .conversion_service import ConversionService from .csv_import import CSVImporter from .db import Customer as DBCustomer from .db import Reservation as DBReservation -from .db import Hotel, WebhookEndpoint, WebhookRequest -from .db import ResilientAsyncSession, SessionMaker, create_database_engine +from .db import ( + ResilientAsyncSession, + SessionMaker, + WebhookEndpoint, + WebhookRequest, + create_database_engine, +) from .db_setup import run_startup_tasks from .email_monitoring import ReservationStatsCollector from .email_service import create_email_service @@ -218,8 +219,7 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel) async def cleanup_stale_webhooks( - async_sessionmaker: async_sessionmaker, - timeout_minutes: int = 10 + async_sessionmaker: async_sessionmaker, timeout_minutes: int = 10 ) -> int: """Reset webhooks stuck in 'processing' (worker crashed). @@ -229,6 +229,7 @@ async def cleanup_stale_webhooks( Returns: Number of stale webhooks reset + """ timeout_threshold = datetime.now(UTC) - timedelta(minutes=timeout_minutes) @@ -238,12 +239,12 @@ async def cleanup_stale_webhooks( .where( and_( WebhookRequest.status == WebhookStatus.PROCESSING, - WebhookRequest.processing_started_at < timeout_threshold + WebhookRequest.processing_started_at < timeout_threshold, ) ) .values( status=WebhookStatus.FAILED, - last_error='Processing timeout - worker may have crashed' + last_error="Processing timeout - worker may have crashed", ) ) await session.commit() @@ -256,8 +257,7 @@ async def cleanup_stale_webhooks( async def purge_old_webhook_payloads( - async_sessionmaker: async_sessionmaker, - retention_days: int = 7 + async_sessionmaker: async_sessionmaker, retention_days: int = 7 ) -> int: """Purge payload_json from old completed webhooks. @@ -269,6 +269,7 @@ async def purge_old_webhook_payloads( Returns: Number of payloads purged + """ cutoff_date = datetime.now(UTC) - timedelta(days=retention_days) @@ -279,13 +280,10 @@ async def purge_old_webhook_payloads( and_( WebhookRequest.status == WebhookStatus.COMPLETED, WebhookRequest.created_at < cutoff_date, - WebhookRequest.purged_at.is_(None) # Not already purged + WebhookRequest.purged_at.is_(None), # Not already purged ) ) - .values( - payload_json=None, - purged_at=datetime.now(UTC) - ) + .values(payload_json=None, purged_at=datetime.now(UTC)) ) await session.commit() count = result.rowcount @@ -303,6 +301,7 @@ async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker): Args: async_sessionmaker: SQLAlchemy async sessionmaker + """ try: # Clean up stale webhooks (stuck in 'processing') @@ -314,7 +313,7 @@ async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker): _LOGGER.debug( "Webhook cleanup: %d stale reset, %d payloads purged", stale_count, - purged_count + purged_count, ) except Exception as e: _LOGGER.exception("Error during periodic webhook cleanup: %s", e) @@ -382,6 +381,7 @@ async def lifespan(app: FastAPI): # Initialize webhook processors from .webhook_processor import initialize_webhook_processors + initialize_webhook_processors() _LOGGER.info("Webhook processors initialized") @@ -433,6 +433,7 @@ async def lifespan(app: FastAPI): # Start periodic webhook cleanup (only on primary worker) cleanup_task = None if is_primary: + async def run_periodic_cleanup(): """Run cleanup tasks every 5 minutes.""" while True: @@ -658,7 +659,6 @@ async def detect_language( raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}") - async def validate_basic_auth( credentials: HTTPBasicCredentials = Depends(security_basic), ) -> str: @@ -754,23 +754,37 @@ async def handle_webhook_unified( # Extract hotel_id from payload based on webhook type if webhook_type == "wix_form": # Wix forms: field:hotelid or use default - hotel_id_from_payload = payload.get("data", {}).get("field:hotelid") if isinstance(payload.get("data"), dict) else payload.get("field:hotelid") + hotel_id_from_payload = ( + payload.get("data", {}).get("field:hotelid") + if isinstance(payload.get("data"), dict) + else payload.get("field:hotelid") + ) if not hotel_id_from_payload: - hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123") - _LOGGER.info("Legacy wix-form endpoint: using default hotel_code=%s", hotel_id_from_payload) + hotel_id_from_payload = request.app.state.config.get( + "default_hotel_code", "123" + ) + _LOGGER.info( + "Legacy wix-form endpoint: using default hotel_code=%s", + hotel_id_from_payload, + ) elif webhook_type == "generic": # Generic webhooks: hotel_data.hotelcode or use default hotel_data = payload.get("hotel_data", {}) hotel_id_from_payload = hotel_data.get("hotelcode") if not hotel_id_from_payload: - hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123") - _LOGGER.info("Legacy generic endpoint: using default hotel_code=%s", hotel_id_from_payload) + hotel_id_from_payload = request.app.state.config.get( + "default_hotel_code", "123" + ) + _LOGGER.info( + "Legacy generic endpoint: using default hotel_code=%s", + hotel_id_from_payload, + ) _LOGGER.info( "Legacy endpoint detected: %s, webhook_type=%s, hotel_id=%s", webhook_secret, webhook_type, - hotel_id_from_payload + hotel_id_from_payload, ) # Look up the webhook endpoint for this hotel and type @@ -780,7 +794,7 @@ async def handle_webhook_unified( and_( WebhookEndpoint.hotel_id == hotel_id_from_payload, WebhookEndpoint.webhook_type == webhook_type, - WebhookEndpoint.is_enabled == True + WebhookEndpoint.is_enabled == True, ) ) .options(selectinload(WebhookEndpoint.hotel)) @@ -791,11 +805,11 @@ async def handle_webhook_unified( _LOGGER.error( "No webhook endpoint found for legacy endpoint: hotel_id=%s, type=%s", hotel_id_from_payload, - webhook_type + webhook_type, ) raise HTTPException( status_code=404, - detail=f"No webhook configuration found for hotel {hotel_id_from_payload}" + detail=f"No webhook configuration found for hotel {hotel_id_from_payload}", ) else: # New secure endpoint - look up by webhook_secret @@ -804,7 +818,7 @@ async def handle_webhook_unified( .where( and_( WebhookEndpoint.webhook_secret == webhook_secret, - WebhookEndpoint.is_enabled == True + WebhookEndpoint.is_enabled == True, ) ) .options(selectinload(WebhookEndpoint.hotel)) @@ -842,7 +856,7 @@ async def handle_webhook_unified( _LOGGER.info( "Webhook already processed (webhook_id=%d, hotel=%s)", existing.id, - webhook_endpoint.hotel_id + webhook_endpoint.hotel_id, ) return { "status": "success", @@ -850,11 +864,11 @@ async def handle_webhook_unified( "webhook_id": existing.id, "duplicate": True, } - elif existing.status == WebhookStatus.PROCESSING: + if existing.status == WebhookStatus.PROCESSING: # Another worker is processing right now _LOGGER.info( "Webhook is being processed by another worker (webhook_id=%d)", - existing.id + existing.id, ) return { "status": "success", @@ -862,12 +876,12 @@ async def handle_webhook_unified( "webhook_id": existing.id, "duplicate": True, } - elif existing.status == WebhookStatus.FAILED: + if existing.status == WebhookStatus.FAILED: # Retry failed webhook _LOGGER.info( "Retrying failed webhook (webhook_id=%d, retry_count=%d)", existing.id, - existing.retry_count + existing.retry_count, ) webhook_request = existing webhook_request.retry_count += 1 @@ -895,13 +909,11 @@ async def handle_webhook_unified( if not processor: raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}") - # 7. Process webhook + # 7. Process webhook with simplified interface result = await processor.process( - payload=payload, webhook_request=webhook_request, - hotel=webhook_endpoint.hotel, db_session=db_session, - request=request, + config=request.app.state.config, ) # 8. Update status @@ -927,101 +939,6 @@ async def handle_webhook_unified( raise HTTPException(status_code=500, detail="Error processing webhook") -@api_router.post("/webhook/wix-form") -@webhook_limiter.limit(WEBHOOK_RATE_LIMIT) -async def handle_wix_form( - request: Request, data: dict[str, Any], db_session=Depends(get_async_session) -): - """Unified endpoint to handle Wix form submissions (test and production). - - No authentication required for this endpoint. - """ - try: - return await process_wix_form_submission(request, data, db_session) - except IntegrityError as e: - # Handle duplicate submissions gracefully - likely same form sent twice - # or race condition between workers - if "unique constraint" in str(e).lower() and "unique_id" in str(e).lower(): - _LOGGER.warning( - "Duplicate submission detected (unique_id already exists). " - "Returning success to prevent retry. Error: %s", - str(e), - ) - # Return success since the reservation already exists - return {"status": "success", "message": "Reservation already processed"} - # Re-raise if it's a different integrity error - raise - except Exception as e: - _LOGGER.exception("Error in handle_wix_form") - - log_entry = { - "timestamp": datetime.now().isoformat(), - "client_ip": request.client.host if request.client else "unknown", - "headers": dict(request.headers), - "data": data, - "error": str(e), - } - - # Use asyncio to run file I/O in thread pool to avoid blocking - logs_dir = Path("logs/errors") - await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True) - - log_filename = ( - logs_dir / f"wix_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - ) - await asyncio.to_thread( - log_filename.write_text, - json.dumps(log_entry, indent=2, default=str, ensure_ascii=False), - encoding="utf-8", - ) - - _LOGGER.error("Error data logged to: %s", log_filename) - raise HTTPException(status_code=500, detail="Error processing Wix form data") - - -@api_router.post("/webhook/wix-form/test") -@limiter.limit(DEFAULT_RATE_LIMIT) -async def handle_wix_form_test( - request: Request, data: dict[str, Any], db_session=Depends(get_async_session) -): - """Test endpoint to verify the API is working with raw JSON data. - - No authentication required for testing purposes. - """ - try: - return await process_wix_form_submission(request, data, db_session) - except Exception as e: - _LOGGER.exception("Error in handle_wix_form_test: %s", e) - - # Log error data to file asynchronously - import traceback - - log_entry = { - "timestamp": datetime.now().isoformat(), - "client_ip": request.client.host if request.client else "unknown", - "headers": dict(request.headers), - "data": data, - "error": str(e), - "traceback": traceback.format_exc(), - } - - # Use asyncio to run file I/O in thread pool to avoid blocking - logs_dir = Path("logs/errors") - await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True) - - log_filename = ( - logs_dir / f"wix_test_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - ) - await asyncio.to_thread( - log_filename.write_text, - json.dumps(log_entry, indent=2, default=str, ensure_ascii=False), - encoding="utf-8", - ) - - _LOGGER.error("Error data logged to: %s", log_filename) - raise HTTPException(status_code=500, detail="Error processing test data") - - async def _process_csv_import_background( csv_content: str, filename: str, @@ -1044,6 +961,7 @@ async def _process_csv_import_background( session_maker: SessionMaker for creating database sessions config: Application configuration log_filename: Path to save the CSV file + """ try: # First, save the CSV file (in background) @@ -1067,18 +985,14 @@ async def _process_csv_import_background( dryrun=False, pre_acknowledge=True, client_id=hotel_code, - username=username + username=username, ) - _LOGGER.info( - "CSV import complete for %s: %s", filename, stats - ) + _LOGGER.info("CSV import complete for %s: %s", filename, stats) finally: await db_session.close() except Exception: - _LOGGER.exception( - "Error processing CSV import in background for %s", filename - ) + _LOGGER.exception("Error processing CSV import in background for %s", filename) @api_router.put("/admin/import-csv/{hotel_code}/{filename:path}") @@ -1150,9 +1064,7 @@ async def import_csv_endpoint( # Basic validation that it looks like CSV if not csv_content.strip(): - raise HTTPException( - status_code=400, detail="ERROR: CSV content is empty" - ) + raise HTTPException(status_code=400, detail="ERROR: CSV content is empty") # Create logs directory for CSV imports (blocking, but fast) logs_dir = Path("logs/csv_imports") @@ -1191,12 +1103,14 @@ async def import_csv_endpoint( # Return immediate acknowledgment return Response( - content=json.dumps({ - "status": "accepted", - "message": "CSV file received and queued for processing", - "filename": filename, - "timestamp": datetime.now().isoformat(), - }), + content=json.dumps( + { + "status": "accepted", + "message": "CSV file received and queued for processing", + "filename": filename, + "timestamp": datetime.now().isoformat(), + } + ), headers=response_headers, status_code=202, ) @@ -1208,116 +1122,6 @@ async def import_csv_endpoint( raise HTTPException(status_code=500, detail="Error processing CSV upload") -@api_router.post("/webhook/generic") -@webhook_limiter.limit(WEBHOOK_RATE_LIMIT) -async def handle_generic_webhook( - request: Request, db_session=Depends(get_async_session) -): - """Handle generic webhook endpoint for receiving JSON payloads. - - Supports gzip compression, extracts customer and reservation data, - saves to database, and triggers push notifications. - - No authentication required for this endpoint. - """ - # Store data for error logging if needed - data = None - - try: - timestamp = datetime.now().isoformat() - _LOGGER.info("Received generic webhook data at %s", timestamp) - - # Get the raw body content - body = await request.body() - - if not body: - raise HTTPException(status_code=400, detail="ERROR: No content provided") - - # Check if content is gzip compressed - content_encoding = request.headers.get("content-encoding", "").lower() - is_gzipped = content_encoding == "gzip" - - # Decompress if gzipped - if is_gzipped: - try: - body = gzip.decompress(body) - _LOGGER.info("Successfully decompressed gzip content") - except Exception as e: - raise HTTPException( - status_code=400, - detail=f"ERROR: Failed to decompress gzip content: {e}", - ) from e - - # Parse JSON - try: - data = json.loads(body.decode("utf-8")) - except json.JSONDecodeError as e: - raise HTTPException( - status_code=400, - detail=f"ERROR: Invalid JSON content: {e}", - ) from e - - if True: - # log to file for now - logs_dir = Path("logs/generic_webhooks") - await asyncio.to_thread( - logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True - ) - log_filename = ( - logs_dir - / f"generic_webhook_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - ) - await asyncio.to_thread( - log_filename.write_text, - json.dumps(data, indent=2, default=str, ensure_ascii=False), - encoding="utf-8", - ) - - # Process the webhook data and save to database - await process_generic_webhook_submission(request, data, db_session) - - return { - "status": "success", - "message": "Generic webhook data received and processed successfully", - "timestamp": timestamp, - } - - except HTTPException: - raise - except Exception as e: - _LOGGER.exception("Error in handle_generic_webhook") - - # Log error data to file asynchronously (only on error) - error_log_entry = { - "timestamp": datetime.now().isoformat(), - "client_ip": request.client.host if request.client else "unknown", - "headers": dict(request.headers), - "data": data, # Include the parsed data if available - "error": str(e), - "traceback": traceback.format_exc(), - } - - # Use asyncio to run file I/O in thread pool to avoid blocking - error_logs_dir = Path("logs/errors") - await asyncio.to_thread( - error_logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True - ) - - error_log_filename = error_logs_dir / ( - f"generic_webhook_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - ) - await asyncio.to_thread( - error_log_filename.write_text, - json.dumps(error_log_entry, indent=2, default=str, ensure_ascii=False), - encoding="utf-8", - ) - - _LOGGER.error("Error data logged to: %s", error_log_filename) - raise HTTPException( - status_code=500, detail="Error processing generic webhook data" - ) from e - - async def _process_conversion_xml_background( xml_content: str, filename: str, diff --git a/src/alpine_bits_python/db_setup.py b/src/alpine_bits_python/db_setup.py index d45af0a..82b69fa 100644 --- a/src/alpine_bits_python/db_setup.py +++ b/src/alpine_bits_python/db_setup.py @@ -7,15 +7,18 @@ before the application starts accepting requests. It includes: """ import asyncio +from datetime import UTC, datetime from typing import Any -from sqlalchemy import text +from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker +from sqlalchemy.orm import selectinload -from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT +from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, WebhookStatus from .customer_service import CustomerService -from .db import create_database_engine +from .db import WebhookEndpoint, WebhookRequest, create_database_engine from .logging_config import get_logger +from .webhook_processor import webhook_registry _LOGGER = get_logger(__name__) @@ -236,6 +239,123 @@ async def backfill_acked_requests_username( ) +async def reprocess_stuck_webhooks( + sessionmaker: async_sessionmaker, + config: dict[str, Any] | None = None, +) -> None: + """Reprocess webhooks that were stuck in 'processing' state. + + Finds webhooks with status='processing' and reprocesses them. + These are webhooks that were not fully processed in the previous run, + likely due to a crash or unexpected shutdown. + + Args: + sessionmaker: SQLAlchemy async sessionmaker + config: Application configuration dictionary + """ + _LOGGER.info("Checking for stuck webhooks to reprocess...") + + async with sessionmaker() as session: + # Find all webhooks stuck in 'processing' state + result = await session.execute( + select(WebhookRequest) + .where(WebhookRequest.status == WebhookStatus.PROCESSING) + .options( + selectinload(WebhookRequest.webhook_endpoint).selectinload( + WebhookEndpoint.hotel + ) + ) + ) + stuck_webhooks = result.scalars().all() + + if not stuck_webhooks: + _LOGGER.info("No stuck webhooks found") + return + + _LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks)) + + reprocessed_count = 0 + failed_count = 0 + + for webhook_request in stuck_webhooks: + webhook_id = webhook_request.id + webhook_endpoint = webhook_request.webhook_endpoint + + if not webhook_endpoint: + _LOGGER.error( + "Webhook request %d has no webhook_endpoint, skipping", webhook_id + ) + webhook_request.status = WebhookStatus.FAILED + webhook_request.last_error = ( + "No webhook endpoint found during startup reprocessing" + ) + webhook_request.processing_completed_at = datetime.now(UTC) + failed_count += 1 + continue + + if not webhook_request.payload_json: + _LOGGER.error( + "Webhook request %d has no payload (purged?), marking as failed", + webhook_id, + ) + webhook_request.status = WebhookStatus.FAILED + webhook_request.last_error = ( + "No payload available for reprocessing (purged)" + ) + webhook_request.processing_completed_at = datetime.now(UTC) + failed_count += 1 + continue + + try: + _LOGGER.info( + "Reprocessing webhook %d (hotel=%s, type=%s)", + webhook_id, + webhook_endpoint.hotel_id, + webhook_endpoint.webhook_type, + ) + + # Get processor for webhook_type + processor = webhook_registry.get_processor( + webhook_endpoint.webhook_type + ) + if not processor: + raise ValueError( + f"No processor for type: {webhook_endpoint.webhook_type}" + ) + + # Reprocess webhook with simplified interface + await processor.process( + webhook_request=webhook_request, + db_session=session, + config=config, + ) + + # Update status to completed + webhook_request.status = WebhookStatus.COMPLETED + webhook_request.processing_completed_at = datetime.now(UTC) + reprocessed_count += 1 + + _LOGGER.info("Successfully reprocessed webhook %d", webhook_id) + + except Exception as e: + _LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e) + webhook_request.status = WebhookStatus.FAILED + webhook_request.last_error = ( + f"Reprocessing failed during startup: {str(e)[:1950]}" + ) + webhook_request.processing_completed_at = datetime.now(UTC) + failed_count += 1 + + # Commit all changes + await session.commit() + + _LOGGER.info( + "Webhook reprocessing complete: %d successful, %d failed", + reprocessed_count, + failed_count, + ) + + async def run_startup_tasks( sessionmaker: async_sessionmaker, config: dict[str, Any] | None = None, @@ -284,3 +404,6 @@ async def run_startup_tasks( "No engine provided to run_startup_tasks, " "skipping config-based backfill tasks" ) + + # Reprocess stuck webhooks (those stuck in 'processing' state) + await reprocess_stuck_webhooks(sessionmaker, config) diff --git a/src/alpine_bits_python/webhook_processor.py b/src/alpine_bits_python/webhook_processor.py index 6eb0d68..2fd769e 100644 --- a/src/alpine_bits_python/webhook_processor.py +++ b/src/alpine_bits_python/webhook_processor.py @@ -30,20 +30,16 @@ class WebhookProcessorProtocol(Protocol): async def process( self, - payload: dict[str, Any], webhook_request: WebhookRequest, - hotel: Hotel, db_session: AsyncSession, - request: Request, + config: dict[str, Any] | None = None, ) -> dict[str, Any]: """Process webhook payload. Args: - payload: Parsed webhook payload - webhook_request: WebhookRequest database record - hotel: Hotel associated with this webhook + webhook_request: WebhookRequest database record (contains payload_json and hotel_id) db_session: Database session - request: FastAPI Request object + config: Application configuration (optional) Returns: Response dict with status, message, customer_id, reservation_id @@ -85,12 +81,38 @@ class WebhookProcessorRegistry: return self._processors.get(webhook_type) -async def process_wix_form_submission(request: Request, data: dict[str, Any], db): - """Shared business logic for handling Wix form submissions (test and production).""" +async def process_wix_form_submission( + request: Request | None, + data: dict[str, Any], + db, + config: dict[str, Any] | None = None, + hotel_id: str | None = None, + event_dispatcher=None, +): + """Shared business logic for handling Wix form submissions (test and production). + + Args: + request: FastAPI Request object (can be None during startup reprocessing) + data: Webhook payload data + db: Database session + config: Application config (optional, extracted from request if not provided) + hotel_id: Hotel ID (optional, will use from data or config default if not provided) + event_dispatcher: Event dispatcher for push notifications (optional) + """ timestamp = datetime.now().isoformat() _LOGGER.info("Received Wix form data at %s", timestamp) + # Extract config and event_dispatcher from request if not provided + if config is None and request is not None: + config = request.app.state.config + if event_dispatcher is None and request is not None: + event_dispatcher = getattr(request.app.state, "event_dispatcher", None) + + # Provide fallback config if still None + if config is None: + config = {} + data = data.get("data") # Handle nested "data" key if present # save customer and reservation to DB @@ -191,18 +213,17 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db db_customer = await customer_service.get_or_create_customer(customer_data) # Determine hotel_code and hotel_name - # Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback - hotel_code = data.get("field:hotelid", None) + # Priority: 1) Passed hotel_id, 2) Form field, 3) Config default, 4) Fallback + hotel_code = hotel_id or data.get("field:hotelid", None) if hotel_code is None: - _LOGGER.warning("No hotel_code provided in form data, using default") - - hotel_code = request.app.state.config.get("default_hotel_code", "123") + _LOGGER.warning("No hotel_code provided, using default from config") + hotel_code = config.get("default_hotel_code", "123") hotel_name = ( data.get("field:hotelname") or data.get("hotelname") - or request.app.state.config.get("default_hotel_name") + or config.get("default_hotel_name") or "Frangart Inn" # fallback ) @@ -222,7 +243,7 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db # Get advertising account IDs conditionally based on fbclid/gclid presence meta_account_id, google_account_id = get_advertising_account_ids( - request.app.state.config, hotel_code, fbclid, gclid + config, hotel_code, fbclid, gclid ) reservation = ReservationData( @@ -265,12 +286,11 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db async def push_event(): # Fire event for listeners (push, etc.) - hotel-specific dispatch - dispatcher = getattr(request.app.state, "event_dispatcher", None) - if dispatcher: + if event_dispatcher: # Get hotel_code from reservation to target the right listeners hotel_code = getattr(db_reservation, "hotel_code", None) if hotel_code and hotel_code.strip(): - await dispatcher.dispatch_for_hotel( + await event_dispatcher.dispatch_for_hotel( "form_processed", hotel_code, db_customer, db_reservation ) _LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code) @@ -304,41 +324,52 @@ class WixFormProcessor: async def process( self, - payload: dict[str, Any], webhook_request: WebhookRequest, - hotel: Hotel, db_session: AsyncSession, - request: Request, + config: dict[str, Any] | None = None, ) -> dict[str, Any]: """Process Wix form webhook payload. Args: - payload: Parsed webhook payload webhook_request: WebhookRequest database record - hotel: Hotel associated with this webhook db_session: Database session - request: FastAPI Request object + config: Application configuration (optional) Returns: Response dict with status and details """ - # Import here to avoid circular dependency + # Call processing function with data from webhook_request + result = await process_wix_form_submission( + request=None, # No request context needed + data=webhook_request.payload_json, + db=db_session, + config=config, + hotel_id=webhook_request.hotel_id, + event_dispatcher=None, # No push events during reprocessing + ) - # Call existing processing function - result = await process_wix_form_submission(request, payload, db_session) - - # The existing function doesn't return customer/reservation IDs directly, - # but they would be in the database session. We'll need to extract them - # from the result or query after the fact. For now, return the result as-is. return result async def process_generic_webhook_submission( - request: Request, data: dict[str, Any], db + request: Request | None, + data: dict[str, Any], + db, + config: dict[str, Any] | None = None, + hotel_id: str | None = None, + event_dispatcher=None, ): """Process generic webhook submissions with nested structure. + Args: + request: FastAPI Request object (can be None during startup reprocessing) + data: Webhook payload data + db: Database session + config: Application config (optional, extracted from request if not provided) + hotel_id: Hotel ID (optional, will use from data or config default) + event_dispatcher: Event dispatcher for push notifications (optional) + Expected structure: { "hotel_data": {"hotelname": "...", "hotelcode": "..."}, @@ -371,6 +402,16 @@ async def process_generic_webhook_submission( timestamp = datetime.now().isoformat() _LOGGER.info("Processing generic webhook submission at %s", timestamp) + # Extract config and event_dispatcher from request if not provided + if config is None and request is not None: + config = request.app.state.config + if event_dispatcher is None and request is not None: + event_dispatcher = getattr(request.app.state, "event_dispatcher", None) + + # Provide fallback config if still None + if config is None: + config = {} + # Extract nested data hotel_data = data.get("hotel_data", {}) form_data = data.get("form_data", {}) @@ -391,17 +432,16 @@ async def process_generic_webhook_submission( selected_offers_str = ", ".join(selected_offers) if selected_offers else None # Extract hotel information - hotel_code = hotel_data.get("hotelcode") + # Priority: 1) Passed hotel_id, 2) Webhook data, 3) Config default, 4) Fallback + hotel_code = hotel_id or hotel_data.get("hotelcode") hotel_name = hotel_data.get("hotelname") if not hotel_code: - _LOGGER.warning("No hotel_code provided in webhook data, using default") - hotel_code = request.app.state.config.get("default_hotel_code", "123") + _LOGGER.warning("No hotel_code provided, using default from config") + hotel_code = config.get("default_hotel_code", "123") if not hotel_name: - hotel_name = ( - request.app.state.config.get("default_hotel_name") or "Frangart Inn" - ) + hotel_name = config.get("default_hotel_name") or "Frangart Inn" # Extract customer information first_name = form_data.get("name") @@ -517,7 +557,7 @@ async def process_generic_webhook_submission( # Get advertising account IDs conditionally based on fbclid/gclid presence meta_account_id, google_account_id = get_advertising_account_ids( - request.app.state.config, hotel_code, fbclid, gclid + config, hotel_code, fbclid, gclid ) # Create reservation @@ -560,12 +600,11 @@ async def process_generic_webhook_submission( async def push_event(): # Fire event for listeners (push, etc.) - hotel-specific dispatch - dispatcher = getattr(request.app.state, "event_dispatcher", None) - if dispatcher: + if event_dispatcher: # Get hotel_code from reservation to target the right listeners hotel_code = getattr(db_reservation, "hotel_code", None) if hotel_code and hotel_code.strip(): - await dispatcher.dispatch_for_hotel( + await event_dispatcher.dispatch_for_hotel( "form_processed", hotel_code, db_customer, db_reservation ) _LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code) @@ -604,27 +643,30 @@ class GenericWebhookProcessor: async def process( self, - payload: dict[str, Any], webhook_request: WebhookRequest, - hotel: Hotel, db_session: AsyncSession, - request: Request, + config: dict[str, Any] | None = None, ) -> dict[str, Any]: """Process generic webhook payload. Args: - payload: Parsed webhook payload webhook_request: WebhookRequest database record - hotel: Hotel associated with this webhook db_session: Database session - request: FastAPI Request object + config: Application configuration (optional) Returns: Response dict with status and details """ - # Call existing processing function - result = await process_generic_webhook_submission(request, payload, db_session) + # Call processing function with data from webhook_request + result = await process_generic_webhook_submission( + request=None, # No request context needed + data=webhook_request.payload_json, + db=db_session, + config=config, + hotel_id=webhook_request.hotel_id, + event_dispatcher=None, # No push events during reprocessing + ) return result