Compare commits
2 Commits
454b524077
...
e2d0ef8e53
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e2d0ef8e53 | ||
|
|
f89236228d |
1329910
meta_insights_dump2025_11_27.sql
Normal file
1329910
meta_insights_dump2025_11_27.sql
Normal file
File diff suppressed because one or more lines are too long
@@ -6,7 +6,6 @@ import hashlib
|
||||
import json
|
||||
import multiprocessing
|
||||
import os
|
||||
import traceback
|
||||
import urllib.parse
|
||||
import xml.dom.minidom
|
||||
from collections import defaultdict
|
||||
@@ -30,12 +29,9 @@ from fastapi.security import (
|
||||
from pydantic import BaseModel
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
from sqlalchemy import and_, select, update
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from alpine_bits_python.webhook_processor import process_generic_webhook_submission, process_wix_form_submission
|
||||
|
||||
from .alpinebits_server import (
|
||||
AlpineBitsActionName,
|
||||
AlpineBitsClientInfo,
|
||||
@@ -43,14 +39,19 @@ from .alpinebits_server import (
|
||||
Version,
|
||||
)
|
||||
from .auth import validate_api_key
|
||||
from .config_loader import load_config, get_username_for_hotel
|
||||
from .config_loader import get_username_for_hotel, load_config
|
||||
from .const import HttpStatusCode, WebhookStatus
|
||||
from .conversion_service import ConversionService
|
||||
from .csv_import import CSVImporter
|
||||
from .db import Customer as DBCustomer
|
||||
from .db import Reservation as DBReservation
|
||||
from .db import Hotel, WebhookEndpoint, WebhookRequest
|
||||
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
|
||||
from .db import (
|
||||
ResilientAsyncSession,
|
||||
SessionMaker,
|
||||
WebhookEndpoint,
|
||||
WebhookRequest,
|
||||
create_database_engine,
|
||||
)
|
||||
from .db_setup import run_startup_tasks
|
||||
from .email_monitoring import ReservationStatsCollector
|
||||
from .email_service import create_email_service
|
||||
@@ -218,8 +219,7 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
|
||||
|
||||
|
||||
async def cleanup_stale_webhooks(
|
||||
async_sessionmaker: async_sessionmaker,
|
||||
timeout_minutes: int = 10
|
||||
async_sessionmaker: async_sessionmaker, timeout_minutes: int = 10
|
||||
) -> int:
|
||||
"""Reset webhooks stuck in 'processing' (worker crashed).
|
||||
|
||||
@@ -229,6 +229,7 @@ async def cleanup_stale_webhooks(
|
||||
|
||||
Returns:
|
||||
Number of stale webhooks reset
|
||||
|
||||
"""
|
||||
timeout_threshold = datetime.now(UTC) - timedelta(minutes=timeout_minutes)
|
||||
|
||||
@@ -238,12 +239,12 @@ async def cleanup_stale_webhooks(
|
||||
.where(
|
||||
and_(
|
||||
WebhookRequest.status == WebhookStatus.PROCESSING,
|
||||
WebhookRequest.processing_started_at < timeout_threshold
|
||||
WebhookRequest.processing_started_at < timeout_threshold,
|
||||
)
|
||||
)
|
||||
.values(
|
||||
status=WebhookStatus.FAILED,
|
||||
last_error='Processing timeout - worker may have crashed'
|
||||
last_error="Processing timeout - worker may have crashed",
|
||||
)
|
||||
)
|
||||
await session.commit()
|
||||
@@ -256,8 +257,7 @@ async def cleanup_stale_webhooks(
|
||||
|
||||
|
||||
async def purge_old_webhook_payloads(
|
||||
async_sessionmaker: async_sessionmaker,
|
||||
retention_days: int = 7
|
||||
async_sessionmaker: async_sessionmaker, retention_days: int = 7
|
||||
) -> int:
|
||||
"""Purge payload_json from old completed webhooks.
|
||||
|
||||
@@ -269,6 +269,7 @@ async def purge_old_webhook_payloads(
|
||||
|
||||
Returns:
|
||||
Number of payloads purged
|
||||
|
||||
"""
|
||||
cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
|
||||
|
||||
@@ -279,13 +280,10 @@ async def purge_old_webhook_payloads(
|
||||
and_(
|
||||
WebhookRequest.status == WebhookStatus.COMPLETED,
|
||||
WebhookRequest.created_at < cutoff_date,
|
||||
WebhookRequest.purged_at.is_(None) # Not already purged
|
||||
WebhookRequest.purged_at.is_(None), # Not already purged
|
||||
)
|
||||
)
|
||||
.values(
|
||||
payload_json=None,
|
||||
purged_at=datetime.now(UTC)
|
||||
)
|
||||
.values(payload_json=None, purged_at=datetime.now(UTC))
|
||||
)
|
||||
await session.commit()
|
||||
count = result.rowcount
|
||||
@@ -303,6 +301,7 @@ async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
|
||||
|
||||
Args:
|
||||
async_sessionmaker: SQLAlchemy async sessionmaker
|
||||
|
||||
"""
|
||||
try:
|
||||
# Clean up stale webhooks (stuck in 'processing')
|
||||
@@ -314,7 +313,7 @@ async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
|
||||
_LOGGER.debug(
|
||||
"Webhook cleanup: %d stale reset, %d payloads purged",
|
||||
stale_count,
|
||||
purged_count
|
||||
purged_count,
|
||||
)
|
||||
except Exception as e:
|
||||
_LOGGER.exception("Error during periodic webhook cleanup: %s", e)
|
||||
@@ -382,6 +381,7 @@ async def lifespan(app: FastAPI):
|
||||
|
||||
# Initialize webhook processors
|
||||
from .webhook_processor import initialize_webhook_processors
|
||||
|
||||
initialize_webhook_processors()
|
||||
_LOGGER.info("Webhook processors initialized")
|
||||
|
||||
@@ -433,6 +433,7 @@ async def lifespan(app: FastAPI):
|
||||
# Start periodic webhook cleanup (only on primary worker)
|
||||
cleanup_task = None
|
||||
if is_primary:
|
||||
|
||||
async def run_periodic_cleanup():
|
||||
"""Run cleanup tasks every 5 minutes."""
|
||||
while True:
|
||||
@@ -658,7 +659,6 @@ async def detect_language(
|
||||
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
|
||||
|
||||
|
||||
|
||||
async def validate_basic_auth(
|
||||
credentials: HTTPBasicCredentials = Depends(security_basic),
|
||||
) -> str:
|
||||
@@ -754,23 +754,37 @@ async def handle_webhook_unified(
|
||||
# Extract hotel_id from payload based on webhook type
|
||||
if webhook_type == "wix_form":
|
||||
# Wix forms: field:hotelid or use default
|
||||
hotel_id_from_payload = payload.get("data", {}).get("field:hotelid") if isinstance(payload.get("data"), dict) else payload.get("field:hotelid")
|
||||
hotel_id_from_payload = (
|
||||
payload.get("data", {}).get("field:hotelid")
|
||||
if isinstance(payload.get("data"), dict)
|
||||
else payload.get("field:hotelid")
|
||||
)
|
||||
if not hotel_id_from_payload:
|
||||
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
|
||||
_LOGGER.info("Legacy wix-form endpoint: using default hotel_code=%s", hotel_id_from_payload)
|
||||
hotel_id_from_payload = request.app.state.config.get(
|
||||
"default_hotel_code", "123"
|
||||
)
|
||||
_LOGGER.info(
|
||||
"Legacy wix-form endpoint: using default hotel_code=%s",
|
||||
hotel_id_from_payload,
|
||||
)
|
||||
elif webhook_type == "generic":
|
||||
# Generic webhooks: hotel_data.hotelcode or use default
|
||||
hotel_data = payload.get("hotel_data", {})
|
||||
hotel_id_from_payload = hotel_data.get("hotelcode")
|
||||
if not hotel_id_from_payload:
|
||||
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
|
||||
_LOGGER.info("Legacy generic endpoint: using default hotel_code=%s", hotel_id_from_payload)
|
||||
hotel_id_from_payload = request.app.state.config.get(
|
||||
"default_hotel_code", "123"
|
||||
)
|
||||
_LOGGER.info(
|
||||
"Legacy generic endpoint: using default hotel_code=%s",
|
||||
hotel_id_from_payload,
|
||||
)
|
||||
|
||||
_LOGGER.info(
|
||||
"Legacy endpoint detected: %s, webhook_type=%s, hotel_id=%s",
|
||||
webhook_secret,
|
||||
webhook_type,
|
||||
hotel_id_from_payload
|
||||
hotel_id_from_payload,
|
||||
)
|
||||
|
||||
# Look up the webhook endpoint for this hotel and type
|
||||
@@ -780,7 +794,7 @@ async def handle_webhook_unified(
|
||||
and_(
|
||||
WebhookEndpoint.hotel_id == hotel_id_from_payload,
|
||||
WebhookEndpoint.webhook_type == webhook_type,
|
||||
WebhookEndpoint.is_enabled == True
|
||||
WebhookEndpoint.is_enabled == True,
|
||||
)
|
||||
)
|
||||
.options(selectinload(WebhookEndpoint.hotel))
|
||||
@@ -791,11 +805,11 @@ async def handle_webhook_unified(
|
||||
_LOGGER.error(
|
||||
"No webhook endpoint found for legacy endpoint: hotel_id=%s, type=%s",
|
||||
hotel_id_from_payload,
|
||||
webhook_type
|
||||
webhook_type,
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}"
|
||||
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}",
|
||||
)
|
||||
else:
|
||||
# New secure endpoint - look up by webhook_secret
|
||||
@@ -804,7 +818,7 @@ async def handle_webhook_unified(
|
||||
.where(
|
||||
and_(
|
||||
WebhookEndpoint.webhook_secret == webhook_secret,
|
||||
WebhookEndpoint.is_enabled == True
|
||||
WebhookEndpoint.is_enabled == True,
|
||||
)
|
||||
)
|
||||
.options(selectinload(WebhookEndpoint.hotel))
|
||||
@@ -842,7 +856,7 @@ async def handle_webhook_unified(
|
||||
_LOGGER.info(
|
||||
"Webhook already processed (webhook_id=%d, hotel=%s)",
|
||||
existing.id,
|
||||
webhook_endpoint.hotel_id
|
||||
webhook_endpoint.hotel_id,
|
||||
)
|
||||
return {
|
||||
"status": "success",
|
||||
@@ -850,11 +864,11 @@ async def handle_webhook_unified(
|
||||
"webhook_id": existing.id,
|
||||
"duplicate": True,
|
||||
}
|
||||
elif existing.status == WebhookStatus.PROCESSING:
|
||||
if existing.status == WebhookStatus.PROCESSING:
|
||||
# Another worker is processing right now
|
||||
_LOGGER.info(
|
||||
"Webhook is being processed by another worker (webhook_id=%d)",
|
||||
existing.id
|
||||
existing.id,
|
||||
)
|
||||
return {
|
||||
"status": "success",
|
||||
@@ -862,12 +876,12 @@ async def handle_webhook_unified(
|
||||
"webhook_id": existing.id,
|
||||
"duplicate": True,
|
||||
}
|
||||
elif existing.status == WebhookStatus.FAILED:
|
||||
if existing.status == WebhookStatus.FAILED:
|
||||
# Retry failed webhook
|
||||
_LOGGER.info(
|
||||
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
|
||||
existing.id,
|
||||
existing.retry_count
|
||||
existing.retry_count,
|
||||
)
|
||||
webhook_request = existing
|
||||
webhook_request.retry_count += 1
|
||||
@@ -895,13 +909,11 @@ async def handle_webhook_unified(
|
||||
if not processor:
|
||||
raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}")
|
||||
|
||||
# 7. Process webhook
|
||||
# 7. Process webhook with simplified interface
|
||||
result = await processor.process(
|
||||
payload=payload,
|
||||
webhook_request=webhook_request,
|
||||
hotel=webhook_endpoint.hotel,
|
||||
db_session=db_session,
|
||||
request=request,
|
||||
config=request.app.state.config,
|
||||
)
|
||||
|
||||
# 8. Update status
|
||||
@@ -927,101 +939,6 @@ async def handle_webhook_unified(
|
||||
raise HTTPException(status_code=500, detail="Error processing webhook")
|
||||
|
||||
|
||||
@api_router.post("/webhook/wix-form")
|
||||
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
|
||||
async def handle_wix_form(
|
||||
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
|
||||
):
|
||||
"""Unified endpoint to handle Wix form submissions (test and production).
|
||||
|
||||
No authentication required for this endpoint.
|
||||
"""
|
||||
try:
|
||||
return await process_wix_form_submission(request, data, db_session)
|
||||
except IntegrityError as e:
|
||||
# Handle duplicate submissions gracefully - likely same form sent twice
|
||||
# or race condition between workers
|
||||
if "unique constraint" in str(e).lower() and "unique_id" in str(e).lower():
|
||||
_LOGGER.warning(
|
||||
"Duplicate submission detected (unique_id already exists). "
|
||||
"Returning success to prevent retry. Error: %s",
|
||||
str(e),
|
||||
)
|
||||
# Return success since the reservation already exists
|
||||
return {"status": "success", "message": "Reservation already processed"}
|
||||
# Re-raise if it's a different integrity error
|
||||
raise
|
||||
except Exception as e:
|
||||
_LOGGER.exception("Error in handle_wix_form")
|
||||
|
||||
log_entry = {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"client_ip": request.client.host if request.client else "unknown",
|
||||
"headers": dict(request.headers),
|
||||
"data": data,
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
# Use asyncio to run file I/O in thread pool to avoid blocking
|
||||
logs_dir = Path("logs/errors")
|
||||
await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True)
|
||||
|
||||
log_filename = (
|
||||
logs_dir / f"wix_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
)
|
||||
await asyncio.to_thread(
|
||||
log_filename.write_text,
|
||||
json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
_LOGGER.error("Error data logged to: %s", log_filename)
|
||||
raise HTTPException(status_code=500, detail="Error processing Wix form data")
|
||||
|
||||
|
||||
@api_router.post("/webhook/wix-form/test")
|
||||
@limiter.limit(DEFAULT_RATE_LIMIT)
|
||||
async def handle_wix_form_test(
|
||||
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
|
||||
):
|
||||
"""Test endpoint to verify the API is working with raw JSON data.
|
||||
|
||||
No authentication required for testing purposes.
|
||||
"""
|
||||
try:
|
||||
return await process_wix_form_submission(request, data, db_session)
|
||||
except Exception as e:
|
||||
_LOGGER.exception("Error in handle_wix_form_test: %s", e)
|
||||
|
||||
# Log error data to file asynchronously
|
||||
import traceback
|
||||
|
||||
log_entry = {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"client_ip": request.client.host if request.client else "unknown",
|
||||
"headers": dict(request.headers),
|
||||
"data": data,
|
||||
"error": str(e),
|
||||
"traceback": traceback.format_exc(),
|
||||
}
|
||||
|
||||
# Use asyncio to run file I/O in thread pool to avoid blocking
|
||||
logs_dir = Path("logs/errors")
|
||||
await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True)
|
||||
|
||||
log_filename = (
|
||||
logs_dir / f"wix_test_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
)
|
||||
await asyncio.to_thread(
|
||||
log_filename.write_text,
|
||||
json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
_LOGGER.error("Error data logged to: %s", log_filename)
|
||||
raise HTTPException(status_code=500, detail="Error processing test data")
|
||||
|
||||
|
||||
async def _process_csv_import_background(
|
||||
csv_content: str,
|
||||
filename: str,
|
||||
@@ -1044,6 +961,7 @@ async def _process_csv_import_background(
|
||||
session_maker: SessionMaker for creating database sessions
|
||||
config: Application configuration
|
||||
log_filename: Path to save the CSV file
|
||||
|
||||
"""
|
||||
try:
|
||||
# First, save the CSV file (in background)
|
||||
@@ -1067,18 +985,14 @@ async def _process_csv_import_background(
|
||||
dryrun=False,
|
||||
pre_acknowledge=True,
|
||||
client_id=hotel_code,
|
||||
username=username
|
||||
username=username,
|
||||
)
|
||||
|
||||
_LOGGER.info(
|
||||
"CSV import complete for %s: %s", filename, stats
|
||||
)
|
||||
_LOGGER.info("CSV import complete for %s: %s", filename, stats)
|
||||
finally:
|
||||
await db_session.close()
|
||||
except Exception:
|
||||
_LOGGER.exception(
|
||||
"Error processing CSV import in background for %s", filename
|
||||
)
|
||||
_LOGGER.exception("Error processing CSV import in background for %s", filename)
|
||||
|
||||
|
||||
@api_router.put("/admin/import-csv/{hotel_code}/{filename:path}")
|
||||
@@ -1150,9 +1064,7 @@ async def import_csv_endpoint(
|
||||
|
||||
# Basic validation that it looks like CSV
|
||||
if not csv_content.strip():
|
||||
raise HTTPException(
|
||||
status_code=400, detail="ERROR: CSV content is empty"
|
||||
)
|
||||
raise HTTPException(status_code=400, detail="ERROR: CSV content is empty")
|
||||
|
||||
# Create logs directory for CSV imports (blocking, but fast)
|
||||
logs_dir = Path("logs/csv_imports")
|
||||
@@ -1191,12 +1103,14 @@ async def import_csv_endpoint(
|
||||
|
||||
# Return immediate acknowledgment
|
||||
return Response(
|
||||
content=json.dumps({
|
||||
content=json.dumps(
|
||||
{
|
||||
"status": "accepted",
|
||||
"message": "CSV file received and queued for processing",
|
||||
"filename": filename,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}),
|
||||
}
|
||||
),
|
||||
headers=response_headers,
|
||||
status_code=202,
|
||||
)
|
||||
@@ -1208,116 +1122,6 @@ async def import_csv_endpoint(
|
||||
raise HTTPException(status_code=500, detail="Error processing CSV upload")
|
||||
|
||||
|
||||
@api_router.post("/webhook/generic")
|
||||
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
|
||||
async def handle_generic_webhook(
|
||||
request: Request, db_session=Depends(get_async_session)
|
||||
):
|
||||
"""Handle generic webhook endpoint for receiving JSON payloads.
|
||||
|
||||
Supports gzip compression, extracts customer and reservation data,
|
||||
saves to database, and triggers push notifications.
|
||||
|
||||
No authentication required for this endpoint.
|
||||
"""
|
||||
# Store data for error logging if needed
|
||||
data = None
|
||||
|
||||
try:
|
||||
timestamp = datetime.now().isoformat()
|
||||
_LOGGER.info("Received generic webhook data at %s", timestamp)
|
||||
|
||||
# Get the raw body content
|
||||
body = await request.body()
|
||||
|
||||
if not body:
|
||||
raise HTTPException(status_code=400, detail="ERROR: No content provided")
|
||||
|
||||
# Check if content is gzip compressed
|
||||
content_encoding = request.headers.get("content-encoding", "").lower()
|
||||
is_gzipped = content_encoding == "gzip"
|
||||
|
||||
# Decompress if gzipped
|
||||
if is_gzipped:
|
||||
try:
|
||||
body = gzip.decompress(body)
|
||||
_LOGGER.info("Successfully decompressed gzip content")
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"ERROR: Failed to decompress gzip content: {e}",
|
||||
) from e
|
||||
|
||||
# Parse JSON
|
||||
try:
|
||||
data = json.loads(body.decode("utf-8"))
|
||||
except json.JSONDecodeError as e:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"ERROR: Invalid JSON content: {e}",
|
||||
) from e
|
||||
|
||||
if True:
|
||||
# log to file for now
|
||||
logs_dir = Path("logs/generic_webhooks")
|
||||
await asyncio.to_thread(
|
||||
logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
|
||||
)
|
||||
log_filename = (
|
||||
logs_dir
|
||||
/ f"generic_webhook_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
)
|
||||
await asyncio.to_thread(
|
||||
log_filename.write_text,
|
||||
json.dumps(data, indent=2, default=str, ensure_ascii=False),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
# Process the webhook data and save to database
|
||||
await process_generic_webhook_submission(request, data, db_session)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Generic webhook data received and processed successfully",
|
||||
"timestamp": timestamp,
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
_LOGGER.exception("Error in handle_generic_webhook")
|
||||
|
||||
# Log error data to file asynchronously (only on error)
|
||||
error_log_entry = {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"client_ip": request.client.host if request.client else "unknown",
|
||||
"headers": dict(request.headers),
|
||||
"data": data, # Include the parsed data if available
|
||||
"error": str(e),
|
||||
"traceback": traceback.format_exc(),
|
||||
}
|
||||
|
||||
# Use asyncio to run file I/O in thread pool to avoid blocking
|
||||
error_logs_dir = Path("logs/errors")
|
||||
await asyncio.to_thread(
|
||||
error_logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
|
||||
)
|
||||
|
||||
error_log_filename = error_logs_dir / (
|
||||
f"generic_webhook_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
)
|
||||
await asyncio.to_thread(
|
||||
error_log_filename.write_text,
|
||||
json.dumps(error_log_entry, indent=2, default=str, ensure_ascii=False),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
_LOGGER.error("Error data logged to: %s", error_log_filename)
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Error processing generic webhook data"
|
||||
) from e
|
||||
|
||||
|
||||
async def _process_conversion_xml_background(
|
||||
xml_content: str,
|
||||
filename: str,
|
||||
|
||||
@@ -7,15 +7,18 @@ before the application starts accepting requests. It includes:
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy import select, text
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT
|
||||
from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, WebhookStatus
|
||||
from .customer_service import CustomerService
|
||||
from .db import create_database_engine
|
||||
from .db import WebhookEndpoint, WebhookRequest, create_database_engine
|
||||
from .logging_config import get_logger
|
||||
from .webhook_processor import webhook_registry
|
||||
|
||||
_LOGGER = get_logger(__name__)
|
||||
|
||||
@@ -236,6 +239,123 @@ async def backfill_acked_requests_username(
|
||||
)
|
||||
|
||||
|
||||
async def reprocess_stuck_webhooks(
|
||||
sessionmaker: async_sessionmaker,
|
||||
config: dict[str, Any] | None = None,
|
||||
) -> None:
|
||||
"""Reprocess webhooks that were stuck in 'processing' state.
|
||||
|
||||
Finds webhooks with status='processing' and reprocesses them.
|
||||
These are webhooks that were not fully processed in the previous run,
|
||||
likely due to a crash or unexpected shutdown.
|
||||
|
||||
Args:
|
||||
sessionmaker: SQLAlchemy async sessionmaker
|
||||
config: Application configuration dictionary
|
||||
"""
|
||||
_LOGGER.info("Checking for stuck webhooks to reprocess...")
|
||||
|
||||
async with sessionmaker() as session:
|
||||
# Find all webhooks stuck in 'processing' state
|
||||
result = await session.execute(
|
||||
select(WebhookRequest)
|
||||
.where(WebhookRequest.status == WebhookStatus.PROCESSING)
|
||||
.options(
|
||||
selectinload(WebhookRequest.webhook_endpoint).selectinload(
|
||||
WebhookEndpoint.hotel
|
||||
)
|
||||
)
|
||||
)
|
||||
stuck_webhooks = result.scalars().all()
|
||||
|
||||
if not stuck_webhooks:
|
||||
_LOGGER.info("No stuck webhooks found")
|
||||
return
|
||||
|
||||
_LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))
|
||||
|
||||
reprocessed_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for webhook_request in stuck_webhooks:
|
||||
webhook_id = webhook_request.id
|
||||
webhook_endpoint = webhook_request.webhook_endpoint
|
||||
|
||||
if not webhook_endpoint:
|
||||
_LOGGER.error(
|
||||
"Webhook request %d has no webhook_endpoint, skipping", webhook_id
|
||||
)
|
||||
webhook_request.status = WebhookStatus.FAILED
|
||||
webhook_request.last_error = (
|
||||
"No webhook endpoint found during startup reprocessing"
|
||||
)
|
||||
webhook_request.processing_completed_at = datetime.now(UTC)
|
||||
failed_count += 1
|
||||
continue
|
||||
|
||||
if not webhook_request.payload_json:
|
||||
_LOGGER.error(
|
||||
"Webhook request %d has no payload (purged?), marking as failed",
|
||||
webhook_id,
|
||||
)
|
||||
webhook_request.status = WebhookStatus.FAILED
|
||||
webhook_request.last_error = (
|
||||
"No payload available for reprocessing (purged)"
|
||||
)
|
||||
webhook_request.processing_completed_at = datetime.now(UTC)
|
||||
failed_count += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
_LOGGER.info(
|
||||
"Reprocessing webhook %d (hotel=%s, type=%s)",
|
||||
webhook_id,
|
||||
webhook_endpoint.hotel_id,
|
||||
webhook_endpoint.webhook_type,
|
||||
)
|
||||
|
||||
# Get processor for webhook_type
|
||||
processor = webhook_registry.get_processor(
|
||||
webhook_endpoint.webhook_type
|
||||
)
|
||||
if not processor:
|
||||
raise ValueError(
|
||||
f"No processor for type: {webhook_endpoint.webhook_type}"
|
||||
)
|
||||
|
||||
# Reprocess webhook with simplified interface
|
||||
await processor.process(
|
||||
webhook_request=webhook_request,
|
||||
db_session=session,
|
||||
config=config,
|
||||
)
|
||||
|
||||
# Update status to completed
|
||||
webhook_request.status = WebhookStatus.COMPLETED
|
||||
webhook_request.processing_completed_at = datetime.now(UTC)
|
||||
reprocessed_count += 1
|
||||
|
||||
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
|
||||
|
||||
except Exception as e:
|
||||
_LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
|
||||
webhook_request.status = WebhookStatus.FAILED
|
||||
webhook_request.last_error = (
|
||||
f"Reprocessing failed during startup: {str(e)[:1950]}"
|
||||
)
|
||||
webhook_request.processing_completed_at = datetime.now(UTC)
|
||||
failed_count += 1
|
||||
|
||||
# Commit all changes
|
||||
await session.commit()
|
||||
|
||||
_LOGGER.info(
|
||||
"Webhook reprocessing complete: %d successful, %d failed",
|
||||
reprocessed_count,
|
||||
failed_count,
|
||||
)
|
||||
|
||||
|
||||
async def run_startup_tasks(
|
||||
sessionmaker: async_sessionmaker,
|
||||
config: dict[str, Any] | None = None,
|
||||
@@ -284,3 +404,6 @@ async def run_startup_tasks(
|
||||
"No engine provided to run_startup_tasks, "
|
||||
"skipping config-based backfill tasks"
|
||||
)
|
||||
|
||||
# Reprocess stuck webhooks (those stuck in 'processing' state)
|
||||
await reprocess_stuck_webhooks(sessionmaker, config)
|
||||
|
||||
@@ -1,21 +1,19 @@
|
||||
"""Webhook processor interface and implementations."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
import asyncio
|
||||
from datetime import date, datetime
|
||||
from typing import Any, Protocol
|
||||
|
||||
from fastapi import HTTPException, Request
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from alpine_bits_python.config_loader import get_advertising_account_ids
|
||||
from alpine_bits_python.auth import generate_unique_id
|
||||
from alpine_bits_python.config_loader import get_advertising_account_ids
|
||||
from alpine_bits_python.customer_service import CustomerService
|
||||
from alpine_bits_python.reservation_service import ReservationService
|
||||
from alpine_bits_python.schemas import ReservationData
|
||||
|
||||
|
||||
|
||||
from .db import Hotel, WebhookRequest
|
||||
from .logging_config import get_logger
|
||||
|
||||
@@ -32,26 +30,23 @@ class WebhookProcessorProtocol(Protocol):
|
||||
|
||||
async def process(
|
||||
self,
|
||||
payload: dict[str, Any],
|
||||
webhook_request: WebhookRequest,
|
||||
hotel: Hotel,
|
||||
db_session: AsyncSession,
|
||||
request: Request,
|
||||
config: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Process webhook payload.
|
||||
|
||||
Args:
|
||||
payload: Parsed webhook payload
|
||||
webhook_request: WebhookRequest database record
|
||||
hotel: Hotel associated with this webhook
|
||||
webhook_request: WebhookRequest database record (contains payload_json and hotel_id)
|
||||
db_session: Database session
|
||||
request: FastAPI Request object
|
||||
config: Application configuration (optional)
|
||||
|
||||
Returns:
|
||||
Response dict with status, message, customer_id, reservation_id
|
||||
|
||||
Raises:
|
||||
HTTPException on processing errors
|
||||
|
||||
"""
|
||||
...
|
||||
|
||||
@@ -68,6 +63,7 @@ class WebhookProcessorRegistry:
|
||||
|
||||
Args:
|
||||
processor: Processor instance to register
|
||||
|
||||
"""
|
||||
self._processors[processor.webhook_type] = processor
|
||||
_LOGGER.info("Registered webhook processor: %s", processor.webhook_type)
|
||||
@@ -80,16 +76,43 @@ class WebhookProcessorRegistry:
|
||||
|
||||
Returns:
|
||||
Processor instance or None if not found
|
||||
|
||||
"""
|
||||
return self._processors.get(webhook_type)
|
||||
|
||||
|
||||
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
|
||||
"""Shared business logic for handling Wix form submissions (test and production)."""
|
||||
async def process_wix_form_submission(
|
||||
request: Request | None,
|
||||
data: dict[str, Any],
|
||||
db,
|
||||
config: dict[str, Any] | None = None,
|
||||
hotel_id: str | None = None,
|
||||
event_dispatcher=None,
|
||||
):
|
||||
"""Shared business logic for handling Wix form submissions (test and production).
|
||||
|
||||
Args:
|
||||
request: FastAPI Request object (can be None during startup reprocessing)
|
||||
data: Webhook payload data
|
||||
db: Database session
|
||||
config: Application config (optional, extracted from request if not provided)
|
||||
hotel_id: Hotel ID (optional, will use from data or config default if not provided)
|
||||
event_dispatcher: Event dispatcher for push notifications (optional)
|
||||
"""
|
||||
timestamp = datetime.now().isoformat()
|
||||
|
||||
_LOGGER.info("Received Wix form data at %s", timestamp)
|
||||
|
||||
# Extract config and event_dispatcher from request if not provided
|
||||
if config is None and request is not None:
|
||||
config = request.app.state.config
|
||||
if event_dispatcher is None and request is not None:
|
||||
event_dispatcher = getattr(request.app.state, "event_dispatcher", None)
|
||||
|
||||
# Provide fallback config if still None
|
||||
if config is None:
|
||||
config = {}
|
||||
|
||||
data = data.get("data") # Handle nested "data" key if present
|
||||
|
||||
# save customer and reservation to DB
|
||||
@@ -190,18 +213,17 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
|
||||
db_customer = await customer_service.get_or_create_customer(customer_data)
|
||||
|
||||
# Determine hotel_code and hotel_name
|
||||
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
|
||||
hotel_code = data.get("field:hotelid", None)
|
||||
# Priority: 1) Passed hotel_id, 2) Form field, 3) Config default, 4) Fallback
|
||||
hotel_code = hotel_id or data.get("field:hotelid", None)
|
||||
|
||||
if hotel_code is None:
|
||||
_LOGGER.warning("No hotel_code provided in form data, using default")
|
||||
|
||||
hotel_code = request.app.state.config.get("default_hotel_code", "123")
|
||||
_LOGGER.warning("No hotel_code provided, using default from config")
|
||||
hotel_code = config.get("default_hotel_code", "123")
|
||||
|
||||
hotel_name = (
|
||||
data.get("field:hotelname")
|
||||
or data.get("hotelname")
|
||||
or request.app.state.config.get("default_hotel_name")
|
||||
or config.get("default_hotel_name")
|
||||
or "Frangart Inn" # fallback
|
||||
)
|
||||
|
||||
@@ -221,7 +243,7 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
|
||||
|
||||
# Get advertising account IDs conditionally based on fbclid/gclid presence
|
||||
meta_account_id, google_account_id = get_advertising_account_ids(
|
||||
request.app.state.config, hotel_code, fbclid, gclid
|
||||
config, hotel_code, fbclid, gclid
|
||||
)
|
||||
|
||||
reservation = ReservationData(
|
||||
@@ -252,18 +274,23 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
|
||||
|
||||
# Use ReservationService to create reservation
|
||||
reservation_service = ReservationService(db)
|
||||
try:
|
||||
db_reservation = await reservation_service.create_reservation(
|
||||
reservation, db_customer.id
|
||||
)
|
||||
except IntegrityError as e:
|
||||
_LOGGER.exception("Database integrity error creating reservation: %s", e)
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Database error creating reservation"
|
||||
) from e
|
||||
|
||||
async def push_event():
|
||||
# Fire event for listeners (push, etc.) - hotel-specific dispatch
|
||||
dispatcher = getattr(request.app.state, "event_dispatcher", None)
|
||||
if dispatcher:
|
||||
if event_dispatcher:
|
||||
# Get hotel_code from reservation to target the right listeners
|
||||
hotel_code = getattr(db_reservation, "hotel_code", None)
|
||||
if hotel_code and hotel_code.strip():
|
||||
await dispatcher.dispatch_for_hotel(
|
||||
await event_dispatcher.dispatch_for_hotel(
|
||||
"form_processed", hotel_code, db_customer, db_reservation
|
||||
)
|
||||
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
|
||||
@@ -297,40 +324,52 @@ class WixFormProcessor:
|
||||
|
||||
async def process(
|
||||
self,
|
||||
payload: dict[str, Any],
|
||||
webhook_request: WebhookRequest,
|
||||
hotel: Hotel,
|
||||
db_session: AsyncSession,
|
||||
request: Request,
|
||||
config: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Process Wix form webhook payload.
|
||||
|
||||
Args:
|
||||
payload: Parsed webhook payload
|
||||
webhook_request: WebhookRequest database record
|
||||
hotel: Hotel associated with this webhook
|
||||
db_session: Database session
|
||||
request: FastAPI Request object
|
||||
config: Application configuration (optional)
|
||||
|
||||
Returns:
|
||||
Response dict with status and details
|
||||
|
||||
"""
|
||||
# Import here to avoid circular dependency
|
||||
# Call processing function with data from webhook_request
|
||||
result = await process_wix_form_submission(
|
||||
request=None, # No request context needed
|
||||
data=webhook_request.payload_json,
|
||||
db=db_session,
|
||||
config=config,
|
||||
hotel_id=webhook_request.hotel_id,
|
||||
event_dispatcher=None, # No push events during reprocessing
|
||||
)
|
||||
|
||||
# Call existing processing function
|
||||
result = await process_wix_form_submission(request, payload, db_session)
|
||||
|
||||
# The existing function doesn't return customer/reservation IDs directly,
|
||||
# but they would be in the database session. We'll need to extract them
|
||||
# from the result or query after the fact. For now, return the result as-is.
|
||||
return result
|
||||
|
||||
|
||||
async def process_generic_webhook_submission(
|
||||
request: Request, data: dict[str, Any], db
|
||||
request: Request | None,
|
||||
data: dict[str, Any],
|
||||
db,
|
||||
config: dict[str, Any] | None = None,
|
||||
hotel_id: str | None = None,
|
||||
event_dispatcher=None,
|
||||
):
|
||||
"""Process generic webhook submissions with nested structure.
|
||||
|
||||
Args:
|
||||
request: FastAPI Request object (can be None during startup reprocessing)
|
||||
data: Webhook payload data
|
||||
db: Database session
|
||||
config: Application config (optional, extracted from request if not provided)
|
||||
hotel_id: Hotel ID (optional, will use from data or config default)
|
||||
event_dispatcher: Event dispatcher for push notifications (optional)
|
||||
|
||||
Expected structure:
|
||||
{
|
||||
"hotel_data": {"hotelname": "...", "hotelcode": "..."},
|
||||
@@ -363,6 +402,16 @@ async def process_generic_webhook_submission(
|
||||
timestamp = datetime.now().isoformat()
|
||||
_LOGGER.info("Processing generic webhook submission at %s", timestamp)
|
||||
|
||||
# Extract config and event_dispatcher from request if not provided
|
||||
if config is None and request is not None:
|
||||
config = request.app.state.config
|
||||
if event_dispatcher is None and request is not None:
|
||||
event_dispatcher = getattr(request.app.state, "event_dispatcher", None)
|
||||
|
||||
# Provide fallback config if still None
|
||||
if config is None:
|
||||
config = {}
|
||||
|
||||
# Extract nested data
|
||||
hotel_data = data.get("hotel_data", {})
|
||||
form_data = data.get("form_data", {})
|
||||
@@ -383,17 +432,16 @@ async def process_generic_webhook_submission(
|
||||
selected_offers_str = ", ".join(selected_offers) if selected_offers else None
|
||||
|
||||
# Extract hotel information
|
||||
hotel_code = hotel_data.get("hotelcode")
|
||||
# Priority: 1) Passed hotel_id, 2) Webhook data, 3) Config default, 4) Fallback
|
||||
hotel_code = hotel_id or hotel_data.get("hotelcode")
|
||||
hotel_name = hotel_data.get("hotelname")
|
||||
|
||||
if not hotel_code:
|
||||
_LOGGER.warning("No hotel_code provided in webhook data, using default")
|
||||
hotel_code = request.app.state.config.get("default_hotel_code", "123")
|
||||
_LOGGER.warning("No hotel_code provided, using default from config")
|
||||
hotel_code = config.get("default_hotel_code", "123")
|
||||
|
||||
if not hotel_name:
|
||||
hotel_name = (
|
||||
request.app.state.config.get("default_hotel_name") or "Frangart Inn"
|
||||
)
|
||||
hotel_name = config.get("default_hotel_name") or "Frangart Inn"
|
||||
|
||||
# Extract customer information
|
||||
first_name = form_data.get("name")
|
||||
@@ -509,7 +557,7 @@ async def process_generic_webhook_submission(
|
||||
|
||||
# Get advertising account IDs conditionally based on fbclid/gclid presence
|
||||
meta_account_id, google_account_id = get_advertising_account_ids(
|
||||
request.app.state.config, hotel_code, fbclid, gclid
|
||||
config, hotel_code, fbclid, gclid
|
||||
)
|
||||
|
||||
# Create reservation
|
||||
@@ -552,12 +600,11 @@ async def process_generic_webhook_submission(
|
||||
|
||||
async def push_event():
|
||||
# Fire event for listeners (push, etc.) - hotel-specific dispatch
|
||||
dispatcher = getattr(request.app.state, "event_dispatcher", None)
|
||||
if dispatcher:
|
||||
if event_dispatcher:
|
||||
# Get hotel_code from reservation to target the right listeners
|
||||
hotel_code = getattr(db_reservation, "hotel_code", None)
|
||||
if hotel_code and hotel_code.strip():
|
||||
await dispatcher.dispatch_for_hotel(
|
||||
await event_dispatcher.dispatch_for_hotel(
|
||||
"form_processed", hotel_code, db_customer, db_reservation
|
||||
)
|
||||
_LOGGER.info("Dispatched form_processed event for hotel %s", hotel_code)
|
||||
@@ -596,28 +643,30 @@ class GenericWebhookProcessor:
|
||||
|
||||
async def process(
|
||||
self,
|
||||
payload: dict[str, Any],
|
||||
webhook_request: WebhookRequest,
|
||||
hotel: Hotel,
|
||||
db_session: AsyncSession,
|
||||
request: Request,
|
||||
config: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Process generic webhook payload.
|
||||
|
||||
Args:
|
||||
payload: Parsed webhook payload
|
||||
webhook_request: WebhookRequest database record
|
||||
hotel: Hotel associated with this webhook
|
||||
db_session: Database session
|
||||
request: FastAPI Request object
|
||||
config: Application configuration (optional)
|
||||
|
||||
Returns:
|
||||
Response dict with status and details
|
||||
|
||||
"""
|
||||
|
||||
|
||||
# Call existing processing function
|
||||
result = await process_generic_webhook_submission(request, payload, db_session)
|
||||
# Call processing function with data from webhook_request
|
||||
result = await process_generic_webhook_submission(
|
||||
request=None, # No request context needed
|
||||
data=webhook_request.payload_json,
|
||||
db=db_session,
|
||||
config=config,
|
||||
hotel_id=webhook_request.hotel_id,
|
||||
event_dispatcher=None, # No push events during reprocessing
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
Reference in New Issue
Block a user