2 Commits

Author SHA1 Message Date
Jonas Linter
2c3d779ab2 Reduced logging impact 2025-12-02 16:01:45 +01:00
Jonas Linter
163b4be9cc Fixed small issue in webhook-processor not saving the results to the webhook_request table 2025-12-02 16:00:43 +01:00
3 changed files with 44 additions and 46 deletions

View File

@@ -2,7 +2,6 @@
import asyncio import asyncio
import gzip import gzip
import hashlib
import json import json
import multiprocessing import multiprocessing
import os import os
@@ -836,16 +835,27 @@ async def handle_webhook_unified(
if not webhook_endpoint.hotel.is_active: if not webhook_endpoint.hotel.is_active:
raise HTTPException(status_code=404, detail="Hotel is not active") raise HTTPException(status_code=404, detail="Hotel is not active")
# 3. Hash payload (canonical JSON for consistent hashing) # 3. Track payload metadata with canonical hashing handled by WebhookRequestData
payload_json_str = json.dumps(payload, sort_keys=True) payload_size = len(body)
payload_hash = hashlib.sha256(payload_json_str.encode("utf-8")).hexdigest()
payload_size = len(payload_json_str.encode("utf-8"))
# Check payload size limit (10MB) # Check payload size limit (10MB)
if payload_size > 10 * 1024 * 1024: if payload_size > 10 * 1024 * 1024:
_LOGGER.error("Payload too large: %d bytes", payload_size) _LOGGER.error("Payload too large: %d bytes", payload_size)
raise HTTPException(status_code=413, detail="Payload too large (max 10MB)") raise HTTPException(status_code=413, detail="Payload too large (max 10MB)")
webhook_request_data = WebhookRequestData(
payload_json=payload,
webhook_endpoint_id=webhook_endpoint.id,
hotel_id=webhook_endpoint.hotel_id,
status=WebhookStatus.PROCESSING,
processing_started_at=timestamp,
created_at=timestamp,
source_ip=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
payload_hash = webhook_request_data.payload_hash
# 4. Check for duplicate with row-level locking # 4. Check for duplicate with row-level locking
duplicate = await db_session.execute( duplicate = await db_session.execute(
select(WebhookRequest) select(WebhookRequest)
@@ -892,18 +902,7 @@ async def handle_webhook_unified(
webhook_request.status = WebhookStatus.PROCESSING webhook_request.status = WebhookStatus.PROCESSING
webhook_request.processing_started_at = timestamp webhook_request.processing_started_at = timestamp
else: else:
webhook_request_data = WebhookRequestData( # 5. Create new webhook_request from validated data
payload_hash=payload_hash,
webhook_endpoint_id=webhook_endpoint.id,
hotel_id=webhook_endpoint.hotel_id,
status=WebhookStatus.PROCESSING,
payload_json=payload,
processing_started_at=timestamp,
created_at=timestamp,
source_ip=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
# 5. Create new webhook_request
webhook_request = WebhookRequest(**webhook_request_data.model_dump()) webhook_request = WebhookRequest(**webhook_request_data.model_dump())
db_session.add(webhook_request) db_session.add(webhook_request)
@@ -923,10 +922,20 @@ async def handle_webhook_unified(
event_dispatcher=request.app.state.event_dispatcher, event_dispatcher=request.app.state.event_dispatcher,
) )
# 8. Update status # 8. Update status and link created entities when available
webhook_request.status = WebhookStatus.COMPLETED webhook_request.status = WebhookStatus.COMPLETED
webhook_request.processing_completed_at = datetime.now(UTC) webhook_request.processing_completed_at = datetime.now(UTC)
created_customer_id = result.get("customer_id") if isinstance(result, dict) else None
created_reservation_id = (
result.get("reservation_id") if isinstance(result, dict) else None
)
if created_customer_id:
webhook_request.created_customer_id = created_customer_id
if created_reservation_id:
webhook_request.created_reservation_id = created_reservation_id
await db_session.commit() await db_session.commit()
return { return {

View File

@@ -106,13 +106,13 @@ class ConversionService:
if isinstance(session, SessionMaker): if isinstance(session, SessionMaker):
self.session_maker = session self.session_maker = session
self.supports_concurrent = True self.supports_concurrent = True
_LOGGER.info( _LOGGER.debug(
"ConversionService initialized in concurrent mode with SessionMaker" "ConversionService initialized in concurrent mode with SessionMaker"
) )
elif isinstance(session, AsyncSession): elif isinstance(session, AsyncSession):
self.session = session self.session = session
self.supports_concurrent = False self.supports_concurrent = False
_LOGGER.info( _LOGGER.debug(
"ConversionService initialized in sequential mode with single session" "ConversionService initialized in sequential mode with single session"
) )
elif session is not None: elif session is not None:
@@ -123,19 +123,19 @@ class ConversionService:
@staticmethod @staticmethod
def _parse_required_int(value: str | None, field_name: str) -> int: def _parse_required_int(value: str | None, field_name: str) -> int:
"""Parse an integer attribute that must be present.""" """Parse an integer attribute that must be present."""
if value in (None, ""): if value in (None, ""):
raise ValueError(f"{field_name} is required") raise ValueError(f"{field_name} is required")
try: try:
return int(value) return int(value)
except (TypeError, ValueError) as exc: except (TypeError, ValueError) as exc:
raise ValueError(f"{field_name} must be an integer (value={value})") from exc raise ValueError(
f"{field_name} must be an integer (value={value})"
) from exc
@staticmethod @staticmethod
def _parse_optional_int(value: str | None, field_name: str) -> int | None: def _parse_optional_int(value: str | None, field_name: str) -> int | None:
"""Parse an optional integer attribute, logging on failure.""" """Parse an optional integer attribute, logging on failure."""
if value in (None, ""): if value in (None, ""):
return None return None
@@ -148,7 +148,6 @@ class ConversionService:
@staticmethod @staticmethod
def _parse_date(value: str | None, field_name: str) -> date | None: def _parse_date(value: str | None, field_name: str) -> date | None:
"""Parse a YYYY-MM-DD formatted date string.""" """Parse a YYYY-MM-DD formatted date string."""
if not value: if not value:
return None return None
@@ -161,7 +160,6 @@ class ConversionService:
@staticmethod @staticmethod
def _parse_datetime(value: str | None, field_name: str) -> datetime | None: def _parse_datetime(value: str | None, field_name: str) -> datetime | None:
"""Parse an ISO timestamp string.""" """Parse an ISO timestamp string."""
if not value: if not value:
return None return None
@@ -176,7 +174,6 @@ class ConversionService:
self, daily_sales_elem: ET.Element | None self, daily_sales_elem: ET.Element | None
) -> tuple[list[dict[str, str]], Decimal | None, int]: ) -> tuple[list[dict[str, str]], Decimal | None, int]:
"""Extract the list of sale dictionaries and aggregate revenue information.""" """Extract the list of sale dictionaries and aggregate revenue information."""
if daily_sales_elem is None: if daily_sales_elem is None:
return [], None, 0 return [], None, 0
@@ -198,9 +195,7 @@ class ConversionService:
try: try:
total_revenue += Decimal(revenue_total_str) total_revenue += Decimal(revenue_total_str)
except (ValueError, TypeError): except (ValueError, TypeError):
_LOGGER.warning( _LOGGER.warning("Invalid revenueTotal value: %s", revenue_total_str)
"Invalid revenueTotal value: %s", revenue_total_str
)
# Copy the remaining optional revenue buckets if present # Copy the remaining optional revenue buckets if present
for field_name in ( for field_name in (
@@ -224,16 +219,15 @@ class ConversionService:
self, room_elem: ET.Element, pms_reservation_id: int, room_index: int self, room_elem: ET.Element, pms_reservation_id: int, room_index: int
) -> ParsedRoomReservation: ) -> ParsedRoomReservation:
"""Convert a <roomReservation> element into ParsedRoomReservation.""" """Convert a <roomReservation> element into ParsedRoomReservation."""
arrival_date = self._parse_date(room_elem.get("arrival"), "arrival date") arrival_date = self._parse_date(room_elem.get("arrival"), "arrival date")
departure_date = self._parse_date( departure_date = self._parse_date(room_elem.get("departure"), "departure date")
room_elem.get("departure"), "departure date"
)
num_adults = self._parse_optional_int(room_elem.get("adults"), "adults") num_adults = self._parse_optional_int(room_elem.get("adults"), "adults")
room_number = room_elem.get("roomNumber") room_number = room_elem.get("roomNumber")
if room_number is None: if room_number is None:
_LOGGER.debug( _LOGGER.debug(
"Room reservation %s #%d has no roomNumber", pms_reservation_id, room_index "Room reservation %s #%d has no roomNumber",
pms_reservation_id,
room_index,
) )
daily_sales, total_revenue, sale_count = self._parse_daily_sales( daily_sales, total_revenue, sale_count = self._parse_daily_sales(
@@ -259,7 +253,6 @@ class ConversionService:
self, reservation_elem: ET.Element self, reservation_elem: ET.Element
) -> ParsedReservationData | None: ) -> ParsedReservationData | None:
"""Convert a <reservation> element into a structured representation.""" """Convert a <reservation> element into a structured representation."""
try: try:
pms_reservation_id = self._parse_required_int( pms_reservation_id = self._parse_required_int(
reservation_elem.get("id"), "reservation id" reservation_elem.get("id"), "reservation id"
@@ -447,7 +440,7 @@ class ConversionService:
await session.execute(stmt) await session.execute(stmt)
_LOGGER.info( _LOGGER.debug(
"Phase 1: Upserted batch %d-%d of %d guests", "Phase 1: Upserted batch %d-%d of %d guests",
batch_start + 1, batch_start + 1,
batch_end, batch_end,
@@ -983,15 +976,11 @@ class ConversionService:
if existing_room_reservation: if existing_room_reservation:
# Update existing room reservation with all fields # Update existing room reservation with all fields
existing_room_reservation.arrival_date = ( existing_room_reservation.arrival_date = room_reservation.arrival_date
room_reservation.arrival_date
)
existing_room_reservation.departure_date = ( existing_room_reservation.departure_date = (
room_reservation.departure_date room_reservation.departure_date
) )
existing_room_reservation.room_status = ( existing_room_reservation.room_status = room_reservation.room_status
room_reservation.room_status
)
existing_room_reservation.room_type = room_reservation.room_type existing_room_reservation.room_type = room_reservation.room_type
existing_room_reservation.num_adults = room_reservation.num_adults existing_room_reservation.num_adults = room_reservation.num_adults
existing_room_reservation.rate_plan_code = ( existing_room_reservation.rate_plan_code = (
@@ -1005,9 +994,7 @@ class ConversionService:
if room_reservation.daily_sales if room_reservation.daily_sales
else None else None
) )
existing_room_reservation.total_revenue = ( existing_room_reservation.total_revenue = room_reservation.total_revenue
room_reservation.total_revenue
)
existing_room_reservation.updated_at = datetime.now() existing_room_reservation.updated_at = datetime.now()
_LOGGER.debug( _LOGGER.debug(
"Updated room reservation %s (pms_id=%s, room=%s)", "Updated room reservation %s (pms_id=%s, room=%s)",
@@ -1872,7 +1859,7 @@ class ConversionService:
conversion_guest.is_regular = is_regular conversion_guest.is_regular = is_regular
if is_regular: if is_regular:
_LOGGER.info( _LOGGER.debug(
"Marking guest %s as regular: earliest paying conversion %s predates first reservation created at %s", "Marking guest %s as regular: earliest paying conversion %s predates first reservation created at %s",
guest_id, guest_id,
earliest_paying_conversion.reservation_date, earliest_paying_conversion.reservation_date,

View File

@@ -323,6 +323,8 @@ async def process_wix_form_submission(
"received_keys": list(data.keys()), "received_keys": list(data.keys()),
"timestamp": timestamp, "timestamp": timestamp,
"note": "No authentication required for this endpoint", "note": "No authentication required for this endpoint",
"customer_id": db_customer.id,
"reservation_id": db_reservation.id,
} }