Duplicate detection improved but refactoring necessary to make the whole thing more managable

This commit is contained in:
Jonas Linter
2025-11-27 19:35:30 +01:00
parent f7158e7373
commit 7624b70fd0
3 changed files with 502 additions and 90 deletions

View File

@@ -249,110 +249,143 @@ async def reprocess_stuck_webhooks(
These are webhooks that were not fully processed in the previous run,
likely due to a crash or unexpected shutdown.
This function is designed to NEVER block application startup.
All errors are caught and logged, but the app will start regardless.
Args:
sessionmaker: SQLAlchemy async sessionmaker
config: Application configuration dictionary
"""
_LOGGER.info("Checking for stuck webhooks to reprocess...")
try:
_LOGGER.info("Checking for stuck webhooks to reprocess...")
async with sessionmaker() as session:
# Find all webhooks stuck in 'processing' state
result = await session.execute(
select(WebhookRequest)
.where(WebhookRequest.status == WebhookStatus.PROCESSING)
.options(
selectinload(WebhookRequest.webhook_endpoint).selectinload(
WebhookEndpoint.hotel
async with sessionmaker() as session:
# Find all webhooks stuck in 'processing' state
result = await session.execute(
select(WebhookRequest)
.where(WebhookRequest.status == WebhookStatus.PROCESSING)
.options(
selectinload(WebhookRequest.webhook_endpoint).selectinload(
WebhookEndpoint.hotel
)
)
)
)
stuck_webhooks = result.scalars().all()
stuck_webhooks = result.scalars().all()
if not stuck_webhooks:
_LOGGER.info("No stuck webhooks found")
return
if not stuck_webhooks:
_LOGGER.info("No stuck webhooks found")
return
_LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))
_LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))
reprocessed_count = 0
failed_count = 0
reprocessed_count = 0
failed_count = 0
for webhook_request in stuck_webhooks:
webhook_id = webhook_request.id
webhook_endpoint = webhook_request.webhook_endpoint
for webhook_request in stuck_webhooks:
webhook_id = webhook_request.id
webhook_endpoint = webhook_request.webhook_endpoint
if not webhook_endpoint:
_LOGGER.error(
"Webhook request %d has no webhook_endpoint, skipping", webhook_id
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No webhook endpoint found during startup reprocessing"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
if not webhook_endpoint:
_LOGGER.error(
"Webhook request %d has no webhook_endpoint, skipping", webhook_id
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No webhook endpoint found during startup reprocessing"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
if not webhook_request.payload_json:
_LOGGER.error(
"Webhook request %d has no payload (purged?), marking as failed",
webhook_id,
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No payload available for reprocessing (purged)"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
if not webhook_request.payload_json:
_LOGGER.error(
"Webhook request %d has no payload (purged?), marking as failed",
webhook_id,
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
"No payload available for reprocessing (purged)"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
continue
try:
_LOGGER.info(
"Reprocessing webhook %d (hotel=%s, type=%s)",
webhook_id,
webhook_endpoint.hotel_id,
webhook_endpoint.webhook_type,
)
# Get processor for webhook_type
processor = webhook_registry.get_processor(
webhook_endpoint.webhook_type
)
if not processor:
raise ValueError(
f"No processor for type: {webhook_endpoint.webhook_type}"
try:
_LOGGER.info(
"Reprocessing webhook %d (hotel=%s, type=%s)",
webhook_id,
webhook_endpoint.hotel_id,
webhook_endpoint.webhook_type,
)
# Reprocess webhook with simplified interface
await processor.process(
webhook_request=webhook_request,
db_session=session,
config=config,
)
# Get processor for webhook_type
processor = webhook_registry.get_processor(
webhook_endpoint.webhook_type
)
if not processor:
raise ValueError(
f"No processor for type: {webhook_endpoint.webhook_type}"
)
# Update status to completed
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
# Reprocess webhook with simplified interface
result = await processor.process(
webhook_request=webhook_request,
db_session=session,
config=config,
)
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
# Check result status
result_status = result.get("status") if isinstance(result, dict) else "success"
except Exception as e:
_LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
f"Reprocessing failed during startup: {str(e)[:1950]}"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
if result_status == "duplicate":
# Duplicate is not an error - mark as completed and continue
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
_LOGGER.info(
"Webhook %d was a duplicate (already processed), marked as completed",
webhook_id
)
elif result_status in ("success", "completed"):
# Update status to completed
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
else:
# Unexpected status - treat as failure
_LOGGER.warning(
"Webhook %d returned unexpected status: %s",
webhook_id,
result_status
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = f"Unexpected status: {result_status}"
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
# Commit all changes
await session.commit()
except Exception as e:
_LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
f"Reprocessing failed during startup: {str(e)[:1950]}"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
_LOGGER.info(
"Webhook reprocessing complete: %d successful, %d failed",
reprocessed_count,
failed_count,
# Commit all changes
await session.commit()
_LOGGER.info(
"Webhook reprocessing complete: %d successful, %d failed",
reprocessed_count,
failed_count,
)
except Exception as e:
# CRITICAL: Never let reprocessing block application startup
_LOGGER.exception(
"CRITICAL ERROR during webhook reprocessing, but allowing app to start: %s",
e
)

View File

@@ -273,10 +273,28 @@ async def process_wix_form_submission(
reservation, db_customer.id
)
except IntegrityError as e:
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
await db_session.rollback()
# Check if this is a duplicate (unique constraint violation)
error_msg = str(e.orig) if hasattr(e, 'orig') else str(e)
is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists'])
if is_duplicate:
_LOGGER.info(
"Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)",
unique_id
)
return {
"status": "duplicate",
"message": "Reservation already exists (duplicate submission)",
"unique_id": unique_id,
"timestamp": timestamp,
}
else:
# Real integrity error (not a duplicate)
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch
@@ -581,9 +599,33 @@ async def process_generic_webhook_submission(
# Use ReservationService to create reservation
reservation_service = ReservationService(db_session)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
try:
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
except IntegrityError as e:
await db_session.rollback()
# Check if this is a duplicate (unique constraint violation)
error_msg = str(e.orig) if hasattr(e, 'orig') else str(e)
is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists'])
if is_duplicate:
_LOGGER.info(
"Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)",
unique_id
)
return {
"status": "duplicate",
"message": "Reservation already exists (duplicate submission)",
"unique_id": unique_id,
"timestamp": timestamp,
}
else:
# Real integrity error (not a duplicate)
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch