Duplicate detection improved but refactoring necessary to make the whole thing more managable

This commit is contained in:
Jonas Linter
2025-11-27 19:35:30 +01:00
parent a07edfe3ec
commit c402b28b72
3 changed files with 502 additions and 90 deletions

View File

@@ -249,110 +249,143 @@ async def reprocess_stuck_webhooks(
These are webhooks that were not fully processed in the previous run, These are webhooks that were not fully processed in the previous run,
likely due to a crash or unexpected shutdown. likely due to a crash or unexpected shutdown.
This function is designed to NEVER block application startup.
All errors are caught and logged, but the app will start regardless.
Args: Args:
sessionmaker: SQLAlchemy async sessionmaker sessionmaker: SQLAlchemy async sessionmaker
config: Application configuration dictionary config: Application configuration dictionary
""" """
_LOGGER.info("Checking for stuck webhooks to reprocess...") try:
_LOGGER.info("Checking for stuck webhooks to reprocess...")
async with sessionmaker() as session: async with sessionmaker() as session:
# Find all webhooks stuck in 'processing' state # Find all webhooks stuck in 'processing' state
result = await session.execute( result = await session.execute(
select(WebhookRequest) select(WebhookRequest)
.where(WebhookRequest.status == WebhookStatus.PROCESSING) .where(WebhookRequest.status == WebhookStatus.PROCESSING)
.options( .options(
selectinload(WebhookRequest.webhook_endpoint).selectinload( selectinload(WebhookRequest.webhook_endpoint).selectinload(
WebhookEndpoint.hotel WebhookEndpoint.hotel
)
) )
) )
) stuck_webhooks = result.scalars().all()
stuck_webhooks = result.scalars().all()
if not stuck_webhooks: if not stuck_webhooks:
_LOGGER.info("No stuck webhooks found") _LOGGER.info("No stuck webhooks found")
return return
_LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks)) _LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks))
reprocessed_count = 0 reprocessed_count = 0
failed_count = 0 failed_count = 0
for webhook_request in stuck_webhooks: for webhook_request in stuck_webhooks:
webhook_id = webhook_request.id webhook_id = webhook_request.id
webhook_endpoint = webhook_request.webhook_endpoint webhook_endpoint = webhook_request.webhook_endpoint
if not webhook_endpoint: if not webhook_endpoint:
_LOGGER.error( _LOGGER.error(
"Webhook request %d has no webhook_endpoint, skipping", webhook_id "Webhook request %d has no webhook_endpoint, skipping", webhook_id
) )
webhook_request.status = WebhookStatus.FAILED webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = ( webhook_request.last_error = (
"No webhook endpoint found during startup reprocessing" "No webhook endpoint found during startup reprocessing"
) )
webhook_request.processing_completed_at = datetime.now(UTC) webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1 failed_count += 1
continue continue
if not webhook_request.payload_json: if not webhook_request.payload_json:
_LOGGER.error( _LOGGER.error(
"Webhook request %d has no payload (purged?), marking as failed", "Webhook request %d has no payload (purged?), marking as failed",
webhook_id, webhook_id,
) )
webhook_request.status = WebhookStatus.FAILED webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = ( webhook_request.last_error = (
"No payload available for reprocessing (purged)" "No payload available for reprocessing (purged)"
) )
webhook_request.processing_completed_at = datetime.now(UTC) webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1 failed_count += 1
continue continue
try: try:
_LOGGER.info( _LOGGER.info(
"Reprocessing webhook %d (hotel=%s, type=%s)", "Reprocessing webhook %d (hotel=%s, type=%s)",
webhook_id, webhook_id,
webhook_endpoint.hotel_id, webhook_endpoint.hotel_id,
webhook_endpoint.webhook_type, webhook_endpoint.webhook_type,
)
# Get processor for webhook_type
processor = webhook_registry.get_processor(
webhook_endpoint.webhook_type
)
if not processor:
raise ValueError(
f"No processor for type: {webhook_endpoint.webhook_type}"
) )
# Reprocess webhook with simplified interface # Get processor for webhook_type
await processor.process( processor = webhook_registry.get_processor(
webhook_request=webhook_request, webhook_endpoint.webhook_type
db_session=session, )
config=config, if not processor:
) raise ValueError(
f"No processor for type: {webhook_endpoint.webhook_type}"
)
# Update status to completed # Reprocess webhook with simplified interface
webhook_request.status = WebhookStatus.COMPLETED result = await processor.process(
webhook_request.processing_completed_at = datetime.now(UTC) webhook_request=webhook_request,
reprocessed_count += 1 db_session=session,
config=config,
)
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id) # Check result status
result_status = result.get("status") if isinstance(result, dict) else "success"
except Exception as e: if result_status == "duplicate":
_LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e) # Duplicate is not an error - mark as completed and continue
webhook_request.status = WebhookStatus.FAILED webhook_request.status = WebhookStatus.COMPLETED
webhook_request.last_error = ( webhook_request.processing_completed_at = datetime.now(UTC)
f"Reprocessing failed during startup: {str(e)[:1950]}" reprocessed_count += 1
) _LOGGER.info(
webhook_request.processing_completed_at = datetime.now(UTC) "Webhook %d was a duplicate (already processed), marked as completed",
failed_count += 1 webhook_id
)
elif result_status in ("success", "completed"):
# Update status to completed
webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC)
reprocessed_count += 1
_LOGGER.info("Successfully reprocessed webhook %d", webhook_id)
else:
# Unexpected status - treat as failure
_LOGGER.warning(
"Webhook %d returned unexpected status: %s",
webhook_id,
result_status
)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = f"Unexpected status: {result_status}"
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
# Commit all changes except Exception as e:
await session.commit() _LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e)
webhook_request.status = WebhookStatus.FAILED
webhook_request.last_error = (
f"Reprocessing failed during startup: {str(e)[:1950]}"
)
webhook_request.processing_completed_at = datetime.now(UTC)
failed_count += 1
_LOGGER.info( # Commit all changes
"Webhook reprocessing complete: %d successful, %d failed", await session.commit()
reprocessed_count,
failed_count, _LOGGER.info(
"Webhook reprocessing complete: %d successful, %d failed",
reprocessed_count,
failed_count,
)
except Exception as e:
# CRITICAL: Never let reprocessing block application startup
_LOGGER.exception(
"CRITICAL ERROR during webhook reprocessing, but allowing app to start: %s",
e
) )

