Generic webhook submissions are now saved to the database

This commit is contained in:
Jonas Linter
2025-10-14 14:28:47 +02:00
parent 669cf00bbc
commit 8f2565b5a9
2 changed files with 542 additions and 52 deletions

View File

@@ -2,6 +2,7 @@ import asyncio
import gzip
import json
import os
import traceback
import urllib.parse
from collections import defaultdict
from datetime import date, datetime
@@ -480,6 +481,249 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
}
async def process_generic_webhook_submission(
    request: Request, data: dict[str, Any], db
):
    """Process generic webhook submissions with nested structure.

    Expected structure:
    {
        "hotel_data": {"hotelname": "...", "hotelcode": "..."},
        "form_data": {
            "sprache": "de/it/en",
            "anreise": "DD.MM.YYYY",
            "abreise": "DD.MM.YYYY",
            "erwachsene": "N",
            "kinder": "N",
            "alter": {"1": "age1", "2": "age2", ...},
            "anrede": "...",
            "name": "...",
            "nachname": "...",
            "mail": "...",
            "tel": "...",
            "nachricht": "..."
        },
        "tracking_data": {
            "utm_source": "...",
            "utm_medium": "...",
            "utm_campaign": "...",
            "utm_content": "...",
            "utm_term": "...",
            "fbclid": "...",
            "gclid": "..."
        },
        "timestamp": "ISO8601"
    }

    Args:
        request: Incoming FastAPI request (used for app config and the
            event dispatcher on ``request.app.state``).
        data: Parsed JSON payload with the structure above.
        db: Async database session passed through to the services.

    Returns:
        A dict with status, customer_id, reservation_id and timestamp.

    Raises:
        HTTPException: 400 when dates are missing/invalid, when guest
            counts are non-numeric, or when the reservation hash cannot
            be generated.
    """
    timestamp = datetime.now().isoformat()
    _LOGGER.info("Processing generic webhook submission at %s", timestamp)

    # Extract nested sections; tolerate any of them being absent.
    hotel_data = data.get("hotel_data", {})
    form_data = data.get("form_data", {})
    tracking_data = data.get("tracking_data", {})

    # Extract hotel information, falling back to configured defaults.
    hotel_code = hotel_data.get("hotelcode")
    hotel_name = hotel_data.get("hotelname")
    if not hotel_code:
        _LOGGER.warning(
            "No hotel_code provided in webhook data, using default"
        )
        hotel_code = request.app.state.config.get("default_hotel_code", "123")
    if not hotel_name:
        hotel_name = (
            request.app.state.config.get("default_hotel_name") or "Frangart Inn"
        )

    # Extract customer information.
    first_name = form_data.get("name")
    last_name = form_data.get("nachname")
    email = form_data.get("mail")
    phone_number = form_data.get("tel")
    name_prefix = form_data.get("anrede")
    # Two-letter language code; `or "de"` also covers an explicit None
    # value, which the previous `.get("sprache", "de")[:2]` did not.
    language = (form_data.get("sprache") or "de")[:2]
    user_comment = form_data.get("nachricht", "")

    # Parse dates - handle DD.MM.YYYY format.
    start_date_str = form_data.get("anreise")
    end_date_str = form_data.get("abreise")
    if not start_date_str or not end_date_str:
        raise HTTPException(
            status_code=400,
            detail="Missing required dates (anreise/abreise)"
        )
    try:
        # Parse DD.MM.YYYY format using strptime
        start_date = datetime.strptime(start_date_str, "%d.%m.%Y").date()
        end_date = datetime.strptime(end_date_str, "%d.%m.%Y").date()
    except ValueError as e:
        _LOGGER.error(
            "Error parsing dates: start=%s, end=%s, error=%s",
            start_date_str,
            end_date_str,
            e,
        )
        raise HTTPException(
            status_code=400, detail=f"Invalid date format: {e}"
        ) from e

    # Extract room/guest info. Reject non-numeric values with a 400
    # instead of letting ValueError escape as an unhandled 500.
    try:
        num_adults = int(form_data.get("erwachsene", 2))
        num_children = int(form_data.get("kinder", 0))
    except (TypeError, ValueError) as e:
        raise HTTPException(
            status_code=400, detail=f"Invalid guest count: {e}"
        ) from e

    # Extract children ages from nested structure keyed "1", "2", ...
    children_ages = []
    if num_children > 0:
        alter_data = form_data.get("alter", {})
        for i in range(1, num_children + 1):
            age_str = alter_data.get(str(i))
            if age_str:
                try:
                    children_ages.append(int(age_str))
                except ValueError:
                    # Skip the bad entry but keep the rest of the ages.
                    _LOGGER.warning(
                        "Invalid age value for child %d: %s", i, age_str
                    )

    # Extract tracking information.
    utm_source = tracking_data.get("utm_source")
    utm_medium = tracking_data.get("utm_medium")
    utm_campaign = tracking_data.get("utm_campaign")
    utm_term = tracking_data.get("utm_term")
    utm_content = tracking_data.get("utm_content")
    fbclid = tracking_data.get("fbclid")
    gclid = tracking_data.get("gclid")

    # Parse submission timestamp. Normalize a "Z" suffix for
    # fromisoformat (not accepted before Python 3.11), then drop any
    # UTC offset so the stored value is always naive wall-clock time.
    # The previous `split("+")` approach let negative offsets such as
    # "-05:00" through as timezone-aware values.
    submission_time = data.get("timestamp")
    try:
        if submission_time:
            parsed = datetime.fromisoformat(
                submission_time.replace("Z", "+00:00")
            )
            submission_time = parsed.replace(tzinfo=None)
        else:
            submission_time = None
    except Exception as e:
        _LOGGER.exception("Error parsing submission timestamp: %s", e)
        submission_time = None

    # Generate unique ID.
    unique_id = generate_unique_id()

    # Use CustomerService to handle customer creation/update with hashing.
    customer_service = CustomerService(db)
    customer_data = {
        "given_name": first_name,
        "surname": last_name,
        "contact_id": None,
        # "--" is the form's "no salutation" placeholder.
        "name_prefix": name_prefix if name_prefix != "--" else None,
        "email_address": email,
        "phone": phone_number if phone_number else None,
        "email_newsletter": False,
        "address_line": None,
        "city_name": None,
        "postal_code": None,
        "country_code": None,
        "gender": None,
        "birth_date": None,
        "language": language,
        "address_catalog": False,
        "name_title": None,
    }
    # Create/update customer.
    db_customer = await customer_service.get_or_create_customer(customer_data)

    # Create reservation.
    reservation_kwargs = {
        "unique_id": unique_id,
        "start_date": start_date,
        "end_date": end_date,
        "num_adults": num_adults,
        "num_children": num_children,
        "children_ages": children_ages,
        "hotel_code": hotel_code,
        "hotel_name": hotel_name,
        "offer": None,
        "utm_source": utm_source,
        "utm_medium": utm_medium,
        "utm_campaign": utm_campaign,
        "utm_term": utm_term,
        "utm_content": utm_content,
        "user_comment": user_comment,
        "fbclid": fbclid,
        "gclid": gclid,
    }
    # Only include created_at if we have a valid submission_time.
    if submission_time:
        reservation_kwargs["created_at"] = submission_time
    reservation = ReservationData(**reservation_kwargs)
    if reservation.md5_unique_id is None:
        raise HTTPException(
            status_code=400, detail="Failed to generate md5_unique_id"
        )

    # Use ReservationService to create reservation.
    reservation_service = ReservationService(db)
    db_reservation = await reservation_service.create_reservation(
        reservation, db_customer.id
    )

    async def push_event():
        # Fire event for listeners (push, etc.) - hotel-specific dispatch
        dispatcher = getattr(request.app.state, "event_dispatcher", None)
        if dispatcher:
            # Get hotel_code from reservation to target the right listeners
            hotel_code = getattr(db_reservation, "hotel_code", None)
            if hotel_code and hotel_code.strip():
                await dispatcher.dispatch_for_hotel(
                    "form_processed", hotel_code, db_customer, db_reservation
                )
                _LOGGER.info(
                    "Dispatched form_processed event for hotel %s", hotel_code
                )
            else:
                _LOGGER.warning(
                    "No hotel_code in reservation, skipping push notifications"
                )

    def _log_push_failure(t: asyncio.Task) -> None:
        # Actually surface failures: the old callback fetched
        # t.exception() but threw the result away.
        if not t.cancelled() and t.exception() is not None:
            _LOGGER.error("push_event task failed: %s", t.exception())

    # NOTE(review): a local reference does not outlive this coroutine;
    # a module-level task set would be needed to truly prevent GC.
    task = asyncio.create_task(push_event())
    task.add_done_callback(_log_push_failure)

    _LOGGER.info(
        "Successfully processed generic webhook: customer_id=%s, "
        "reservation_id=%s",
        db_customer.id,
        db_reservation.id,
    )
    return {
        "status": "success",
        "message": "Generic webhook data processed successfully",
        "customer_id": db_customer.id,
        "reservation_id": db_reservation.id,
        "timestamp": timestamp,
    }
