Some more refactoring. Push_events don't work at the moment

This commit is contained in:
Jonas Linter
2025-11-27 15:33:15 +01:00
parent f89236228d
commit e2d0ef8e53
4 changed files with 1330193 additions and 314 deletions


@@ -6,7 +6,6 @@ import hashlib
import json
import multiprocessing
import os
import traceback
import urllib.parse
import xml.dom.minidom
from collections import defaultdict
@@ -30,12 +29,9 @@ from fastapi.security import (
from pydantic import BaseModel
from slowapi.errors import RateLimitExceeded
from sqlalchemy import and_, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload
from alpine_bits_python.webhook_processor import process_generic_webhook_submission, process_wix_form_submission
from .alpinebits_server import (
AlpineBitsActionName,
AlpineBitsClientInfo,
@@ -43,14 +39,19 @@ from .alpinebits_server import (
Version,
)
from .auth import validate_api_key
from .config_loader import load_config, get_username_for_hotel
from .config_loader import get_username_for_hotel, load_config
from .const import HttpStatusCode, WebhookStatus
from .conversion_service import ConversionService
from .csv_import import CSVImporter
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db import Hotel, WebhookEndpoint, WebhookRequest
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
from .db import (
ResilientAsyncSession,
SessionMaker,
WebhookEndpoint,
WebhookRequest,
create_database_engine,
)
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
@@ -218,8 +219,7 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
async def cleanup_stale_webhooks(
async_sessionmaker: async_sessionmaker,
timeout_minutes: int = 10
async_sessionmaker: async_sessionmaker, timeout_minutes: int = 10
) -> int:
"""Reset webhooks stuck in 'processing' (worker crashed).
@@ -229,6 +229,7 @@ async def cleanup_stale_webhooks(
Returns:
Number of stale webhooks reset
"""
timeout_threshold = datetime.now(UTC) - timedelta(minutes=timeout_minutes)
@@ -238,12 +239,12 @@ async def cleanup_stale_webhooks(
.where(
and_(
WebhookRequest.status == WebhookStatus.PROCESSING,
WebhookRequest.processing_started_at < timeout_threshold
WebhookRequest.processing_started_at < timeout_threshold,
)
)
.values(
status=WebhookStatus.FAILED,
last_error='Processing timeout - worker may have crashed'
last_error="Processing timeout - worker may have crashed",
)
)
await session.commit()
@@ -256,8 +257,7 @@ async def cleanup_stale_webhooks(
async def purge_old_webhook_payloads(
async_sessionmaker: async_sessionmaker,
retention_days: int = 7
async_sessionmaker: async_sessionmaker, retention_days: int = 7
) -> int:
"""Purge payload_json from old completed webhooks.
@@ -269,6 +269,7 @@ async def purge_old_webhook_payloads(
Returns:
Number of payloads purged
"""
cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
@@ -279,13 +280,10 @@ async def purge_old_webhook_payloads(
and_(
WebhookRequest.status == WebhookStatus.COMPLETED,
WebhookRequest.created_at < cutoff_date,
WebhookRequest.purged_at.is_(None) # Not already purged
WebhookRequest.purged_at.is_(None), # Not already purged
)
)
.values(
payload_json=None,
purged_at=datetime.now(UTC)
)
.values(payload_json=None, purged_at=datetime.now(UTC))
)
await session.commit()
count = result.rowcount
@@ -303,6 +301,7 @@ async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
Args:
async_sessionmaker: SQLAlchemy async sessionmaker
"""
try:
# Clean up stale webhooks (stuck in 'processing')
@@ -314,7 +313,7 @@ async def periodic_webhook_cleanup(async_sessionmaker: async_sessionmaker):
_LOGGER.debug(
"Webhook cleanup: %d stale reset, %d payloads purged",
stale_count,
purged_count
purged_count,
)
except Exception as e:
_LOGGER.exception("Error during periodic webhook cleanup: %s", e)
@@ -382,6 +381,7 @@ async def lifespan(app: FastAPI):
# Initialize webhook processors
from .webhook_processor import initialize_webhook_processors
initialize_webhook_processors()
_LOGGER.info("Webhook processors initialized")
@@ -433,6 +433,7 @@ async def lifespan(app: FastAPI):
# Start periodic webhook cleanup (only on primary worker)
cleanup_task = None
if is_primary:
async def run_periodic_cleanup():
"""Run cleanup tasks every 5 minutes."""
while True:
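The hunk above shows only the first lines of the cleanup loop. A minimal sketch of how the loop body likely continues, assuming the periodic_webhook_cleanup coroutine from the earlier hunk and the 5-minute interval named in the docstring (the sleep call and task scheduling are not visible in this diff):

import asyncio

async def run_periodic_cleanup_sketch(async_sessionmaker):
    """Hedged reconstruction of the truncated loop; not the commit's exact code."""
    while True:
        # periodic_webhook_cleanup() already wraps its own try/except (see earlier hunk)
        await periodic_webhook_cleanup(async_sessionmaker)
        await asyncio.sleep(300)  # "every 5 minutes" per the docstring; interval assumed

# Presumably scheduled once on the primary worker during lifespan startup, e.g.:
# cleanup_task = asyncio.create_task(run_periodic_cleanup_sketch(async_sessionmaker))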
@@ -658,7 +659,6 @@ async def detect_language(
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic),
) -> str:
@@ -754,23 +754,37 @@ async def handle_webhook_unified(
# Extract hotel_id from payload based on webhook type
if webhook_type == "wix_form":
# Wix forms: field:hotelid or use default
hotel_id_from_payload = payload.get("data", {}).get("field:hotelid") if isinstance(payload.get("data"), dict) else payload.get("field:hotelid")
hotel_id_from_payload = (
payload.get("data", {}).get("field:hotelid")
if isinstance(payload.get("data"), dict)
else payload.get("field:hotelid")
)
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.info("Legacy wix-form endpoint: using default hotel_code=%s", hotel_id_from_payload)
hotel_id_from_payload = request.app.state.config.get(
"default_hotel_code", "123"
)
_LOGGER.info(
"Legacy wix-form endpoint: using default hotel_code=%s",
hotel_id_from_payload,
)
elif webhook_type == "generic":
# Generic webhooks: hotel_data.hotelcode or use default
hotel_data = payload.get("hotel_data", {})
hotel_id_from_payload = hotel_data.get("hotelcode")
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.info("Legacy generic endpoint: using default hotel_code=%s", hotel_id_from_payload)
hotel_id_from_payload = request.app.state.config.get(
"default_hotel_code", "123"
)
_LOGGER.info(
"Legacy generic endpoint: using default hotel_code=%s",
hotel_id_from_payload,
)
_LOGGER.info(
"Legacy endpoint detected: %s, webhook_type=%s, hotel_id=%s",
webhook_secret,
webhook_type,
hotel_id_from_payload
hotel_id_from_payload,
)
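For readers tracing the legacy-endpoint branch above: the hotel_id extraction differs per webhook type but shares a fallback. A hedged sketch of the same logic as a standalone helper (the helper name is hypothetical; the keys and the "123" default mirror the diff):

def extract_hotel_id_sketch(webhook_type: str, payload: dict, config: dict) -> str:
    """Hypothetical helper condensing the branch above; not the commit's code."""
    hotel_id = None
    if webhook_type == "wix_form":
        # Wix forms nest fields under "data" when present
        data = payload.get("data")
        hotel_id = (
            data.get("field:hotelid") if isinstance(data, dict) else payload.get("field:hotelid")
        )
    elif webhook_type == "generic":
        # Generic webhooks carry the code under hotel_data.hotelcode
        hotel_id = payload.get("hotel_data", {}).get("hotelcode")
    # Both legacy paths fall back to the configured default hotel code
    return hotel_id or config.get("default_hotel_code", "123")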
# Look up the webhook endpoint for this hotel and type
@@ -780,7 +794,7 @@ async def handle_webhook_unified(
and_(
WebhookEndpoint.hotel_id == hotel_id_from_payload,
WebhookEndpoint.webhook_type == webhook_type,
WebhookEndpoint.is_enabled == True
WebhookEndpoint.is_enabled == True,
)
)
.options(selectinload(WebhookEndpoint.hotel))
@@ -791,11 +805,11 @@ async def handle_webhook_unified(
_LOGGER.error(
"No webhook endpoint found for legacy endpoint: hotel_id=%s, type=%s",
hotel_id_from_payload,
webhook_type
webhook_type,
)
raise HTTPException(
status_code=404,
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}"
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}",
)
else:
# New secure endpoint - look up by webhook_secret
@@ -804,7 +818,7 @@ async def handle_webhook_unified(
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True
WebhookEndpoint.is_enabled == True,
)
)
.options(selectinload(WebhookEndpoint.hotel))
@@ -842,7 +856,7 @@ async def handle_webhook_unified(
_LOGGER.info(
"Webhook already processed (webhook_id=%d, hotel=%s)",
existing.id,
webhook_endpoint.hotel_id
webhook_endpoint.hotel_id,
)
return {
"status": "success",
@@ -850,11 +864,11 @@ async def handle_webhook_unified(
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == WebhookStatus.PROCESSING:
if existing.status == WebhookStatus.PROCESSING:
# Another worker is processing right now
_LOGGER.info(
"Webhook is being processed by another worker (webhook_id=%d)",
existing.id
existing.id,
)
return {
"status": "success",
@@ -862,12 +876,12 @@ async def handle_webhook_unified(
"webhook_id": existing.id,
"duplicate": True,
}
elif existing.status == WebhookStatus.FAILED:
if existing.status == WebhookStatus.FAILED:
# Retry failed webhook
_LOGGER.info(
"Retrying failed webhook (webhook_id=%d, retry_count=%d)",
existing.id,
existing.retry_count
existing.retry_count,
)
webhook_request = existing
webhook_request.retry_count += 1
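Note on the elif-to-if change above: the COMPLETED and PROCESSING branches each return, so flattening the chain into independent if statements is behavior-preserving. A condensed sketch of the resulting flow (how the existing row is looked up is not shown in this hunk):

# Sketch only; `existing`, WebhookStatus and webhook_request come from the handler above.
if existing.status == WebhookStatus.COMPLETED:
    return {"status": "success", "webhook_id": existing.id, "duplicate": True}
if existing.status == WebhookStatus.PROCESSING:
    # Another worker owns this webhook; acknowledge without reprocessing
    return {"status": "success", "webhook_id": existing.id, "duplicate": True}
if existing.status == WebhookStatus.FAILED:
    # Reuse the existing row and count the retry
    webhook_request = existing
    webhook_request.retry_count += 1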
@@ -895,13 +909,11 @@ async def handle_webhook_unified(
if not processor:
raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}")
# 7. Process webhook
# 7. Process webhook with simplified interface
result = await processor.process(
payload=payload,
webhook_request=webhook_request,
hotel=webhook_endpoint.hotel,
db_session=db_session,
request=request,
config=request.app.state.config,
)
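The "simplified interface" drops db_session and request from the processor call, leaving payload, webhook_request, hotel and config. A hedged sketch of a processor contract matching that call site (the Protocol and registry below are guesses at the shape, not the actual webhook_processor API):

from typing import Any, Protocol

class WebhookProcessorSketch(Protocol):
    """Hypothetical contract matching the simplified process() call above."""

    async def process(
        self,
        payload: dict[str, Any],
        webhook_request: Any,
        hotel: Any,
        config: dict[str, Any],
    ) -> dict[str, Any]: ...

# A registry keyed by webhook_type would back the lookup and the
# "No processor for type" error shown above (shape assumed):
_PROCESSORS: dict[str, WebhookProcessorSketch] = {}

def get_processor_sketch(webhook_type: str) -> WebhookProcessorSketch:
    processor = _PROCESSORS.get(webhook_type)
    if not processor:
        raise ValueError(f"No processor for type: {webhook_type}")
    return processor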
# 8. Update status
@@ -927,101 +939,6 @@ async def handle_webhook_unified(
raise HTTPException(status_code=500, detail="Error processing webhook")
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Unified endpoint to handle Wix form submissions (test and production).
No authentication required for this endpoint.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except IntegrityError as e:
# Handle duplicate submissions gracefully - likely same form sent twice
# or race condition between workers
if "unique constraint" in str(e).lower() and "unique_id" in str(e).lower():
_LOGGER.warning(
"Duplicate submission detected (unique_id already exists). "
"Returning success to prevent retry. Error: %s",
str(e),
)
# Return success since the reservation already exists
return {"status": "success", "message": "Reservation already processed"}
# Re-raise if it's a different integrity error
raise
except Exception as e:
_LOGGER.exception("Error in handle_wix_form")
log_entry = {
"timestamp": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"error": str(e),
}
# Use asyncio to run file I/O in thread pool to avoid blocking
logs_dir = Path("logs/errors")
await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True)
log_filename = (
logs_dir / f"wix_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
log_filename.write_text,
json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
_LOGGER.error("Error data logged to: %s", log_filename)
raise HTTPException(status_code=500, detail="Error processing Wix form data")
@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_wix_form_test(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Test endpoint to verify the API is working with raw JSON data.
No authentication required for testing purposes.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.exception("Error in handle_wix_form_test: %s", e)
# Log error data to file asynchronously
import traceback
log_entry = {
"timestamp": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"error": str(e),
"traceback": traceback.format_exc(),
}
# Use asyncio to run file I/O in thread pool to avoid blocking
logs_dir = Path("logs/errors")
await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True)
log_filename = (
logs_dir / f"wix_test_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
log_filename.write_text,
json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
_LOGGER.error("Error data logged to: %s", log_filename)
raise HTTPException(status_code=500, detail="Error processing test data")
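Both removed handlers repeat the same pattern for capturing failing payloads: build a log entry, then run mkdir and write_text in a worker thread so the event loop is never blocked. A hedged sketch of that pattern as a reusable helper (the helper name is hypothetical):

import asyncio
import json
from datetime import datetime
from pathlib import Path

async def dump_error_payload_sketch(prefix: str, log_entry: dict) -> Path:
    """Write an error payload under logs/errors/ without blocking the event loop."""
    logs_dir = Path("logs/errors")
    # Path.mkdir and Path.write_text are blocking calls; offload them to the thread pool
    await asyncio.to_thread(logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True)
    log_filename = logs_dir / f"{prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    await asyncio.to_thread(
        log_filename.write_text,
        json.dumps(log_entry, indent=2, default=str, ensure_ascii=False),
        encoding="utf-8",
    )
    return log_filename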
async def _process_csv_import_background(
csv_content: str,
filename: str,
@@ -1044,6 +961,7 @@ async def _process_csv_import_background(
session_maker: SessionMaker for creating database sessions
config: Application configuration
log_filename: Path to save the CSV file
"""
try:
# First, save the CSV file (in background)
@@ -1067,18 +985,14 @@ async def _process_csv_import_background(
dryrun=False,
pre_acknowledge=True,
client_id=hotel_code,
username=username
username=username,
)
_LOGGER.info(
"CSV import complete for %s: %s", filename, stats
)
_LOGGER.info("CSV import complete for %s: %s", filename, stats)
finally:
await db_session.close()
except Exception:
_LOGGER.exception(
"Error processing CSV import in background for %s", filename
)
_LOGGER.exception("Error processing CSV import in background for %s", filename)
@api_router.put("/admin/import-csv/{hotel_code}/{filename:path}")
@@ -1150,9 +1064,7 @@ async def import_csv_endpoint(
# Basic validation that it looks like CSV
if not csv_content.strip():
raise HTTPException(
status_code=400, detail="ERROR: CSV content is empty"
)
raise HTTPException(status_code=400, detail="ERROR: CSV content is empty")
# Create logs directory for CSV imports (blocking, but fast)
logs_dir = Path("logs/csv_imports")
@@ -1191,12 +1103,14 @@ async def import_csv_endpoint(
# Return immediate acknowledgment
return Response(
content=json.dumps({
"status": "accepted",
"message": "CSV file received and queued for processing",
"filename": filename,
"timestamp": datetime.now().isoformat(),
}),
content=json.dumps(
{
"status": "accepted",
"message": "CSV file received and queued for processing",
"filename": filename,
"timestamp": datetime.now().isoformat(),
}
),
headers=response_headers,
status_code=202,
)
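The endpoint acknowledges the upload with a 202 and defers the actual import to _process_csv_import_background. A hedged sketch of that accept-then-process pattern (the scheduling mechanism and the stand-in coroutine below are assumptions; the real background coroutine and its full parameter list are in the surrounding file):

import asyncio
from datetime import datetime

async def import_in_background_sketch(csv_content: str, filename: str) -> None:
    """Stand-in for the real background import coroutine."""
    ...

async def queue_csv_import_sketch(csv_content: str, filename: str) -> dict:
    # Schedule the import without awaiting it (keep a reference in real code
    # so the task is not garbage-collected mid-flight)
    asyncio.create_task(import_in_background_sketch(csv_content, filename))
    # Acknowledge immediately, mirroring the 202 response above
    return {
        "status": "accepted",
        "message": "CSV file received and queued for processing",
        "filename": filename,
        "timestamp": datetime.now().isoformat(),
    }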
@@ -1208,116 +1122,6 @@ async def import_csv_endpoint(
raise HTTPException(status_code=500, detail="Error processing CSV upload")
@api_router.post("/webhook/generic")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_generic_webhook(
request: Request, db_session=Depends(get_async_session)
):
"""Handle generic webhook endpoint for receiving JSON payloads.
Supports gzip compression, extracts customer and reservation data,
saves to database, and triggers push notifications.
No authentication required for this endpoint.
"""
# Store data for error logging if needed
data = None
try:
timestamp = datetime.now().isoformat()
_LOGGER.info("Received generic webhook data at %s", timestamp)
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(status_code=400, detail="ERROR: No content provided")
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
_LOGGER.info("Successfully decompressed gzip content")
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Parse JSON
try:
data = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Invalid JSON content: {e}",
) from e
if True:
# log to file for now
logs_dir = Path("logs/generic_webhooks")
await asyncio.to_thread(
logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
)
log_filename = (
logs_dir
/ f"generic_webhook_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
log_filename.write_text,
json.dumps(data, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
# Process the webhook data and save to database
await process_generic_webhook_submission(request, data, db_session)
return {
"status": "success",
"message": "Generic webhook data received and processed successfully",
"timestamp": timestamp,
}
except HTTPException:
raise
except Exception as e:
_LOGGER.exception("Error in handle_generic_webhook")
# Log error data to file asynchronously (only on error)
error_log_entry = {
"timestamp": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data, # Include the parsed data if available
"error": str(e),
"traceback": traceback.format_exc(),
}
# Use asyncio to run file I/O in thread pool to avoid blocking
error_logs_dir = Path("logs/errors")
await asyncio.to_thread(
error_logs_dir.mkdir, parents=True, mode=0o755, exist_ok=True
)
error_log_filename = error_logs_dir / (
f"generic_webhook_error_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
await asyncio.to_thread(
error_log_filename.write_text,
json.dumps(error_log_entry, indent=2, default=str, ensure_ascii=False),
encoding="utf-8",
)
_LOGGER.error("Error data logged to: %s", error_log_filename)
raise HTTPException(
status_code=500, detail="Error processing generic webhook data"
) from e
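The removed handler's body handling (Content-Encoding check, gzip decompression, JSON decode) is shown in full above; condensed here into a hedged standalone helper for reference (name hypothetical):

import gzip
import json

def parse_webhook_body_sketch(body: bytes, content_encoding: str) -> dict:
    """Decode a possibly gzip-compressed JSON webhook body (sketch of the steps above)."""
    if content_encoding.lower() == "gzip":
        # Mirrors the Content-Encoding: gzip branch in the removed handler
        body = gzip.decompress(body)
    return json.loads(body.decode("utf-8"))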
async def _process_conversion_xml_background(
xml_content: str,
filename: str,