"""Webhook processor interface and implementations."""

import asyncio
from datetime import date, datetime
from typing import Any, Protocol

from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from alpine_bits_python.auth import generate_unique_id
from alpine_bits_python.config_loader import get_advertising_account_ids
from alpine_bits_python.customer_service import CustomerService
from alpine_bits_python.reservation_service import ReservationService
from alpine_bits_python.schemas import ReservationData

from .db import WebhookRequest
from .logging_config import get_logger

_LOGGER = get_logger(__name__)

# Strong references to in-flight push-notification tasks. The event loop only
# keeps weak references to tasks, so a task held solely by a local variable
# may be garbage collected before it finishes.
_BACKGROUND_TASKS: set[asyncio.Task[Any]] = set()

# Form values (compared lower-cased) that mean "newsletter checkbox ticked".
_NEWSLETTER_TRUE_VALUES = frozenset(
    {"yes", "true", "1", "on", "selezionato", "angekreuzt"}
)

# Substrings of a database error message that identify a duplicate insert.
_DUPLICATE_KEYWORDS = ("unique", "duplicate", "already exists")


class WebhookProcessorProtocol(Protocol):
    """Protocol for webhook processors."""

    @property
    def webhook_type(self) -> str:
        """Return webhook type identifier (e.g., 'wix_form', 'generic')."""
        ...

    async def process(
        self,
        webhook_request: WebhookRequest,
        db_session: AsyncSession,
        config: dict[str, Any] | None = None,
        event_dispatcher: Any | None = None,
    ) -> dict[str, Any]:
        """Process webhook payload.

        Args:
            webhook_request: WebhookRequest database record (contains
                payload_json and hotel_id)
            db_session: Database session
            config: Application configuration (optional)
            event_dispatcher: Event dispatcher for push notifications (optional)

        Returns:
            Response dict with status, message, customer_id, reservation_id

        Raises:
            HTTPException on processing errors

        """
        ...


class WebhookProcessorRegistry:
    """Registry for webhook processors."""

    def __init__(self) -> None:
        """Initialize the registry."""
        self._processors: dict[str, WebhookProcessorProtocol] = {}

    def register(self, processor: WebhookProcessorProtocol) -> None:
        """Register a webhook processor.

        A processor registered later under the same ``webhook_type`` replaces
        the earlier one.

        Args:
            processor: Processor instance to register

        """
        self._processors[processor.webhook_type] = processor
        _LOGGER.info("Registered webhook processor: %s", processor.webhook_type)

    def get_processor(self, webhook_type: str) -> WebhookProcessorProtocol | None:
        """Get processor for webhook type.

        Args:
            webhook_type: Type of webhook to process

        Returns:
            Processor instance or None if not found

        """
        return self._processors.get(webhook_type)


class _DuplicateReservationError(Exception):
    """Internal signal: the reservation insert violated a uniqueness constraint."""


def _parse_count(value: Any, default: int, field_name: str) -> int:
    """Convert a guest-count form value to ``int``.

    Args:
        value: Raw value from the payload (typically a string or number)
        default: Value returned when ``value`` is ``None`` or an empty string
        field_name: Payload field name, used in log and error messages

    Returns:
        The parsed integer, or ``default`` for missing/empty input

    Raises:
        HTTPException: 400 if the value is present but not an integer

    """
    if value is None or value == "":
        return default
    try:
        return int(value)
    except (TypeError, ValueError) as exc:
        _LOGGER.warning("Invalid numeric value for %s: %r", field_name, value)
        raise HTTPException(
            status_code=400, detail=f"Invalid numeric value for {field_name}"
        ) from exc


def _parse_submission_time(raw: Any) -> datetime | None:
    """Parse an ISO 8601 submission timestamp from a webhook payload.

    A trailing ``Z`` or a ``+HH:MM`` offset is stripped before parsing, so the
    result is naive in those cases. Any other string is handed to
    ``datetime.fromisoformat`` unchanged.

    Args:
        raw: Timestamp value from the payload

    Returns:
        The parsed datetime, or None if the value is missing, not a string,
        or cannot be parsed

    """
    if not raw:
        return None
    if not isinstance(raw, str):
        _LOGGER.warning("Ignoring non-string submission timestamp: %r", raw)
        return None

    if raw.endswith("Z"):
        text = raw[:-1]
    elif "+" in raw:
        # Remove timezone info (e.g., +02:00)
        text = raw.split("+")[0]
    else:
        text = raw

    try:
        return datetime.fromisoformat(text)
    except ValueError as e:
        _LOGGER.exception("Error parsing submission timestamp: %s", e)
        return None