async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
) -> str:
@@ -601,57 +845,98 @@ async def handle_wix_form_test(
@api_router.post("/webhook/generic")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_generic_webhook(
    request: Request, db_session=Depends(get_async_session)
):
    """Handle generic webhook endpoint for receiving JSON payloads.

    Supports gzip compression, extracts customer and reservation data,
    saves to database, and triggers push notifications.

    No authentication required for this endpoint.

    Raises:
        HTTPException: 400 for empty body, bad gzip, or invalid JSON;
            500 (after writing an error-log file) for any other failure.
    """
    # Store data for error logging if needed
    data = None
    try:
        timestamp = datetime.now().isoformat()
        _LOGGER.info("Received generic webhook data at %s", timestamp)
        # Get the raw body content
        body = await request.body()
        if not body:
            raise HTTPException(
                status_code=400, detail="ERROR: No content provided"
            )
        # Check if content is gzip compressed
        content_encoding = request.headers.get("content-encoding", "").lower()
        is_gzipped = content_encoding == "gzip"
        # Decompress if gzipped
        if is_gzipped:
            try:
                body = gzip.decompress(body)
                _LOGGER.info("Successfully decompressed gzip content")
            except Exception as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"ERROR: Failed to decompress gzip content: {e}",
                ) from e
        # Parse JSON
        try:
            data = json.loads(body.decode("utf-8"))
        except json.JSONDecodeError as e:
            raise HTTPException(
                status_code=400,
                detail=f"ERROR: Invalid JSON content: {e}",
            ) from e
        # Process the webhook data and save to database
        await process_generic_webhook_submission(request, data, db_session)
        return {
            "status": "success",
            "message": "Generic webhook data received and processed successfully",
            "timestamp": timestamp,
        }
    except HTTPException:
        # Re-raise client errors untouched so status codes survive.
        raise
    except Exception as e:
        _LOGGER.exception("Error in handle_generic_webhook")
        # Log error data to file asynchronously (only on error)
        error_log_entry = {
            "timestamp": datetime.now().isoformat(),
            "client_ip": request.client.host if request.client else "unknown",
            "headers": dict(request.headers),
            "data": data,  # Include the parsed data if available
            "error": str(e),
            "traceback": traceback.format_exc(),
        }
        # Use asyncio to run file I/O in thread pool to avoid blocking
        error_logs_dir = Path("logs/errors")
        await asyncio.to_thread(
            error_logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
        )
        error_log_filename = error_logs_dir / (
            f"generic_webhook_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        )
        await asyncio.to_thread(
            error_log_filename.write_text,
            json.dumps(error_log_entry, indent=2, default=str, ensure_ascii=False),
            encoding="utf-8",
        )
        _LOGGER.error("Error data logged to: %s", error_log_filename)
        raise HTTPException(
            status_code=500, detail="Error processing generic webhook data"
        ) from e
@api_router.put("/hoteldata/conversions_import/{filename:path}")