From c402b28b72117df297635cbeb7b008229a8c3ba9 Mon Sep 17 00:00:00 2001 From: Jonas Linter <{email_address}> Date: Thu, 27 Nov 2025 19:35:30 +0100 Subject: [PATCH] Duplicate detection improved but refactoring necessary to make the whole thing more managable --- src/alpine_bits_python/db_setup.py | 199 +++++++----- src/alpine_bits_python/webhook_processor.py | 56 +++- tests/test_webhook_duplicates.py | 337 ++++++++++++++++++++ 3 files changed, 502 insertions(+), 90 deletions(-) create mode 100644 tests/test_webhook_duplicates.py diff --git a/src/alpine_bits_python/db_setup.py b/src/alpine_bits_python/db_setup.py index 82b69fa..bb526ca 100644 --- a/src/alpine_bits_python/db_setup.py +++ b/src/alpine_bits_python/db_setup.py @@ -249,110 +249,143 @@ async def reprocess_stuck_webhooks( These are webhooks that were not fully processed in the previous run, likely due to a crash or unexpected shutdown. + This function is designed to NEVER block application startup. + All errors are caught and logged, but the app will start regardless. + Args: sessionmaker: SQLAlchemy async sessionmaker config: Application configuration dictionary """ - _LOGGER.info("Checking for stuck webhooks to reprocess...") + try: + _LOGGER.info("Checking for stuck webhooks to reprocess...") - async with sessionmaker() as session: - # Find all webhooks stuck in 'processing' state - result = await session.execute( - select(WebhookRequest) - .where(WebhookRequest.status == WebhookStatus.PROCESSING) - .options( - selectinload(WebhookRequest.webhook_endpoint).selectinload( - WebhookEndpoint.hotel + async with sessionmaker() as session: + # Find all webhooks stuck in 'processing' state + result = await session.execute( + select(WebhookRequest) + .where(WebhookRequest.status == WebhookStatus.PROCESSING) + .options( + selectinload(WebhookRequest.webhook_endpoint).selectinload( + WebhookEndpoint.hotel + ) ) ) - ) - stuck_webhooks = result.scalars().all() + stuck_webhooks = result.scalars().all() - if not stuck_webhooks: - _LOGGER.info("No stuck webhooks found") - return + if not stuck_webhooks: + _LOGGER.info("No stuck webhooks found") + return - _LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks)) + _LOGGER.info("Found %d stuck webhooks to reprocess", len(stuck_webhooks)) - reprocessed_count = 0 - failed_count = 0 + reprocessed_count = 0 + failed_count = 0 - for webhook_request in stuck_webhooks: - webhook_id = webhook_request.id - webhook_endpoint = webhook_request.webhook_endpoint + for webhook_request in stuck_webhooks: + webhook_id = webhook_request.id + webhook_endpoint = webhook_request.webhook_endpoint - if not webhook_endpoint: - _LOGGER.error( - "Webhook request %d has no webhook_endpoint, skipping", webhook_id - ) - webhook_request.status = WebhookStatus.FAILED - webhook_request.last_error = ( - "No webhook endpoint found during startup reprocessing" - ) - webhook_request.processing_completed_at = datetime.now(UTC) - failed_count += 1 - continue + if not webhook_endpoint: + _LOGGER.error( + "Webhook request %d has no webhook_endpoint, skipping", webhook_id + ) + webhook_request.status = WebhookStatus.FAILED + webhook_request.last_error = ( + "No webhook endpoint found during startup reprocessing" + ) + webhook_request.processing_completed_at = datetime.now(UTC) + failed_count += 1 + continue - if not webhook_request.payload_json: - _LOGGER.error( - "Webhook request %d has no payload (purged?), marking as failed", - webhook_id, - ) - webhook_request.status = WebhookStatus.FAILED - webhook_request.last_error = ( - "No payload available for reprocessing (purged)" - ) - webhook_request.processing_completed_at = datetime.now(UTC) - failed_count += 1 - continue + if not webhook_request.payload_json: + _LOGGER.error( + "Webhook request %d has no payload (purged?), marking as failed", + webhook_id, + ) + webhook_request.status = WebhookStatus.FAILED + webhook_request.last_error = ( + "No payload available for reprocessing (purged)" + ) + webhook_request.processing_completed_at = datetime.now(UTC) + failed_count += 1 + continue - try: - _LOGGER.info( - "Reprocessing webhook %d (hotel=%s, type=%s)", - webhook_id, - webhook_endpoint.hotel_id, - webhook_endpoint.webhook_type, - ) - - # Get processor for webhook_type - processor = webhook_registry.get_processor( - webhook_endpoint.webhook_type - ) - if not processor: - raise ValueError( - f"No processor for type: {webhook_endpoint.webhook_type}" + try: + _LOGGER.info( + "Reprocessing webhook %d (hotel=%s, type=%s)", + webhook_id, + webhook_endpoint.hotel_id, + webhook_endpoint.webhook_type, ) - # Reprocess webhook with simplified interface - await processor.process( - webhook_request=webhook_request, - db_session=session, - config=config, - ) + # Get processor for webhook_type + processor = webhook_registry.get_processor( + webhook_endpoint.webhook_type + ) + if not processor: + raise ValueError( + f"No processor for type: {webhook_endpoint.webhook_type}" + ) - # Update status to completed - webhook_request.status = WebhookStatus.COMPLETED - webhook_request.processing_completed_at = datetime.now(UTC) - reprocessed_count += 1 + # Reprocess webhook with simplified interface + result = await processor.process( + webhook_request=webhook_request, + db_session=session, + config=config, + ) - _LOGGER.info("Successfully reprocessed webhook %d", webhook_id) + # Check result status + result_status = result.get("status") if isinstance(result, dict) else "success" - except Exception as e: - _LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e) - webhook_request.status = WebhookStatus.FAILED - webhook_request.last_error = ( - f"Reprocessing failed during startup: {str(e)[:1950]}" - ) - webhook_request.processing_completed_at = datetime.now(UTC) - failed_count += 1 + if result_status == "duplicate": + # Duplicate is not an error - mark as completed and continue + webhook_request.status = WebhookStatus.COMPLETED + webhook_request.processing_completed_at = datetime.now(UTC) + reprocessed_count += 1 + _LOGGER.info( + "Webhook %d was a duplicate (already processed), marked as completed", + webhook_id + ) + elif result_status in ("success", "completed"): + # Update status to completed + webhook_request.status = WebhookStatus.COMPLETED + webhook_request.processing_completed_at = datetime.now(UTC) + reprocessed_count += 1 + _LOGGER.info("Successfully reprocessed webhook %d", webhook_id) + else: + # Unexpected status - treat as failure + _LOGGER.warning( + "Webhook %d returned unexpected status: %s", + webhook_id, + result_status + ) + webhook_request.status = WebhookStatus.FAILED + webhook_request.last_error = f"Unexpected status: {result_status}" + webhook_request.processing_completed_at = datetime.now(UTC) + failed_count += 1 - # Commit all changes - await session.commit() + except Exception as e: + _LOGGER.exception("Failed to reprocess webhook %d: %s", webhook_id, e) + webhook_request.status = WebhookStatus.FAILED + webhook_request.last_error = ( + f"Reprocessing failed during startup: {str(e)[:1950]}" + ) + webhook_request.processing_completed_at = datetime.now(UTC) + failed_count += 1 - _LOGGER.info( - "Webhook reprocessing complete: %d successful, %d failed", - reprocessed_count, - failed_count, + # Commit all changes + await session.commit() + + _LOGGER.info( + "Webhook reprocessing complete: %d successful, %d failed", + reprocessed_count, + failed_count, + ) + except Exception as e: + # CRITICAL: Never let reprocessing block application startup + _LOGGER.exception( + "CRITICAL ERROR during webhook reprocessing, but allowing app to start: %s", + e ) diff --git a/src/alpine_bits_python/webhook_processor.py b/src/alpine_bits_python/webhook_processor.py index 4e37c15..293add5 100644 --- a/src/alpine_bits_python/webhook_processor.py +++ b/src/alpine_bits_python/webhook_processor.py @@ -273,10 +273,28 @@ async def process_wix_form_submission( reservation, db_customer.id ) except IntegrityError as e: - _LOGGER.exception("Database integrity error creating reservation: %s", e) - raise HTTPException( - status_code=500, detail="Database error creating reservation" - ) from e + await db_session.rollback() + # Check if this is a duplicate (unique constraint violation) + error_msg = str(e.orig) if hasattr(e, 'orig') else str(e) + is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists']) + + if is_duplicate: + _LOGGER.info( + "Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)", + unique_id + ) + return { + "status": "duplicate", + "message": "Reservation already exists (duplicate submission)", + "unique_id": unique_id, + "timestamp": timestamp, + } + else: + # Real integrity error (not a duplicate) + _LOGGER.exception("Database integrity error creating reservation: %s", e) + raise HTTPException( + status_code=500, detail="Database error creating reservation" + ) from e async def push_event(): # Fire event for listeners (push, etc.) - hotel-specific dispatch @@ -581,9 +599,33 @@ async def process_generic_webhook_submission( # Use ReservationService to create reservation reservation_service = ReservationService(db_session) - db_reservation = await reservation_service.create_reservation( - reservation, db_customer.id - ) + try: + db_reservation = await reservation_service.create_reservation( + reservation, db_customer.id + ) + except IntegrityError as e: + await db_session.rollback() + # Check if this is a duplicate (unique constraint violation) + error_msg = str(e.orig) if hasattr(e, 'orig') else str(e) + is_duplicate = any(keyword in error_msg.lower() for keyword in ['unique', 'duplicate', 'already exists']) + + if is_duplicate: + _LOGGER.info( + "Duplicate reservation detected for unique_id=%s, skipping (this is expected for reprocessing)", + unique_id + ) + return { + "status": "duplicate", + "message": "Reservation already exists (duplicate submission)", + "unique_id": unique_id, + "timestamp": timestamp, + } + else: + # Real integrity error (not a duplicate) + _LOGGER.exception("Database integrity error creating reservation: %s", e) + raise HTTPException( + status_code=500, detail="Database error creating reservation" + ) from e async def push_event(): # Fire event for listeners (push, etc.) - hotel-specific dispatch diff --git a/tests/test_webhook_duplicates.py b/tests/test_webhook_duplicates.py new file mode 100644 index 0000000..99c5741 --- /dev/null +++ b/tests/test_webhook_duplicates.py @@ -0,0 +1,337 @@ +"""Tests for webhook duplicate handling and reprocessing. + +This module tests: +- Duplicate detection during normal operation +- Duplicate handling during app startup reprocessing +- Stuck webhooks that are duplicates +""" + +import asyncio +import uuid +from datetime import UTC, datetime +from pathlib import Path +from unittest.mock import patch + +import pytest +import pytest_asyncio +from fastapi.testclient import TestClient +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from alpine_bits_python.api import app +from alpine_bits_python.const import WebhookStatus +from alpine_bits_python.db import Base, Reservation, WebhookRequest +from alpine_bits_python.db_setup import reprocess_stuck_webhooks + + +@pytest_asyncio.fixture +async def test_db_engine(): + """Create an in-memory SQLite database for testing.""" + engine = create_async_engine( + "sqlite+aiosqlite:///:memory:", + echo=False, + ) + + # Create tables + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + yield engine + + # Cleanup + await engine.dispose() + + +@pytest.fixture +def test_config(): + """Test configuration.""" + return { + "server": { + "codecontext": "ADVERTISING", + "code": "70597314", + "companyname": "99tales Gmbh", + "res_id_source_context": "99tales", + }, + "alpine_bits_auth": [ + { + "hotel_id": "HOTEL123", + "hotel_name": "Test Hotel", + "username": "testuser", + "password": "testpass", + } + ], + "default_hotel_code": "HOTEL123", + "default_hotel_name": "Test Hotel", + "database": {"url": "sqlite+aiosqlite:///:memory:"}, + } + + +@pytest.fixture +def sample_wix_form_data(): + """Sample Wix form submission data with FIXED submissionId for duplicate testing.""" + return { + "data": { + "submissionId": "FIXED-DUPLICATE-TEST-ID", # Fixed ID to trigger duplicates + "submissionTime": "2025-10-07T05:48:41.855Z", + "contact": { + "name": {"first": "John", "last": "Doe"}, + "email": "john.doe.duplicate.test@example.com", + "phones": [{"e164Phone": "+1234567890"}], + "locale": "en-US", + "contactId": "contact-duplicate-test", + }, + "field:anrede": "Mr.", + "field:form_field_5a7b": True, + "field:date_picker_a7c8": "2024-12-25", + "field:date_picker_7e65": "2024-12-31", + "field:number_7cf5": "2", + "field:anzahl_kinder": "1", + "field:alter_kind_1": "8", + "field:angebot_auswaehlen": "Christmas Special", + "field:utm_source": "google", + "field:utm_medium": "cpc", + "field:utm_campaign": "winter2024", + "field:fbclid": "test_fbclid_123", + "field:long_answer_3524": "Late check-in please", + } + } + + +class TestWebhookDuplicateHandling: + """Test duplicate webhook handling during normal operation.""" + + def test_duplicate_webhook_during_operation(self, test_config, sample_wix_form_data): + """Test that sending the same webhook twice handles duplicates gracefully.""" + # Create engine and tables + engine = create_async_engine( + "sqlite+aiosqlite:///:memory:", + echo=False, + ) + + async def create_tables(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + asyncio.run(create_tables()) + + # Mock config and database to use our test database + with patch("alpine_bits_python.api.load_config", return_value=test_config), \ + patch("alpine_bits_python.api.create_database_engine", return_value=engine): + + from alpine_bits_python.alpinebits_server import AlpineBitsServer + + # Setup app state + app.state.engine = engine + app.state.async_sessionmaker = async_sessionmaker( + engine, expire_on_commit=False + ) + app.state.config = test_config + app.state.alpine_bits_server = AlpineBitsServer(test_config) + + with TestClient(app) as client: + # First submission - should succeed + response1 = client.post( + "/api/webhook/wix-form", + json=sample_wix_form_data + ) + assert response1.status_code == 200 + data1 = response1.json() + assert data1["status"] == "success" + + # Second submission with same data - should detect duplicate at API level + response2 = client.post( + "/api/webhook/wix-form", + json=sample_wix_form_data + ) + assert response2.status_code == 200 + data2 = response2.json() + # API returns success for already-processed webhooks, but sets duplicate flag + assert data2["status"] == "success" + assert data2.get("duplicate") is True + assert "already processed" in data2["message"].lower() + + # Cleanup + asyncio.run(engine.dispose()) + + +class TestWebhookReprocessing: + """Test webhook reprocessing on app restart.""" + + @pytest.mark.asyncio + async def test_reprocess_stuck_duplicate_webhook(self, test_db_engine, test_config): + """Test that stuck webhooks that are duplicates are handled correctly on restart.""" + AsyncSessionLocal = async_sessionmaker(test_db_engine, expire_on_commit=False) + + # Step 1: Process a webhook normally to create a reservation + from alpine_bits_python.webhook_processor import process_wix_form_submission + + test_data = { + "data": { + "submissionId": "STUCK-WEBHOOK-TEST-ID", + "submissionTime": "2025-10-07T05:48:41.855Z", + "contact": { + "name": {"first": "Jane", "last": "Smith"}, + "email": "jane.smith@example.com", + "phones": [{"e164Phone": "+9876543210"}], + "locale": "en-US", + "contactId": "contact-stuck-test", + }, + "field:date_picker_a7c8": "2024-12-25", + "field:date_picker_7e65": "2024-12-31", + "field:number_7cf5": "2", + "field:anzahl_kinder": "0", + } + } + + async with AsyncSessionLocal() as session: + result = await process_wix_form_submission( + test_data, session, config=test_config + ) + await session.commit() + assert result["status"] == "success" + + # Step 2: Verify the reservation was created + async with AsyncSessionLocal() as session: + stmt = select(Reservation).where( + Reservation.unique_id == "STUCK-WEBHOOK-TEST-ID" + ) + result = await session.execute(stmt) + reservation = result.scalar_one_or_none() + assert reservation is not None + assert reservation.unique_id == "STUCK-WEBHOOK-TEST-ID" + + # Step 3: Manually create a webhook request stuck in "processing" status + # This simulates a webhook that was being processed when the app crashed + from alpine_bits_python.db import WebhookEndpoint, Hotel + + async with AsyncSessionLocal() as session: + # Create hotel + hotel = Hotel( + hotel_id="HOTEL123", + hotel_name="Test Hotel", + username="testuser", + password_hash="dummy", + created_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + is_active=True, + ) + session.add(hotel) + await session.flush() + + # Create webhook endpoint + endpoint = WebhookEndpoint( + hotel_id="HOTEL123", + webhook_type="wix_form", + webhook_secret="test-secret-123", + is_enabled=True, + created_at=datetime.now(UTC), + ) + session.add(endpoint) + await session.flush() + + # Create stuck webhook request with the SAME payload + stuck_webhook = WebhookRequest( + webhook_endpoint_id=endpoint.id, + hotel_id="HOTEL123", + payload_json=test_data, + status=WebhookStatus.PROCESSING, # Stuck in processing! + created_at=datetime.now(UTC), + ) + session.add(stuck_webhook) + await session.commit() + + # Step 4: Run reprocessing (simulates app restart) + await reprocess_stuck_webhooks(AsyncSessionLocal, test_config) + + # Step 5: Verify the stuck webhook was marked as completed (not failed) + async with AsyncSessionLocal() as session: + stmt = select(WebhookRequest).where( + WebhookRequest.status == WebhookStatus.COMPLETED + ) + result = await session.execute(stmt) + completed_webhooks = result.scalars().all() + assert len(completed_webhooks) == 1 + assert completed_webhooks[0].last_error is None + + # Verify no failed webhooks + stmt = select(WebhookRequest).where( + WebhookRequest.status == WebhookStatus.FAILED + ) + result = await session.execute(stmt) + failed_webhooks = result.scalars().all() + assert len(failed_webhooks) == 0 + + # Step 6: Verify only ONE reservation exists (no duplicate) + async with AsyncSessionLocal() as session: + stmt = select(Reservation) + result = await session.execute(stmt) + reservations = result.scalars().all() + assert len(reservations) == 1 + + +class TestWebhookReprocessingNeverBlocksStartup: + """Test that reprocessing never blocks app startup.""" + + @pytest.mark.asyncio + async def test_reprocessing_error_does_not_block_startup( + self, test_db_engine, test_config + ): + """Test that even if reprocessing fails, app startup continues.""" + AsyncSessionLocal = async_sessionmaker(test_db_engine, expire_on_commit=False) + + from alpine_bits_python.db import WebhookEndpoint, Hotel + + # Create a stuck webhook with invalid data that will cause processing to fail + async with AsyncSessionLocal() as session: + # Create hotel + hotel = Hotel( + hotel_id="HOTEL123", + hotel_name="Test Hotel", + username="testuser", + password_hash="dummy", + created_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + is_active=True, + ) + session.add(hotel) + await session.flush() + + # Create webhook endpoint + endpoint = WebhookEndpoint( + hotel_id="HOTEL123", + webhook_type="wix_form", + webhook_secret="test-secret-123", + is_enabled=True, + created_at=datetime.now(UTC), + ) + session.add(endpoint) + await session.flush() + + # Create stuck webhook with INVALID data (missing required fields) + stuck_webhook = WebhookRequest( + webhook_endpoint_id=endpoint.id, + hotel_id="HOTEL123", + payload_json={"data": {"invalid": "data"}}, # Missing required fields + status=WebhookStatus.PROCESSING, + received_at=datetime.now(UTC), + ) + session.add(stuck_webhook) + await session.commit() + + # This should NOT raise an exception - it should log and continue + try: + await reprocess_stuck_webhooks(AsyncSessionLocal, test_config) + except Exception as e: + pytest.fail( + f"reprocess_stuck_webhooks should NEVER raise exceptions, but got: {e}" + ) + + # Verify the webhook was marked as failed + async with AsyncSessionLocal() as session: + stmt = select(WebhookRequest).where( + WebhookRequest.status == WebhookStatus.FAILED + ) + result = await session.execute(stmt) + failed_webhooks = result.scalars().all() + assert len(failed_webhooks) == 1 + assert failed_webhooks[0].last_error is not None