def _submission_value(data: dict[str, Any], index: int) -> Any:
    """Return ``data["submissions"][index]["value"]`` or None if unavailable.

    Used as the last-resort source for the Wix arrival/departure dates; never
    raises when the list is missing, too short, or holds non-dict entries.
    """
    submissions = data.get("submissions")
    if not isinstance(submissions, list) or index >= len(submissions):
        return None
    entry = submissions[index]
    return entry.get("value") if isinstance(entry, dict) else None


def _duplicate_response(unique_id: Any, timestamp: str) -> dict[str, Any]:
    """Build the response returned when a reservation already exists."""
    return {
        "status": "duplicate",
        "message": "Reservation already exists (duplicate submission)",
        "unique_id": unique_id,
        "timestamp": timestamp,
    }


async def _create_reservation(
    db_session: AsyncSession,
    reservation: ReservationData,
    customer_id: Any,
    unique_id: Any,
) -> Any:
    """Persist a reservation through ReservationService.

    On ``IntegrityError`` the session is rolled back. The error message is
    then inspected for duplicate-related keywords to tell a repeated
    submission apart from any other integrity failure.

    Args:
        db_session: Database session
        reservation: Validated reservation data
        customer_id: ID of the customer the reservation belongs to
        unique_id: Submission ID, used for logging only

    Returns:
        The object returned by ``ReservationService.create_reservation``

    Raises:
        _DuplicateReservationError: if the insert hit a uniqueness constraint
        HTTPException: 500 for any other integrity error

    """
    reservation_service = ReservationService(db_session)
    try:
        return await reservation_service.create_reservation(reservation, customer_id)
    except IntegrityError as e:
        await db_session.rollback()

        # Check if this is a duplicate (unique constraint violation)
        error_msg = (str(e.orig) if hasattr(e, "orig") else str(e)).lower()
        if any(keyword in error_msg for keyword in _DUPLICATE_KEYWORDS):
            _LOGGER.info(
                "Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)",
                unique_id,
            )
            raise _DuplicateReservationError from e

        # Real integrity error (not a duplicate)
        _LOGGER.exception("Database integrity error creating reservation: %s", e)
        raise HTTPException(
            status_code=500, detail="Database error creating reservation"
        ) from e


async def _dispatch_form_processed(
    event_dispatcher: Any, db_customer: Any, db_reservation: Any
) -> None:
    """Fire the hotel-specific ``form_processed`` event for listeners."""
    # Get hotel_code from reservation to target the right listeners
    hotel_code = getattr(db_reservation, "hotel_code", None)
    if hotel_code and hotel_code.strip():
        await event_dispatcher.dispatch_for_hotel(
            "form_processed", hotel_code, db_customer, db_reservation
        )
        _LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
    else:
        _LOGGER.warning("No hotel_code in reservation, skipping push notifications")


def _on_background_task_done(task: asyncio.Task[Any]) -> None:
    """Release the task reference and log any exception the task raised."""
    _BACKGROUND_TASKS.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        _LOGGER.error(
            "Background form_processed dispatch failed: %s", exc, exc_info=exc
        )


def _schedule_form_processed_event(
    event_dispatcher: Any | None, db_customer: Any, db_reservation: Any
) -> None:
    """Dispatch the ``form_processed`` event in a background task.

    The task is not awaited, so the webhook response does not wait for the
    listeners. Does nothing when no dispatcher is configured.
    """
    if not event_dispatcher:
        return
    task = asyncio.create_task(
        _dispatch_form_processed(event_dispatcher, db_customer, db_reservation)
    )
    # Hold a strong reference until the task completes.
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_on_background_task_done)


