From 50ce0ec48623bb9333f51f9da8a217811450beb8 Mon Sep 17 00:00:00 2001 From: Jonas Linter <{email_address}> Date: Wed, 3 Dec 2025 14:13:20 +0100 Subject: [PATCH] Finally fixed greenlet_spawn sqllchemy error. The horror --- src/alpine_bits_python/api.py | 68 ++++++++++++++++++++++------------ tests/test_customer_service.py | 2 +- 2 files changed, 45 insertions(+), 25 deletions(-) diff --git a/src/alpine_bits_python/api.py b/src/alpine_bits_python/api.py index db4b521..20a3b98 100644 --- a/src/alpine_bits_python/api.py +++ b/src/alpine_bits_python/api.py @@ -28,7 +28,7 @@ from fastapi.security import ( from pydantic import BaseModel from slowapi.errors import RateLimitExceeded from sqlalchemy import and_, select, update -from sqlalchemy.ext.asyncio import async_sessionmaker +from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession from sqlalchemy.orm import selectinload from alpine_bits_python.hotel_service import HotelService @@ -704,7 +704,7 @@ async def validate_basic_auth( async def handle_webhook_unified( request: Request, webhook_secret: str, - db_session=Depends(get_async_session), + db_session: AsyncSession = Depends(get_async_session), ): """Unified webhook handler with deduplication and routing. @@ -831,6 +831,9 @@ async def handle_webhook_unified( if not webhook_endpoint: raise HTTPException(status_code=404, detail="Webhook not found") + webhook_endpoint_id = webhook_endpoint.id + webhook_hotel_id = webhook_endpoint.hotel_id + # Verify hotel is active if not webhook_endpoint.hotel.is_active: raise HTTPException(status_code=404, detail="Hotel is not active") @@ -845,8 +848,8 @@ async def handle_webhook_unified( webhook_request_data = WebhookRequestData( payload_json=payload, - webhook_endpoint_id=webhook_endpoint.id, - hotel_id=webhook_endpoint.hotel_id, + webhook_endpoint_id=webhook_endpoint_id, + hotel_id=webhook_hotel_id, status=WebhookStatus.PROCESSING, processing_started_at=timestamp, created_at=timestamp, @@ -908,12 +911,17 @@ async def handle_webhook_unified( db_session.add(webhook_request) await db_session.flush() + webhook_request_id = webhook_request.id + try: # 6. Get processor for webhook_type processor = webhook_registry.get_processor(webhook_endpoint.webhook_type) if not processor: raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}") + # Persist the webhook row before handing off to processors + await db_session.commit() + # 7. Process webhook with simplified interface result = await processor.process( webhook_request=webhook_request, @@ -922,38 +930,50 @@ async def handle_webhook_unified( event_dispatcher=request.app.state.event_dispatcher, ) - # 8. Update status and link created entities when available - # Re-add to session in case processor called rollback (e.g., for duplicates) - if webhook_request not in db_session: - webhook_request = await db_session.merge(webhook_request) + if not db_session.in_transaction(): + await db_session.begin() - webhook_request.status = WebhookStatus.COMPLETED - webhook_request.processing_completed_at = datetime.now(UTC) + completion_values = { + "status": WebhookStatus.COMPLETED, + "processing_completed_at": datetime.now(UTC), + } - created_customer_id = result.get("customer_id") if isinstance(result, dict) else None - created_reservation_id = ( - result.get("reservation_id") if isinstance(result, dict) else None + if isinstance(result, dict): + created_customer_id = result.get("customer_id") + created_reservation_id = result.get("reservation_id") + if created_customer_id: + completion_values["created_customer_id"] = created_customer_id + if created_reservation_id: + completion_values["created_reservation_id"] = created_reservation_id + + await db_session.execute( + update(WebhookRequest) + .where(WebhookRequest.id == webhook_request_id) + .values(**completion_values) ) - - if created_customer_id: - webhook_request.created_customer_id = created_customer_id - if created_reservation_id: - webhook_request.created_reservation_id = created_reservation_id - await db_session.commit() return { **result, - "webhook_id": webhook_request.id, - "hotel_id": webhook_endpoint.hotel_id, + "webhook_id": webhook_request_id, + "hotel_id": webhook_hotel_id, } except Exception as e: _LOGGER.exception("Error processing webhook: %s", e) - webhook_request.status = WebhookStatus.FAILED - webhook_request.last_error = str(e)[:2000] - webhook_request.processing_completed_at = datetime.now(UTC) + await db_session.rollback() + if not db_session.in_transaction(): + await db_session.begin() + await db_session.execute( + update(WebhookRequest) + .where(WebhookRequest.id == webhook_request_id) + .values( + status=WebhookStatus.FAILED, + last_error=str(e)[:2000], + processing_completed_at=datetime.now(UTC), + ) + ) await db_session.commit() raise HTTPException(status_code=500, detail="Error processing webhook") diff --git a/tests/test_customer_service.py b/tests/test_customer_service.py index 5578e97..e87166e 100644 --- a/tests/test_customer_service.py +++ b/tests/test_customer_service.py @@ -6,7 +6,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from alpine_bits_python.customer_service import CustomerService -from alpine_bits_python.db import Base, Customer, HashedCustomer +from alpine_bits_python.db import Base, Customer @pytest_asyncio.fixture