Finally fixed greenlet_spawn sqllchemy error. The horror

This commit is contained in:
Jonas Linter
2025-12-03 14:13:20 +01:00
parent d2ed77e008
commit 12350578cc
2 changed files with 45 additions and 25 deletions

View File

@@ -28,7 +28,7 @@ from fastapi.security import (
from pydantic import BaseModel from pydantic import BaseModel
from slowapi.errors import RateLimitExceeded from slowapi.errors import RateLimitExceeded
from sqlalchemy import and_, select, update from sqlalchemy import and_, select, update
from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from alpine_bits_python.hotel_service import HotelService from alpine_bits_python.hotel_service import HotelService
@@ -704,7 +704,7 @@ async def validate_basic_auth(
async def handle_webhook_unified( async def handle_webhook_unified(
request: Request, request: Request,
webhook_secret: str, webhook_secret: str,
db_session=Depends(get_async_session), db_session: AsyncSession = Depends(get_async_session),
): ):
"""Unified webhook handler with deduplication and routing. """Unified webhook handler with deduplication and routing.
@@ -831,6 +831,9 @@ async def handle_webhook_unified(
if not webhook_endpoint: if not webhook_endpoint:
raise HTTPException(status_code=404, detail="Webhook not found") raise HTTPException(status_code=404, detail="Webhook not found")
webhook_endpoint_id = webhook_endpoint.id
webhook_hotel_id = webhook_endpoint.hotel_id
# Verify hotel is active # Verify hotel is active
if not webhook_endpoint.hotel.is_active: if not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Hotel is not active") raise HTTPException(status_code=404, detail="Hotel is not active")
@@ -845,8 +848,8 @@ async def handle_webhook_unified(
webhook_request_data = WebhookRequestData( webhook_request_data = WebhookRequestData(
payload_json=payload, payload_json=payload,
webhook_endpoint_id=webhook_endpoint.id, webhook_endpoint_id=webhook_endpoint_id,
hotel_id=webhook_endpoint.hotel_id, hotel_id=webhook_hotel_id,
status=WebhookStatus.PROCESSING, status=WebhookStatus.PROCESSING,
processing_started_at=timestamp, processing_started_at=timestamp,
created_at=timestamp, created_at=timestamp,
@@ -908,12 +911,17 @@ async def handle_webhook_unified(
db_session.add(webhook_request) db_session.add(webhook_request)
await db_session.flush() await db_session.flush()
webhook_request_id = webhook_request.id
try: try:
# 6. Get processor for webhook_type # 6. Get processor for webhook_type
processor = webhook_registry.get_processor(webhook_endpoint.webhook_type) processor = webhook_registry.get_processor(webhook_endpoint.webhook_type)
if not processor: if not processor:
raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}") raise ValueError(f"No processor for type: {webhook_endpoint.webhook_type}")
# Persist the webhook row before handing off to processors
await db_session.commit()
# 7. Process webhook with simplified interface # 7. Process webhook with simplified interface
result = await processor.process( result = await processor.process(
webhook_request=webhook_request, webhook_request=webhook_request,
@@ -922,38 +930,50 @@ async def handle_webhook_unified(
event_dispatcher=request.app.state.event_dispatcher, event_dispatcher=request.app.state.event_dispatcher,
) )
# 8. Update status and link created entities when available if not db_session.in_transaction():
# Re-add to session in case processor called rollback (e.g., for duplicates) await db_session.begin()
if webhook_request not in db_session:
webhook_request = await db_session.merge(webhook_request)
webhook_request.status = WebhookStatus.COMPLETED completion_values = {
webhook_request.processing_completed_at = datetime.now(UTC) "status": WebhookStatus.COMPLETED,
"processing_completed_at": datetime.now(UTC),
}
created_customer_id = result.get("customer_id") if isinstance(result, dict) else None if isinstance(result, dict):
created_reservation_id = ( created_customer_id = result.get("customer_id")
result.get("reservation_id") if isinstance(result, dict) else None created_reservation_id = result.get("reservation_id")
if created_customer_id:
completion_values["created_customer_id"] = created_customer_id
if created_reservation_id:
completion_values["created_reservation_id"] = created_reservation_id
await db_session.execute(
update(WebhookRequest)
.where(WebhookRequest.id == webhook_request_id)
.values(**completion_values)
) )
if created_customer_id:
webhook_request.created_customer_id = created_customer_id
if created_reservation_id:
webhook_request.created_reservation_id = created_reservation_id
await db_session.commit() await db_session.commit()
return { return {
**result, **result,
"webhook_id": webhook_request.id, "webhook_id": webhook_request_id,
"hotel_id": webhook_endpoint.hotel_id, "hotel_id": webhook_hotel_id,
} }
except Exception as e: except Exception as e:
_LOGGER.exception("Error processing webhook: %s", e) _LOGGER.exception("Error processing webhook: %s", e)
webhook_request.status = WebhookStatus.FAILED await db_session.rollback()
webhook_request.last_error = str(e)[:2000] if not db_session.in_transaction():
webhook_request.processing_completed_at = datetime.now(UTC) await db_session.begin()
await db_session.execute(
update(WebhookRequest)
.where(WebhookRequest.id == webhook_request_id)
.values(
status=WebhookStatus.FAILED,
last_error=str(e)[:2000],
processing_completed_at=datetime.now(UTC),
)
)
await db_session.commit() await db_session.commit()
raise HTTPException(status_code=500, detail="Error processing webhook") raise HTTPException(status_code=500, detail="Error processing webhook")

View File

@@ -6,7 +6,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from alpine_bits_python.customer_service import CustomerService from alpine_bits_python.customer_service import CustomerService
from alpine_bits_python.db import Base, Customer, HashedCustomer from alpine_bits_python.db import Base, Customer
@pytest_asyncio.fixture @pytest_asyncio.fixture