async def process_wix_form_submission(
    data: dict[str, Any],
    db_session: AsyncSession,
    config: dict[str, Any] | None = None,
    hotel_id: str | None = None,
    event_dispatcher: Any | None = None,
) -> dict[str, Any]:
    """Shared business logic for handling Wix form submissions (test and production).

    Args:
        data: Webhook payload data; the form fields may sit at the top level
            or under a nested "data" key
        db_session: Database session
        config: Application config (optional)
        hotel_id: Hotel ID (optional, will use from data or config default
            if not provided)
        event_dispatcher: Event dispatcher for push notifications (optional)

    Returns:
        Response dict with "status" ("success" or "duplicate"), a message and
        a timestamp

    Raises:
        HTTPException: 400 for missing/invalid dates or guest counts, or when
            no md5_unique_id could be generated; 500 for non-duplicate
            database integrity errors

    """
    timestamp = datetime.now().isoformat()
    _LOGGER.info("Received Wix form data at %s", timestamp)

    # Provide fallback config if still None
    if config is None:
        config = {}

    # Handle nested "data" key if present; otherwise use the payload as-is
    nested = data.get("data")
    if isinstance(nested, dict):
        data = nested

    # Contact details
    contact_info = data.get("contact") or {}
    name_info = contact_info.get("name") or {}
    first_name = name_info.get("first")
    last_name = name_info.get("last")
    email = contact_info.get("email")
    phones = contact_info.get("phones") or [{}]
    phone_number = phones[0].get("e164Phone")
    contact_id = contact_info.get("contactId")
    name_prefix = data.get("field:anrede")
    language = (contact_info.get("locale") or "en")[:2]

    # A string checkbox value is mapped to a boolean via the known "ticked" words
    email_newsletter = data.get("field:form_field_5a7b", False)
    if isinstance(email_newsletter, str):
        email_newsletter = email_newsletter.lower() in _NEWSLETTER_TRUE_VALUES

    # Dates: validated up front so bad input is rejected before any DB write
    start_raw = (
        data.get("field:date_picker_a7c8")
        or data.get("Anreisedatum")
        or _submission_value(data, 1)
    )
    end_raw = (
        data.get("field:date_picker_7e65")
        or data.get("Abreisedatum")
        or _submission_value(data, 2)
    )
    if not start_raw or not end_raw:
        raise HTTPException(
            status_code=400, detail="Missing required dates (arrival/departure)"
        )
    try:
        start_date = date.fromisoformat(start_raw)
        end_date = date.fromisoformat(end_raw)
    except (TypeError, ValueError) as e:
        _LOGGER.error(
            "Error parsing dates: start=%s, end=%s, error=%s", start_raw, end_raw, e
        )
        raise HTTPException(status_code=400, detail=f"Invalid date format: {e}") from e

    # Room/guest info (``or`` keeps the original "falsy means default" rule)
    num_adults = _parse_count(
        data.get("field:number_7cf5") or 1, 1, "field:number_7cf5"
    )
    num_children = _parse_count(
        data.get("field:anzahl_kinder") or 0, 0, "field:anzahl_kinder"
    )

    children_ages: list[int] = []
    if num_children > 0:
        # Collect all child age fields, then take only the first num_children.
        # This handles form updates that may send extra padded/zero fields.
        collected_ages: list[int] = []
        for key, raw_age in data.items():
            if not key.startswith("field:alter_kind_"):
                continue
            if raw_age is None or raw_age == "":
                continue
            try:
                collected_ages.append(int(raw_age))
            except (TypeError, ValueError):
                _LOGGER.warning("Invalid age value for %s: %s", key, raw_age)
        children_ages = collected_ages[:num_children]

    offer = data.get("field:angebot_auswaehlen")

    # Use the submissionId from the payload; generate one if absent or empty.
    # NOTE(review): an earlier comment mentioned a max length of 35, but no
    # truncation happens here - confirm whether ReservationData enforces it.
    unique_id = data.get("submissionId") or generate_unique_id()

    # Use CustomerService to handle customer creation/update with hashing
    customer_service = CustomerService(db_session)
    customer_data = {
        "given_name": first_name,
        "surname": last_name,
        "contact_id": contact_id,
        "name_prefix": name_prefix,
        "email_address": email,
        "phone": phone_number,
        "email_newsletter": email_newsletter,
        "address_line": None,
        "city_name": None,
        "postal_code": None,
        "country_code": None,
        "gender": None,
        "birth_date": None,
        "language": language,
        "address_catalog": False,
        "name_title": None,
    }
    db_customer = await customer_service.get_or_create_customer(customer_data)

    # Determine hotel_code and hotel_name
    # Priority: 1) Passed hotel_id, 2) Form field, 3) Config default, 4) Fallback
    hotel_code = hotel_id or data.get("field:hotelid")
    if not hotel_code:
        _LOGGER.warning("No hotel_code provided, using default from config")
        hotel_code = config.get("default_hotel_code", "123")

    hotel_name = (
        data.get("field:hotelname")
        or data.get("hotelname")
        or config.get("default_hotel_name")
        or "Frangart Inn"  # fallback
    )

    # e.g. 2025-10-07T05:48:41.855Z
    submission_time = _parse_submission_time(data.get("submissionTime"))

    # Extract fbclid and gclid for conditional account ID lookup
    fbclid = data.get("field:fbclid")
    gclid = data.get("field:gclid")

    # Get advertising account IDs conditionally based on fbclid/gclid presence
    meta_account_id, google_account_id = get_advertising_account_ids(
        config, hotel_code, fbclid, gclid
    )

    reservation = ReservationData(
        unique_id=unique_id,
        start_date=start_date,
        end_date=end_date,
        num_adults=num_adults,
        num_children=num_children,
        children_ages=children_ages,
        hotel_code=hotel_code,
        hotel_name=hotel_name,
        offer=offer,
        created_at=submission_time,
        utm_source=data.get("field:utm_source"),
        utm_medium=data.get("field:utm_medium"),
        utm_campaign=data.get("field:utm_campaign"),
        utm_term=data.get("field:utm_term"),
        utm_content=data.get("field:utm_content"),
        user_comment=data.get("field:long_answer_3524", ""),
        fbclid=fbclid,
        gclid=gclid,
        meta_account_id=meta_account_id,
        google_account_id=google_account_id,
    )

    if reservation.md5_unique_id is None:
        raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")

    try:
        db_reservation = await _create_reservation(
            db_session, reservation, db_customer.id, unique_id
        )
    except _DuplicateReservationError:
        return _duplicate_response(unique_id, timestamp)

    # Fire event for listeners (push, etc.) without blocking the response
    _schedule_form_processed_event(event_dispatcher, db_customer, db_reservation)

    return {
        "status": "success",
        "message": "Wix form data received successfully",
        "received_keys": list(data.keys()),
        "timestamp": timestamp,
        "note": "No authentication required for this endpoint",
    }