View File

@@ -273,10 +273,28 @@ async def process_wix_form_submission(
reservation, db_customer.id reservation, db_customer.id
) )
except IntegrityError as e: except IntegrityError as e:
_LOGGER.exception("Database integrity error creating reservation: %s", e) await db_session.rollback()
raise HTTPException( # Check if this is a duplicate (unique constraint violation)
status_code=500, detail="Database error creating reservation" error_msg = str(e.orig) if hasattr(e, 'orig') else str(e)
) from e is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists'])
if is_duplicate:
_LOGGER.info(
"Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)",
unique_id
)
return {
"status": "duplicate",
"message": "Reservation already exists (duplicate submission)",
"unique_id": unique_id,
"timestamp": timestamp,
}
else:
# Real integrity error (not a duplicate)
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
async def push_event(): async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch # Fire event for listeners (push, etc.) - hotel-specific dispatch
@@ -581,9 +599,33 @@ async def process_generic_webhook_submission(
# Use ReservationService to create reservation # Use ReservationService to create reservation
reservation_service = ReservationService(db_session) reservation_service = ReservationService(db_session)
db_reservation = await reservation_service.create_reservation( try:
reservation, db_customer.id db_reservation = await reservation_service.create_reservation(
) reservation, db_customer.id
)
except IntegrityError as e:
await db_session.rollback()
# Check if this is a duplicate (unique constraint violation)
error_msg = str(e.orig) if hasattr(e, 'orig') else str(e)
is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists'])
if is_duplicate:
_LOGGER.info(
"Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)",
unique_id
)
return {
"status": "duplicate",
"message": "Reservation already exists (duplicate submission)",
"unique_id": unique_id,
"timestamp": timestamp,
}
else:
# Real integrity error (not a duplicate)
_LOGGER.exception("Database integrity error creating reservation: %s", e)
raise HTTPException(
status_code=500, detail="Database error creating reservation"
) from e
async def push_event(): async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch # Fire event for listeners (push, etc.) - hotel-specific dispatch

View File

@@ -0,0 +1,337 @@
"""Tests for webhook duplicate handling and reprocessing.
This module tests:
- Duplicate detection during normal operation
- Duplicate handling during app startup reprocessing
- Stuck webhooks that are duplicates
"""
import asyncio
import uuid
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import patch
import pytest
import pytest_asyncio
from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from alpine_bits_python.api import app
from alpine_bits_python.const import WebhookStatus
from alpine_bits_python.db import Base, Reservation, WebhookRequest
from alpine_bits_python.db_setup import reprocess_stuck_webhooks
@pytest_asyncio.fixture
async def test_db_engine():
"""Create an in-memory SQLite database for testing."""
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
# Cleanup
await engine.dispose()
@pytest.fixture
def test_config():
"""Test configuration."""
return {
"server": {
"codecontext": "ADVERTISING",
"code": "70597314",
"companyname": "99tales Gmbh",
"res_id_source_context": "99tales",
},
"alpine_bits_auth": [
{
"hotel_id": "HOTEL123",
"hotel_name": "Test Hotel",
"username": "testuser",
"password": "testpass",
}
],
"default_hotel_code": "HOTEL123",
"default_hotel_name": "Test Hotel",
"database": {"url": "sqlite+aiosqlite:///:memory:"},
}
@pytest.fixture
def sample_wix_form_data():
"""Sample Wix form submission data with FIXED submissionId for duplicate testing."""
return {
"data": {
"submissionId": "FIXED-DUPLICATE-TEST-ID", # Fixed ID to trigger duplicates
"submissionTime": "2025-10-07T05:48:41.855Z",
"contact": {
"name": {"first": "John", "last": "Doe"},
"email": "john.doe.duplicate.test@example.com",
"phones": [{"e164Phone": "+1234567890"}],
"locale": "en-US",
"contactId": "contact-duplicate-test",
},
"field:anrede": "Mr.",
"field:form_field_5a7b": True,
"field:date_picker_a7c8": "2024-12-25",
"field:date_picker_7e65": "2024-12-31",
"field:number_7cf5": "2",
"field:anzahl_kinder": "1",
"field:alter_kind_1": "8",
"field:angebot_auswaehlen": "Christmas Special",
"field:utm_source": "google",
"field:utm_medium": "cpc",
"field:utm_campaign": "winter2024",
"field:fbclid": "test_fbclid_123",
"field:long_answer_3524": "Late check-in please",
}
}
class TestWebhookDuplicateHandling:
"""Test duplicate webhook handling during normal operation."""
def test_duplicate_webhook_during_operation(self, test_config, sample_wix_form_data):
"""Test that sending the same webhook twice handles duplicates gracefully."""
# Create engine and tables
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
asyncio.run(create_tables())
# Mock config and database to use our test database
with patch("alpine_bits_python.api.load_config", return_value=test_config), \
patch("alpine_bits_python.api.create_database_engine", return_value=engine):
from alpine_bits_python.alpinebits_server import AlpineBitsServer
# Setup app state
app.state.engine = engine
app.state.async_sessionmaker = async_sessionmaker(
engine, expire_on_commit=False
)
app.state.config = test_config
app.state.alpine_bits_server = AlpineBitsServer(test_config)
with TestClient(app) as client:
# First submission - should succeed
response1 = client.post(
"/api/webhook/wix-form",
json=sample_wix_form_data
)
assert response1.status_code == 200
data1 = response1.json()
assert data1["status"] == "success"
# Second submission with same data - should detect duplicate at API level
response2 = client.post(
"/api/webhook/wix-form",
json=sample_wix_form_data
)
assert response2.status_code == 200
data2 = response2.json()
# API returns success for already-processed webhooks, but sets duplicate flag
assert data2["status"] == "success"
assert data2.get("duplicate") is True
assert "already processed" in data2["message"].lower()
# Cleanup
asyncio.run(engine.dispose())
class TestWebhookReprocessing:
"""Test webhook reprocessing on app restart."""
@pytest.mark.asyncio
async def test_reprocess_stuck_duplicate_webhook(self, test_db_engine, test_config):
"""Test that stuck webhooks that are duplicates are handled correctly on restart."""
AsyncSessionLocal = async_sessionmaker(test_db_engine, expire_on_commit=False)
# Step 1: Process a webhook normally to create a reservation
from alpine_bits_python.webhook_processor import process_wix_form_submission
test_data = {
"data": {
"submissionId": "STUCK-WEBHOOK-TEST-ID",
"submissionTime": "2025-10-07T05:48:41.855Z",
"contact": {
"name": {"first": "Jane", "last": "Smith"},
"email": "jane.smith@example.com",
"phones": [{"e164Phone": "+9876543210"}],
"locale": "en-US",
"contactId": "contact-stuck-test",
},
"field:date_picker_a7c8": "2024-12-25",
"field:date_picker_7e65": "2024-12-31",
"field:number_7cf5": "2",
"field:anzahl_kinder": "0",
}
}
async with AsyncSessionLocal() as session:
result = await process_wix_form_submission(
test_data, session, config=test_config
)
await session.commit()
assert result["status"] == "success"
# Step 2: Verify the reservation was created
async with AsyncSessionLocal() as session:
stmt = select(Reservation).where(
Reservation.unique_id == "STUCK-WEBHOOK-TEST-ID"
)
result = await session.execute(stmt)
reservation = result.scalar_one_or_none()
assert reservation is not None
assert reservation.unique_id == "STUCK-WEBHOOK-TEST-ID"
# Step 3: Manually create a webhook request stuck in "processing" status
# This simulates a webhook that was being processed when the app crashed
from alpine_bits_python.db import WebhookEndpoint, Hotel
async with AsyncSessionLocal() as session:
# Create hotel
hotel = Hotel(
hotel_id="HOTEL123",
hotel_name="Test Hotel",
username="testuser",
password_hash="dummy",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
session.add(hotel)
await session.flush()
# Create webhook endpoint
endpoint = WebhookEndpoint(
hotel_id="HOTEL123",
webhook_type="wix_form",
webhook_secret="test-secret-123",
is_enabled=True,
created_at=datetime.now(UTC),
)
session.add(endpoint)
await session.flush()
# Create stuck webhook request with the SAME payload
stuck_webhook = WebhookRequest(
webhook_endpoint_id=endpoint.id,
hotel_id="HOTEL123",
payload_json=test_data,
status=WebhookStatus.PROCESSING, # Stuck in processing!
created_at=datetime.now(UTC),
)
session.add(stuck_webhook)
await session.commit()
# Step 4: Run reprocessing (simulates app restart)
await reprocess_stuck_webhooks(AsyncSessionLocal, test_config)
# Step 5: Verify the stuck webhook was marked as completed (not failed)
async with AsyncSessionLocal() as session:
stmt = select(WebhookRequest).where(
WebhookRequest.status == WebhookStatus.COMPLETED
)
result = await session.execute(stmt)
completed_webhooks = result.scalars().all()
assert len(completed_webhooks) == 1
assert completed_webhooks[0].last_error is None
# Verify no failed webhooks
stmt = select(WebhookRequest).where(
WebhookRequest.status == WebhookStatus.FAILED
)
result = await session.execute(stmt)
failed_webhooks = result.scalars().all()
assert len(failed_webhooks) == 0
# Step 6: Verify only ONE reservation exists (no duplicate)
async with AsyncSessionLocal() as session:
stmt = select(Reservation)
result = await session.execute(stmt)
reservations = result.scalars().all()
assert len(reservations) == 1
class TestWebhookReprocessingNeverBlocksStartup:
"""Test that reprocessing never blocks app startup."""
@pytest.mark.asyncio
async def test_reprocessing_error_does_not_block_startup(
self, test_db_engine, test_config
):
"""Test that even if reprocessing fails, app startup continues."""
AsyncSessionLocal = async_sessionmaker(test_db_engine, expire_on_commit=False)
from alpine_bits_python.db import WebhookEndpoint, Hotel
# Create a stuck webhook with invalid data that will cause processing to fail
async with AsyncSessionLocal() as session:
# Create hotel
hotel = Hotel(
hotel_id="HOTEL123",
hotel_name="Test Hotel",
username="testuser",
password_hash="dummy",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
is_active=True,
)
session.add(hotel)
await session.flush()
# Create webhook endpoint
endpoint = WebhookEndpoint(
hotel_id="HOTEL123",
webhook_type="wix_form",
webhook_secret="test-secret-123",
is_enabled=True,
created_at=datetime.now(UTC),
)
session.add(endpoint)
await session.flush()
# Create stuck webhook with INVALID data (missing required fields)
stuck_webhook = WebhookRequest(
webhook_endpoint_id=endpoint.id,
hotel_id="HOTEL123",
payload_json={"data": {"invalid": "data"}}, # Missing required fields
status=WebhookStatus.PROCESSING,
received_at=datetime.now(UTC),
)
session.add(stuck_webhook)
await session.commit()
# This should NOT raise an exception - it should log and continue
try:
await reprocess_stuck_webhooks(AsyncSessionLocal, test_config)
except Exception as e:
pytest.fail(
f"reprocess_stuck_webhooks should NEVER raise exceptions, but got: {e}"
)
# Verify the webhook was marked as failed
async with AsyncSessionLocal() as session:
stmt = select(WebhookRequest).where(
WebhookRequest.status == WebhookStatus.FAILED
)
result = await session.execute(stmt)
failed_webhooks = result.scalars().all()
assert len(failed_webhooks) == 1
assert failed_webhooks[0].last_error is not None