concurrency-fix #15

Merged
jonas merged 24 commits from concurrency-fix into main 2025-12-01 13:34:35 +00:00
Showing only changes of commit aec8a99b71 - Show all commits

View File

@@ -237,12 +237,12 @@ async def cleanup_stale_webhooks(
update(WebhookRequest)
.where(
and_(
WebhookRequest.status == 'processing',
WebhookRequest.status == WebhookStatus.PROCESSING,
WebhookRequest.processing_started_at < timeout_threshold
)
)
.values(
status='failed',
status=WebhookStatus.FAILED,
last_error='Processing timeout - worker may have crashed'
)
)
@@ -277,7 +277,7 @@ async def purge_old_webhook_payloads(
update(WebhookRequest)
.where(
and_(
WebhookRequest.status == 'completed',
WebhookRequest.status == WebhookStatus.COMPLETED,
WebhookRequest.created_at < cutoff_date,
WebhookRequest.purged_at.is_(None) # Not already purged
)
@@ -705,8 +705,13 @@ async def handle_webhook_unified(
):
"""Unified webhook handler with deduplication and routing.
Supports both new secure webhook URLs and legacy endpoints:
- /webhook/{64-char-secret} - New secure endpoints
- /webhook/wix-form - Legacy Wix form endpoint (extracts hotel from payload)
- /webhook/generic - Legacy generic webhook endpoint (extracts hotel from payload)
Flow:
1. Look up webhook_endpoint by webhook_secret
1. Look up webhook_endpoint by webhook_secret (or detect legacy endpoint)
2. Parse and hash payload (SHA256)
3. Check for duplicate using SELECT FOR UPDATE SKIP LOCKED
4. If duplicate and completed: return success (idempotent)
@@ -718,23 +723,7 @@ async def handle_webhook_unified(
"""
timestamp = datetime.now(UTC)
# 1. Look up webhook_endpoint
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint: WebhookEndpoint | None = result.scalar_one_or_none()
if not webhook_endpoint or not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Webhook not found")
# 2. Parse payload
# 2. Parse payload first (needed for legacy endpoint detection)
body = await request.body()
# Handle gzip compression
@@ -751,6 +740,84 @@ async def handle_webhook_unified(
_LOGGER.error("Failed to parse JSON payload: %s", e)
raise HTTPException(status_code=400, detail="Invalid JSON payload")
# 1. Detect if this is a legacy endpoint or look up webhook_endpoint
webhook_endpoint: WebhookEndpoint | None = None
is_legacy = False
webhook_type = None
hotel_id_from_payload = None
# Check if webhook_secret looks like a legacy endpoint name
if webhook_secret in ("wix-form", "generic"):
is_legacy = True
webhook_type = "wix_form" if webhook_secret == "wix-form" else "generic"
# Extract hotel_id from payload based on webhook type
if webhook_type == "wix_form":
# Wix forms: field:hotelid or use default
hotel_id_from_payload = payload.get("data", {}).get("field:hotelid") if isinstance(payload.get("data"), dict) else payload.get("field:hotelid")
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.info("Legacy wix-form endpoint: using default hotel_code=%s", hotel_id_from_payload)
elif webhook_type == "generic":
# Generic webhooks: hotel_data.hotelcode or use default
hotel_data = payload.get("hotel_data", {})
hotel_id_from_payload = hotel_data.get("hotelcode")
if not hotel_id_from_payload:
hotel_id_from_payload = request.app.state.config.get("default_hotel_code", "123")
_LOGGER.info("Legacy generic endpoint: using default hotel_code=%s", hotel_id_from_payload)
_LOGGER.info(
"Legacy endpoint detected: %s, webhook_type=%s, hotel_id=%s",
webhook_secret,
webhook_type,
hotel_id_from_payload
)
# Look up the webhook endpoint for this hotel and type
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.hotel_id == hotel_id_from_payload,
WebhookEndpoint.webhook_type == webhook_type,
WebhookEndpoint.is_enabled == True
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
if not webhook_endpoint:
_LOGGER.error(
"No webhook endpoint found for legacy endpoint: hotel_id=%s, type=%s",
hotel_id_from_payload,
webhook_type
)
raise HTTPException(
status_code=404,
detail=f"No webhook configuration found for hotel {hotel_id_from_payload}"
)
else:
# New secure endpoint - look up by webhook_secret
result = await db_session.execute(
select(WebhookEndpoint)
.where(
and_(
WebhookEndpoint.webhook_secret == webhook_secret,
WebhookEndpoint.is_enabled == True
)
)
.options(selectinload(WebhookEndpoint.hotel))
)
webhook_endpoint = result.scalar_one_or_none()
if not webhook_endpoint:
raise HTTPException(status_code=404, detail="Webhook not found")
# Verify hotel is active
if not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Hotel is not active")
# 3. Hash payload (canonical JSON for consistent hashing)
payload_json_str = json.dumps(payload, sort_keys=True)
payload_hash = hashlib.sha256(payload_json_str.encode("utf-8")).hexdigest()
@@ -852,7 +919,7 @@ async def handle_webhook_unified(
except Exception as e:
_LOGGER.exception("Error processing webhook: %s", e)
webhook_request.status = 'failed'
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = str(e)[:2000]
webhook_request.processing_completed_at = datetime.now(UTC)
await db_session.commit()