class WixFormProcessor:
    """Processor for Wix form webhooks."""

    @property
    def webhook_type(self) -> str:
        """Return webhook type identifier."""
        return "wix_form"

    async def process(
        self,
        webhook_request: WebhookRequest,
        db_session: AsyncSession,
        config: dict[str, Any] | None = None,
        event_dispatcher: Any | None = None,
    ) -> dict[str, Any]:
        """Process Wix form webhook payload.

        Args:
            webhook_request: WebhookRequest database record
            db_session: Database session
            config: Application configuration (optional)
            event_dispatcher: Event dispatcher for push notifications (optional)

        Returns:
            Response dict with status and details

        """
        # Delegate to the shared function with data from webhook_request
        return await process_wix_form_submission(
            data=webhook_request.payload_json,
            db_session=db_session,
            config=config,
            hotel_id=webhook_request.hotel_id,
            event_dispatcher=event_dispatcher,
        )


async def process_generic_webhook_submission(
    data: dict[str, Any],
    db_session: AsyncSession,
    config: dict[str, Any] | None = None,
    hotel_id: str | None = None,
    event_dispatcher: Any | None = None,
) -> dict[str, Any]:
    """Process generic webhook submissions with nested structure.

    Args:
        data: Webhook payload data
        db_session: Database session
        config: Application config (optional)
        hotel_id: Hotel ID (optional, will use from data or config default)
        event_dispatcher: Event dispatcher for push notifications (optional)

    Returns:
        Response dict with "status" ("success" or "duplicate"), a message and
        a timestamp; on success also customer_id and reservation_id

    Raises:
        HTTPException: 400 for missing/invalid dates or guest counts, or when
            no md5_unique_id could be generated; 500 for non-duplicate
            database integrity errors

    Expected structure:
    {
        "hotel_data": {"hotelname": "...", "hotelcode": "..."},
        "form_data": {
            "sprache": "de/it/en",
            "anreise": "DD.MM.YYYY",
            "abreise": "DD.MM.YYYY",
            "erwachsene": "N",
            "kinder": "N",
            "alter": {"1": "age1", "2": "age2", ...},
            "anrede": "...",
            "name": "...",
            "nachname": "...",
            "mail": "...",
            "tel": "...",
            "nachricht": "..."
        },
        "tracking_data": {
            "utm_source": "...",
            "utm_medium": "...",
            "utm_campaign": "...",
            "utm_content": "...",
            "utm_term": "...",
            "fbclid": "...",
            "gclid": "..."
        },
        "timestamp": "ISO8601"
    }

    """
    timestamp = datetime.now().isoformat()
    _LOGGER.info("Processing generic webhook submission at %s", timestamp)

    # Provide fallback config if still None
    if config is None:
        config = {}

    # Extract nested data (``or {}`` also covers explicit nulls)
    hotel_data = data.get("hotel_data") or {}
    form_data = data.get("form_data") or {}
    tracking_data = data.get("tracking_data") or {}

    # Offers arrive as {offer_name: "on"}; keep the names of the ticked ones
    offer_data = form_data.get("unterkunftTyp") or {}
    selected_offers = [name for name, state in offer_data.items() if state == "on"]
    selected_offers_str = ", ".join(selected_offers) if selected_offers else None

    # Extract hotel information
    # Priority: 1) Passed hotel_id, 2) Webhook data, 3) Config default, 4) Fallback
    hotel_code = hotel_id or hotel_data.get("hotelcode")
    if not hotel_code:
        _LOGGER.warning("No hotel_code provided, using default from config")
        hotel_code = config.get("default_hotel_code", "123")

    hotel_name = hotel_data.get("hotelname")
    if not hotel_name:
        hotel_name = config.get("default_hotel_name") or "Frangart Inn"

    # Extract customer information
    first_name = form_data.get("name")
    last_name = form_data.get("nachname")
    email = form_data.get("mail")
    phone_number = form_data.get("tel")
    name_prefix = form_data.get("anrede")
    language = (form_data.get("sprache") or "de")[:2]
    user_comment = form_data.get("nachricht", "")
    plz = form_data.get("plz", "")
    city = form_data.get("stadt", "")
    country = form_data.get("land", "")

    # Parse dates - handle DD.MM.YYYY format
    start_date_str = form_data.get("anreise")
    end_date_str = form_data.get("abreise")
    if not start_date_str or not end_date_str:
        raise HTTPException(
            status_code=400, detail="Missing required dates (anreise/abreise)"
        )

    try:
        start_date = datetime.strptime(start_date_str, "%d.%m.%Y").date()
        end_date = datetime.strptime(end_date_str, "%d.%m.%Y").date()
    except (TypeError, ValueError) as e:
        _LOGGER.error(
            "Error parsing dates: start=%s, end=%s, error=%s",
            start_date_str,
            end_date_str,
            e,
        )
        raise HTTPException(status_code=400, detail=f"Invalid date format: {e}") from e

    # Extract room/guest info
    num_adults = _parse_count(form_data.get("erwachsene"), 2, "erwachsene")
    num_children = _parse_count(form_data.get("kinder"), 0, "kinder")

    # Extract children ages from nested structure (keys are "1", "2", ...)
    children_ages: list[int] = []
    if num_children > 0:
        alter_data = form_data.get("alter") or {}
        for i in range(1, num_children + 1):
            age_str = alter_data.get(str(i))
            if not age_str:
                continue
            try:
                children_ages.append(int(age_str))
            except (TypeError, ValueError):
                _LOGGER.warning("Invalid age value for child %d: %s", i, age_str)

    # Extract tracking information (all None when tracking_data is absent)
    utm_source = tracking_data.get("utm_source")
    utm_medium = tracking_data.get("utm_medium")
    utm_campaign = tracking_data.get("utm_campaign")
    utm_term = tracking_data.get("utm_term")
    utm_content = tracking_data.get("utm_content")
    fbclid = tracking_data.get("fbclid")
    gclid = tracking_data.get("gclid")

    # Parse submission timestamp
    submission_time = _parse_submission_time(data.get("timestamp"))

    # Generate unique ID
    unique_id = generate_unique_id()

    # Use CustomerService to handle customer creation/update with hashing
    customer_service = CustomerService(db_session)
    customer_data = {
        "given_name": first_name,
        "surname": last_name,
        "contact_id": None,
        # "--" is treated as "no salutation given"
        "name_prefix": name_prefix if name_prefix != "--" else None,
        "email_address": email,
        "phone": phone_number if phone_number else None,
        "email_newsletter": False,
        "address_line": None,
        "city_name": city if city else None,
        "postal_code": plz if plz else None,
        "country_code": country if country else None,
        "gender": None,
        "birth_date": None,
        "language": language,
        "address_catalog": False,
        "name_title": None,
    }

    # Create/update customer
    db_customer = await customer_service.get_or_create_customer(customer_data)

    # Get advertising account IDs conditionally based on fbclid/gclid presence
    meta_account_id, google_account_id = get_advertising_account_ids(
        config, hotel_code, fbclid, gclid
    )

    # Create reservation
    reservation_kwargs = {
        "unique_id": unique_id,
        "start_date": start_date,
        "end_date": end_date,
        "num_adults": num_adults,
        "num_children": num_children,
        "children_ages": children_ages,
        "hotel_code": hotel_code,
        "hotel_name": hotel_name,
        "offer": selected_offers_str,
        "utm_source": utm_source,
        "utm_medium": utm_medium,
        "utm_campaign": utm_campaign,
        "utm_term": utm_term,
        "utm_content": utm_content,
        "user_comment": user_comment,
        "fbclid": fbclid,
        "gclid": gclid,
        "meta_account_id": meta_account_id,
        "google_account_id": google_account_id,
    }

    # Only include created_at if we have a valid submission_time
    if submission_time:
        reservation_kwargs["created_at"] = submission_time

    reservation = ReservationData(**reservation_kwargs)

    if reservation.md5_unique_id is None:
        raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")

    try:
        db_reservation = await _create_reservation(
            db_session, reservation, db_customer.id, unique_id
        )
    except _DuplicateReservationError:
        return _duplicate_response(unique_id, timestamp)

    # Fire event for listeners (push, etc.) without blocking the response
    _schedule_form_processed_event(event_dispatcher, db_customer, db_reservation)

    _LOGGER.info(
        "Successfully processed generic webhook: customer_id=%s, reservation_id=%s",
        db_customer.id,
        db_reservation.id,
    )

    return {
        "status": "success",
        "message": "Generic webhook data processed successfully",
        "customer_id": db_customer.id,
        "reservation_id": db_reservation.id,
        "timestamp": timestamp,
    }


