concurrency-fix #15

Merged
jonas merged 24 commits from concurrency-fix into main 2025-12-01 13:34:35 +00:00
3 changed files with 26 additions and 37 deletions
Showing only changes of commit 9830ca7cf9 - Show all commits

View File

@@ -914,6 +914,7 @@ async def handle_webhook_unified(
webhook_request=webhook_request, webhook_request=webhook_request,
db_session=db_session, db_session=db_session,
config=request.app.state.config, config=request.app.state.config,
event_dispatcher=request.app.state.event_dispatcher,
) )
# 8. Update status # 8. Update status

View File

@@ -4,7 +4,7 @@ import asyncio
from datetime import date, datetime from datetime import date, datetime
from typing import Any, Protocol from typing import Any, Protocol
from fastapi import HTTPException, Request from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@@ -14,7 +14,7 @@ from alpine_bits_python.customer_service import CustomerService
from alpine_bits_python.reservation_service import ReservationService from alpine_bits_python.reservation_service import ReservationService
from alpine_bits_python.schemas import ReservationData from alpine_bits_python.schemas import ReservationData
from .db import Hotel, WebhookRequest from .db import WebhookRequest
from .logging_config import get_logger from .logging_config import get_logger
_LOGGER = get_logger(__name__) _LOGGER = get_logger(__name__)
@@ -33,6 +33,7 @@ class WebhookProcessorProtocol(Protocol):
webhook_request: WebhookRequest, webhook_request: WebhookRequest,
db_session: AsyncSession, db_session: AsyncSession,
config: dict[str, Any] | None = None, config: dict[str, Any] | None = None,
event_dispatcher: Any | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Process webhook payload. """Process webhook payload.
@@ -40,6 +41,7 @@ class WebhookProcessorProtocol(Protocol):
webhook_request: WebhookRequest database record (contains payload_json and hotel_id) webhook_request: WebhookRequest database record (contains payload_json and hotel_id)
db_session: Database session db_session: Database session
config: Application configuration (optional) config: Application configuration (optional)
event_dispatcher: Event dispatcher for push notifications (optional)
Returns: Returns:
Response dict with status, message, customer_id, reservation_id Response dict with status, message, customer_id, reservation_id
@@ -82,20 +84,18 @@ class WebhookProcessorRegistry:
async def process_wix_form_submission( async def process_wix_form_submission(
request: Request | None,
data: dict[str, Any], data: dict[str, Any],
db, db_session: AsyncSession,
config: dict[str, Any] | None = None, config: dict[str, Any] | None = None,
hotel_id: str | None = None, hotel_id: str | None = None,
event_dispatcher=None, event_dispatcher: Any | None = None,
): ):
"""Shared business logic for handling Wix form submissions (test and production). """Shared business logic for handling Wix form submissions (test and production).
Args: Args:
request: FastAPI Request object (can be None during startup reprocessing)
data: Webhook payload data data: Webhook payload data
db: Database session db_session: Database session
config: Application config (optional, extracted from request if not provided) config: Application config (optional)
hotel_id: Hotel ID (optional, will use from data or config default if not provided) hotel_id: Hotel ID (optional, will use from data or config default if not provided)
event_dispatcher: Event dispatcher for push notifications (optional) event_dispatcher: Event dispatcher for push notifications (optional)
""" """
@@ -103,12 +103,6 @@ async def process_wix_form_submission(
_LOGGER.info("Received Wix form data at %s", timestamp) _LOGGER.info("Received Wix form data at %s", timestamp)
# Extract config and event_dispatcher from request if not provided
if config is None and request is not None:
config = request.app.state.config
if event_dispatcher is None and request is not None:
event_dispatcher = getattr(request.app.state, "event_dispatcher", None)
# Provide fallback config if still None # Provide fallback config if still None
if config is None: if config is None:
config = {} config = {}
@@ -188,7 +182,7 @@ async def process_wix_form_submission(
unique_id = data.get("submissionId", generate_unique_id()) unique_id = data.get("submissionId", generate_unique_id())
# Use CustomerService to handle customer creation/update with hashing # Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db) customer_service = CustomerService(db_session)
customer_data = { customer_data = {
"given_name": first_name, "given_name": first_name,
@@ -273,7 +267,7 @@ async def process_wix_form_submission(
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id") raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation # Use ReservationService to create reservation
reservation_service = ReservationService(db) reservation_service = ReservationService(db_session)
try: try:
db_reservation = await reservation_service.create_reservation( db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id reservation, db_customer.id
@@ -327,6 +321,7 @@ class WixFormProcessor:
webhook_request: WebhookRequest, webhook_request: WebhookRequest,
db_session: AsyncSession, db_session: AsyncSession,
config: dict[str, Any] | None = None, config: dict[str, Any] | None = None,
event_dispatcher: Any | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Process Wix form webhook payload. """Process Wix form webhook payload.
@@ -334,6 +329,7 @@ class WixFormProcessor:
webhook_request: WebhookRequest database record webhook_request: WebhookRequest database record
db_session: Database session db_session: Database session
config: Application configuration (optional) config: Application configuration (optional)
event_dispatcher: Event dispatcher for push notifications (optional)
Returns: Returns:
Response dict with status and details Response dict with status and details
@@ -341,32 +337,29 @@ class WixFormProcessor:
""" """
# Call processing function with data from webhook_request # Call processing function with data from webhook_request
result = await process_wix_form_submission( result = await process_wix_form_submission(
request=None, # No request context needed
data=webhook_request.payload_json, data=webhook_request.payload_json,
db=db_session, db_session=db_session,
config=config, config=config,
hotel_id=webhook_request.hotel_id, hotel_id=webhook_request.hotel_id,
event_dispatcher=None, # No push events during reprocessing event_dispatcher=event_dispatcher,
) )
return result return result
async def process_generic_webhook_submission( async def process_generic_webhook_submission(
request: Request | None,
data: dict[str, Any], data: dict[str, Any],
db, db_session: AsyncSession,
config: dict[str, Any] | None = None, config: dict[str, Any] | None = None,
hotel_id: str | None = None, hotel_id: str | None = None,
event_dispatcher=None, event_dispatcher: Any | None = None,
): ):
"""Process generic webhook submissions with nested structure. """Process generic webhook submissions with nested structure.
Args: Args:
request: FastAPI Request object (can be None during startup reprocessing)
data: Webhook payload data data: Webhook payload data
db: Database session db_session: Database session
config: Application config (optional, extracted from request if not provided) config: Application config (optional)
hotel_id: Hotel ID (optional, will use from data or config default) hotel_id: Hotel ID (optional, will use from data or config default)
event_dispatcher: Event dispatcher for push notifications (optional) event_dispatcher: Event dispatcher for push notifications (optional)
@@ -402,12 +395,6 @@ async def process_generic_webhook_submission(
timestamp = datetime.now().isoformat() timestamp = datetime.now().isoformat()
_LOGGER.info("Processing generic webhook submission at %s", timestamp) _LOGGER.info("Processing generic webhook submission at %s", timestamp)
# Extract config and event_dispatcher from request if not provided
if config is None and request is not None:
config = request.app.state.config
if event_dispatcher is None and request is not None:
event_dispatcher = getattr(request.app.state, "event_dispatcher", None)
# Provide fallback config if still None # Provide fallback config if still None
if config is None: if config is None:
config = {} config = {}
@@ -531,7 +518,7 @@ async def process_generic_webhook_submission(
unique_id = generate_unique_id() unique_id = generate_unique_id()
# Use CustomerService to handle customer creation/update with hashing # Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db) customer_service = CustomerService(db_session)
customer_data = { customer_data = {
"given_name": first_name, "given_name": first_name,
@@ -593,7 +580,7 @@ async def process_generic_webhook_submission(
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id") raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
# Use ReservationService to create reservation # Use ReservationService to create reservation
reservation_service = ReservationService(db) reservation_service = ReservationService(db_session)
db_reservation = await reservation_service.create_reservation( db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id reservation, db_customer.id
) )
@@ -646,6 +633,7 @@ class GenericWebhookProcessor:
webhook_request: WebhookRequest, webhook_request: WebhookRequest,
db_session: AsyncSession, db_session: AsyncSession,
config: dict[str, Any] | None = None, config: dict[str, Any] | None = None,
event_dispatcher: Any | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Process generic webhook payload. """Process generic webhook payload.
@@ -653,6 +641,7 @@ class GenericWebhookProcessor:
webhook_request: WebhookRequest database record webhook_request: WebhookRequest database record
db_session: Database session db_session: Database session
config: Application configuration (optional) config: Application configuration (optional)
event_dispatcher: Event dispatcher for push notifications (optional)
Returns: Returns:
Response dict with status and details Response dict with status and details
@@ -660,12 +649,11 @@ class GenericWebhookProcessor:
""" """
# Call processing function with data from webhook_request # Call processing function with data from webhook_request
result = await process_generic_webhook_submission( result = await process_generic_webhook_submission(
request=None, # No request context needed
data=webhook_request.payload_json, data=webhook_request.payload_json,
db=db_session, db_session=db_session,
config=config, config=config,
hotel_id=webhook_request.hotel_id, hotel_id=webhook_request.hotel_id,
event_dispatcher=None, # No push events during reprocessing event_dispatcher=event_dispatcher,
) )
return result return result

View File

@@ -260,7 +260,7 @@
"field:angebot_auswaehlen": "Zimmer: Doppelzimmer", "field:angebot_auswaehlen": "Zimmer: Doppelzimmer",
"field:utm_content": "", "field:utm_content": "",
"field:last_name_d97c": "Pohl", "field:last_name_d97c": "Pohl",
"field:hotelid": "39054_001", "field:hotelid": "135",
"submissionsLink": "https://manage.wix.app/forms/submissions/1dea821c-8168-4736-96e4-4b92e8b364cf/e084006b-ae83-4e4d-b2f5-074118cdb3b1?d=https%3A%2F%2Fmanage.wix.com%2Fdashboard%2F1dea821c-8168-4736-96e4-4b92e8b364cf%2Fwix-forms%2Fform%2Fe084006b-ae83-4e4d-b2f5-074118cdb3b1%2Fsubmissions&s=true", "submissionsLink": "https://manage.wix.app/forms/submissions/1dea821c-8168-4736-96e4-4b92e8b364cf/e084006b-ae83-4e4d-b2f5-074118cdb3b1?d=https%3A%2F%2Fmanage.wix.com%2Fdashboard%2F1dea821c-8168-4736-96e4-4b92e8b364cf%2Fwix-forms%2Fform%2Fe084006b-ae83-4e4d-b2f5-074118cdb3b1%2Fsubmissions&s=true",
"field:gbraid": "0AAAAADxR52Ad0oCzeogeTrupgGeMwD7Yp", "field:gbraid": "0AAAAADxR52Ad0oCzeogeTrupgGeMwD7Yp",
"field:fbclid": "", "field:fbclid": "",