class GenericWebhookProcessor:
    """Processor for generic webhooks."""

    @property
    def webhook_type(self) -> str:
        """Return webhook type identifier."""
        return "generic"

    async def process(
        self,
        webhook_request: WebhookRequest,
        db_session: AsyncSession,
        config: dict[str, Any] | None = None,
        event_dispatcher: Any | None = None,
    ) -> dict[str, Any]:
        """Process generic webhook payload.

        Args:
            webhook_request: WebhookRequest database record
            db_session: Database session
            config: Application configuration (optional)
            event_dispatcher: Event dispatcher for push notifications (optional)

        Returns:
            Response dict with status and details

        """
        # Delegate to the shared function with data from webhook_request
        return await process_generic_webhook_submission(
            data=webhook_request.payload_json,
            db_session=db_session,
            config=config,
            hotel_id=webhook_request.hotel_id,
            event_dispatcher=event_dispatcher,
        )


# Global registry instance
webhook_registry = WebhookProcessorRegistry()


def initialize_webhook_processors() -> None:
    """Initialize and register all webhook processors.

    This should be called during application startup.
    """
    # Register built-in processors
    webhook_registry.register(WixFormProcessor())
    webhook_registry.register(GenericWebhookProcessor())
    _LOGGER.info("Webhook processors initialized")