# alpinebits_python/src/alpine_bits_python/db.py
import asyncio
import hashlib
import os
from collections.abc import AsyncGenerator, Callable
from typing import TypeVar

from sqlalchemy import (
    JSON,
    Boolean,
    Column,
    Date,
    DateTime,
    Double,
    ForeignKey,
    ForeignKeyConstraint,
    Index,
    Integer,
    String,
    UniqueConstraint,
    func,
)
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import backref, declarative_base, relationship

from .const import WebhookStatus
from .logging_config import get_logger

_LOGGER = get_logger(__name__)

# Load schema from config at module level
# This happens once when the module is imported
try:
    from .config_loader import load_config

    _app_config = load_config()
    _SCHEMA = _app_config.get("database", {}).get("schema")
except (FileNotFoundError, KeyError, ValueError, ImportError):
    _SCHEMA = None

# If schema isn't in config, try environment variable
if not _SCHEMA:
    _SCHEMA = os.environ.get("DATABASE_SCHEMA")


class Base:
    """Base class that applies schema to all tables."""

    # # Set schema on all tables if configured
    # if _SCHEMA:
    #     __table_args__ = {"schema": _SCHEMA}


Base = declarative_base(cls=Base)

# Type variable for async functions
T = TypeVar("T")

# Maximum number of retries for session operations
MAX_RETRIES = 3

# Delay between retries in seconds
RETRY_DELAY = 0.5


# Async SQLAlchemy setup
def get_database_url(config=None):
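    """Resolve the database URL.

    Precedence: config["database"]["url"], then the DATABASE_URL environment
    variable, then a local SQLite fallback (sqlite+aiosqlite:///alpinebits.db).
    """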
    db_url = None
    if config and "database" in config and "url" in config["database"]:
        db_url = config["database"]["url"]
    if not db_url:
        db_url = os.environ.get("DATABASE_URL")
    if not db_url:
        db_url = "sqlite+aiosqlite:///alpinebits.db"
    return db_url


def get_database_schema(config=None):
    """Get the PostgreSQL schema name from config.

    Args:
        config: Configuration dictionary

    Returns:
        Schema name string, or None if not configured

    """
    # Check environment variable first (takes precedence)
    schema = os.environ.get("DATABASE_SCHEMA")
    if schema:
        return schema

    # Fall back to config file
    if config and "database" in config and "schema" in config["database"]:
        return config["database"]["schema"]

    return None


def configure_schema(schema_name):
    """Configure the database schema for all models.

    IMPORTANT: This must be called BEFORE any models are imported/defined.
    It modifies the Base class to apply schema to all tables.

    Args:
        schema_name: Name of the schema to use (e.g., "alpinebits")

    """
    if schema_name:
        # Set __table_args__ on the Base class to apply schema to all tables
        Base.__table_args__ = {"schema": schema_name}


def create_database_engine(config=None, echo=False) -> AsyncEngine:
    """Create a configured database engine with schema support.

    This function:
    1. Gets the database URL from config
    2. Gets the schema name (if configured)
    3. Configures all models to use the schema
    4. Creates the async engine with appropriate connect_args for PostgreSQL

    Args:
        config: Configuration dictionary
        echo: Whether to echo SQL statements (default: False)

    Returns:
        Configured AsyncEngine instance

    """
    database_url = get_database_url(config)
    schema_name = get_database_schema(config)

    # Configure schema for all models if specified
    if schema_name:
        configure_schema(schema_name)
        _LOGGER.info("Configured database schema: %s", schema_name)

    # Create engine with connect_args to set search_path for PostgreSQL
    connect_args = {}
    if schema_name and "postgresql" in database_url:
        connect_args = {"server_settings": {"search_path": f"{schema_name},public"}}
        _LOGGER.info("Setting PostgreSQL search_path to: %s,public", schema_name)

    return create_async_engine(database_url, echo=echo, connect_args=connect_args)
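

# Illustrative sketch (added, not part of the original module): one way to wire
# the engine, session factory, and resilient wrapper together at startup. The
# config values shown here are assumptions; real deployments read them from
# config.yaml or the environment.
def _example_engine_setup() -> "ResilientAsyncSession":
    config = {"database": {"url": "sqlite+aiosqlite:///alpinebits.db"}}
    engine = create_database_engine(config)
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    return ResilientAsyncSession(session_factory, engine)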


class ResilientAsyncSession:
    """Wrapper around AsyncSession that handles connection recovery.

    This wrapper automatically retries operations on connection loss or OID errors,
    disposing the connection pool and creating a fresh session on failure.
    """

    def __init__(
        self,
        async_sessionmaker_: async_sessionmaker[AsyncSession],
        engine: AsyncEngine,
    ):
        """Initialize the resilient session wrapper.

        Args:
            async_sessionmaker_: Factory for creating async sessions
            engine: The SQLAlchemy async engine for connection recovery

        """
        self.async_sessionmaker = async_sessionmaker_
        self.engine = engine

    async def execute_with_retry(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute a function with automatic retry on connection errors.

        Args:
            func: Async function that takes a session as first argument
            *args: Positional arguments to pass to func (first arg should be session)
            **kwargs: Keyword arguments to pass to func

        Returns:
            Result of the function call

        Raises:
            The original exception if all retries are exhausted

        """
        last_error = None

        for attempt in range(MAX_RETRIES):
            try:
                async with self.async_sessionmaker() as session:
                    return await func(session, *args, **kwargs)
            except DBAPIError as e:
                last_error = e
                error_msg = str(e).lower()

                # Check if this is an OID error or connection loss
                if (
                    "could not open relation" in error_msg
                    or "lost connection" in error_msg
                    or "connection closed" in error_msg
                    or "connection refused" in error_msg
                ):
                    _LOGGER.warning(
                        "Connection error on attempt %d/%d: %s. Disposing pool and retrying...",
                        attempt + 1,
                        MAX_RETRIES,
                        e.__class__.__name__,
                    )
                    # Dispose the entire connection pool to force new connections
                    await self.engine.dispose()

                    # Wait before retry (exponential backoff)
                    if attempt < MAX_RETRIES - 1:
                        wait_time = RETRY_DELAY * (2**attempt)
                        await asyncio.sleep(wait_time)
                else:
                    # Not a connection-related error, re-raise immediately
                    raise
            except Exception:
                # Any other exception, re-raise immediately
                raise

        # All retries exhausted
        _LOGGER.error(
            "Failed to execute query after %d retries: %s",
            MAX_RETRIES,
            last_error.__class__.__name__,
        )
        raise last_error
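

# Illustrative sketch (added for clarity, not part of the original module):
# execute_with_retry expects an async callable whose first parameter is the
# session; the wrapper supplies the session and retries on connection errors.
# The query shown is an assumption for demonstration purposes.
async def _example_resilient_query(resilient: "ResilientAsyncSession") -> list["Customer"]:
    from sqlalchemy import select  # local import to keep the example self-contained

    async def _load_customers(session: AsyncSession) -> list["Customer"]:
        result = await session.execute(select(Customer))
        return list(result.scalars())

    return await resilient.execute_with_retry(_load_customers)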


class SessionMaker:
    """Factory for creating independent AsyncSession instances.

    This class enables concurrent processing by allowing each task to create
    and manage its own database session. Useful for processing large datasets
    where concurrent execution is desired but each concurrent task needs its own
    database transaction context.
    """

    def __init__(self, async_sessionmaker_: async_sessionmaker[AsyncSession]):
        """Initialize the SessionMaker.

        Args:
            async_sessionmaker_: SQLAlchemy async_sessionmaker factory

        """
        self.async_sessionmaker = async_sessionmaker_

    async def create_session(self) -> AsyncSession:
        """Create a new independent AsyncSession.

        Returns:
            A new AsyncSession instance ready for use. Caller is responsible
            for managing the session lifecycle (closing when done).

        """
        return self.async_sessionmaker()
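

# Illustrative sketch (added, not part of the original module): each concurrent
# task creates and closes its own session via SessionMaker, so transactions never
# cross task boundaries. The per-item work is a placeholder assumption.
async def _example_concurrent_sessions(maker: "SessionMaker", items: list[str]) -> None:
    async def _process(item: str) -> None:
        session = await maker.create_session()
        try:
            # ... per-item database work would go here ...
            await session.commit()
        finally:
            await session.close()

    await asyncio.gather(*(_process(item) for item in items))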


async def get_resilient_session(
    resilient_session: "ResilientAsyncSession",
) -> AsyncGenerator[AsyncSession]:
    """Dependency for FastAPI that provides a resilient async session.

    This generator creates a new session with automatic retry capability
    on connection errors. Used as a dependency in FastAPI endpoints.

    Args:
        resilient_session: ResilientAsyncSession instance from app state

    Yields:
        AsyncSession instance for database operations

    """
    async with resilient_session.async_sessionmaker() as session:
        yield session
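

# Illustrative sketch (added; the endpoint path, query, and functools.partial
# wiring are assumptions, not taken from the original module): one way the
# FastAPI dependency above could be hooked up to an endpoint.
def _example_fastapi_wiring(resilient: "ResilientAsyncSession"):
    from functools import partial

    from fastapi import Depends, FastAPI

    app = FastAPI()
    session_dependency = partial(get_resilient_session, resilient)

    @app.get("/customers/count")
    async def count_customers(session: AsyncSession = Depends(session_dependency)):
        from sqlalchemy import select

        result = await session.execute(select(func.count()).select_from(Customer))
        return {"count": result.scalar_one()}

    return app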


class Customer(Base):
    __tablename__ = "customers"

    id = Column(Integer, primary_key=True)
    given_name = Column(String)
    contact_id = Column(String, unique=True)
    surname = Column(String)
    name_prefix = Column(String)
    email_address = Column(String)
    phone = Column(String)
    email_newsletter = Column(Boolean)
    address_line = Column(String)
    city_name = Column(String)
    postal_code = Column(String)
    country_code = Column(String)
    gender = Column(String)
    birth_date = Column(String)
    language = Column(String)
    address_catalog = Column(Boolean)  # Added for XML
    name_title = Column(String)  # Added for XML

    reservations = relationship("Reservation", back_populates="customer")

    def __repr__(self):
        return (
            f"Customer(id={self.id}, contact_id={self.contact_id}, "
            f"email={self.email_address}, given_name={self.given_name}, "
            f"surname={self.surname}, phone={self.phone}, city={self.city_name}, "
            f"postal_code={self.postal_code}, country_code={self.country_code})"
        )

    @staticmethod
    def _normalize_and_hash(value):
        """Normalize and hash a value according to Meta Conversion API requirements."""
        if not value:
            return None

        # Normalize: lowercase, strip whitespace
        normalized = str(value).lower().strip()

        # Remove separator characters for phone numbers
        is_phone = (
            normalized.startswith("+")
            or normalized.replace("-", "").replace(" ", "").isdigit()
        )
        if is_phone:
            chars_to_remove = [" ", "-", "(", ")"]
            for char in chars_to_remove:
                normalized = normalized.replace(char, "")

        # SHA256 hash
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def create_hashed_customer(self):
        """Create a HashedCustomer instance from this Customer."""
        return HashedCustomer(
            customer_id=self.id,
            contact_id=self.contact_id,
            hashed_email=self._normalize_and_hash(self.email_address),
            hashed_phone=self._normalize_and_hash(self.phone),
            hashed_given_name=self._normalize_and_hash(self.given_name),
            hashed_surname=self._normalize_and_hash(self.surname),
            hashed_city=self._normalize_and_hash(self.city_name),
            hashed_postal_code=self._normalize_and_hash(self.postal_code),
            hashed_country_code=self._normalize_and_hash(self.country_code),
            hashed_gender=self._normalize_and_hash(self.gender),
            hashed_birth_date=self._normalize_and_hash(self.birth_date),
        )
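

# Illustrative sketch (added, not part of the original module): what the Meta
# Conversion API hashing above produces for a plain email vs. a phone number.
def _example_customer_hashing() -> tuple[str | None, str | None]:
    email_hash = Customer._normalize_and_hash("  Jane.Doe@Example.com ")
    # equals hashlib.sha256(b"jane.doe@example.com").hexdigest()
    phone_hash = Customer._normalize_and_hash("+39 0471 123-456")
    # equals hashlib.sha256(b"+390471123456").hexdigest(); separators removed
    return email_hash, phone_hash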


class HashedCustomer(Base):
    """Hashed customer data for Meta Conversion API.

    Stores SHA256 hashed versions of customer PII according to Meta's requirements.
    This allows sending conversion events without exposing raw customer data.
    """

    __tablename__ = "hashed_customers"

    id = Column(Integer, primary_key=True)
    customer_id = Column(
        Integer,
        ForeignKey("customers.id", ondelete="SET NULL"),
        unique=True,
        nullable=True,
    )
    contact_id = Column(String, unique=True)  # Keep unhashed for reference
    hashed_email = Column(String(64))  # SHA256 produces 64 hex chars
    hashed_phone = Column(String(64))
    hashed_given_name = Column(String(64))
    hashed_surname = Column(String(64))
    hashed_city = Column(String(64))
    hashed_postal_code = Column(String(64))
    hashed_country_code = Column(String(64))
    hashed_gender = Column(String(64))
    hashed_birth_date = Column(String(64))
    created_at = Column(DateTime(timezone=True))

    customer = relationship(
        "Customer", backref=backref("hashed_version", uselist=False, lazy="joined")
    )


class ConversionGuest(Base):
    """Guest information from hotel PMS conversions, with hashed fields for privacy.

    Stores both unhashed (for reference during transition) and hashed (SHA256 per Meta API)
    versions of guest PII. Uses composite primary key (hotel_id, guest_id) from the PMS.

    When multiple conversions for the same guest arrive with different guest info,
    the most recent (by last_seen) data is kept as the canonical version.
    """

    __tablename__ = "conversion_guests"

    # Natural keys from PMS - composite primary key
    hotel_id = Column(String, nullable=False, primary_key=True, index=True)
    guest_id = Column(String, nullable=False, primary_key=True, index=True)

    # Unhashed guest information (for reference/transition period)
    guest_first_name = Column(String)
    guest_last_name = Column(String)
    guest_email = Column(String)
    guest_country_code = Column(String)
    guest_birth_date = Column(Date)

    # Hashed guest information (SHA256, for privacy compliance)
    hashed_first_name = Column(String(64), index=True)
    hashed_last_name = Column(String(64), index=True)
    hashed_email = Column(String(64), index=True)
    hashed_country_code = Column(String(64))
    hashed_birth_date = Column(String(64))

    # Matched customer reference (nullable, filled after matching)
    hashed_customer_id = Column(
        Integer, ForeignKey("hashed_customers.id"), nullable=True, index=True
    )

    # Guest classification
    # True if guest has many prior stays before appearing in our reservations
    is_regular = Column(Boolean, default=False)

    # Metadata
    first_seen = Column(DateTime(timezone=True))
    last_seen = Column(DateTime(timezone=True))

    # Relationships
    conversions = relationship("Conversion", back_populates="guest")
    hashed_customer = relationship("HashedCustomer", backref="conversion_guests")

    @staticmethod
    def _normalize_and_hash(value):
        """Normalize and hash a value according to Meta Conversion API requirements."""
        if not value:
            return None

        # Normalize: lowercase, strip whitespace
        normalized = str(value).lower().strip()

        # SHA256 hash
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    @classmethod
    def create_from_conversion_data(
        cls,
        hotel_id: str,
        guest_id: str | None,
        guest_first_name: str | None,
        guest_last_name: str | None,
        guest_email: str | None,
        guest_country_code: str | None,
        guest_birth_date: Date | None,
        now: DateTime,
        is_regular: bool = False,
    ):
        """Create a ConversionGuest from conversion guest data."""
        return cls(
            hotel_id=hotel_id,
            guest_id=guest_id,
            guest_first_name=guest_first_name,
            guest_last_name=guest_last_name,
            guest_email=guest_email,
            guest_country_code=guest_country_code,
            guest_birth_date=guest_birth_date,
            hashed_first_name=cls._normalize_and_hash(guest_first_name),
            hashed_last_name=cls._normalize_and_hash(guest_last_name),
            hashed_email=cls._normalize_and_hash(guest_email),
            hashed_country_code=cls._normalize_and_hash(guest_country_code),
            hashed_birth_date=cls._normalize_and_hash(
                guest_birth_date.isoformat() if guest_birth_date else None
            ),
            is_regular=is_regular,
            first_seen=now,
            last_seen=now,
        )

    def update_from_conversion_data(
        self,
        guest_first_name: str | None,
        guest_last_name: str | None,
        guest_email: str | None,
        guest_country_code: str | None,
        guest_birth_date: Date | None,
        now: DateTime,
    ):
        """Update ConversionGuest with newer guest data, preferring non-null values."""
        # Only update if new data is provided (not null)
        if guest_first_name:
            self.guest_first_name = guest_first_name
            self.hashed_first_name = self._normalize_and_hash(guest_first_name)
        if guest_last_name:
            self.guest_last_name = guest_last_name
            self.hashed_last_name = self._normalize_and_hash(guest_last_name)
        if guest_email:
            self.guest_email = guest_email
            self.hashed_email = self._normalize_and_hash(guest_email)
        if guest_country_code:
            self.guest_country_code = guest_country_code
            self.hashed_country_code = self._normalize_and_hash(guest_country_code)
        if guest_birth_date:
            self.guest_birth_date = guest_birth_date
            self.hashed_birth_date = self._normalize_and_hash(guest_birth_date.isoformat())
        self.last_seen = now
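

# Illustrative sketch (added; the helper name and calling convention are
# assumptions): how the factory and update methods above are typically combined
# into an upsert keyed on the composite primary key (hotel_id, guest_id).
async def _example_upsert_conversion_guest(
    session: AsyncSession,
    hotel_id: str,
    guest_id: str,
    guest_email: str | None,
    now,
) -> "ConversionGuest":
    guest = await session.get(ConversionGuest, (hotel_id, guest_id))
    if guest is None:
        guest = ConversionGuest.create_from_conversion_data(
            hotel_id=hotel_id,
            guest_id=guest_id,
            guest_first_name=None,
            guest_last_name=None,
            guest_email=guest_email,
            guest_country_code=None,
            guest_birth_date=None,
            now=now,
        )
        session.add(guest)
    else:
        guest.update_from_conversion_data(
            guest_first_name=None,
            guest_last_name=None,
            guest_email=guest_email,
            guest_country_code=None,
            guest_birth_date=None,
            now=now,
        )
    return guest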


class Reservation(Base):
    __tablename__ = "reservations"

    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey("customers.id", ondelete="SET NULL"))
    hashed_customer_id = Column(
        Integer, ForeignKey("hashed_customers.id", ondelete="CASCADE")
    )
    unique_id = Column(String, unique=True)
    md5_unique_id = Column(String(32), unique=True)  # max length 32 guaranteed
    start_date = Column(Date)
    end_date = Column(Date)
    num_adults = Column(Integer)
    num_children = Column(Integer)
    children_ages = Column(String)  # comma-separated
    offer = Column(String)
    created_at = Column(DateTime(timezone=True))

    # All UTM fields and user comment for XML
    utm_source = Column(String)
    utm_medium = Column(String)
    utm_campaign = Column(String)
    utm_term = Column(String)
    utm_content = Column(String)
    user_comment = Column(String)
    fbclid = Column(String)
    gclid = Column(String)

    # Advertising account IDs (stored conditionally based on fbclid/gclid presence)
    meta_account_id = Column(String)
    google_account_id = Column(String)

    # hotel_code and hotel_name for XML
    hotel_code = Column(String)
    hotel_name = Column(String)

    # RoomTypes fields (optional)
    room_type_code = Column(String)
    room_classification_code = Column(String)
    room_type = Column(String)

    customer = relationship("Customer", back_populates="reservations")
    hashed_customer = relationship("HashedCustomer", backref="reservations")


# Table for tracking acknowledged requests by client
class AckedRequest(Base):
    """Tracks which Reservations the client has already seen via ReadAction.

    Clients report successful transfers via ReportNotifAction, and those
    acknowledgements are stored in this table. This prevents re-sending the
    same reservation to the client multiple times.
    """

    __tablename__ = "acked_requests"

    id = Column(Integer, primary_key=True)
    client_id = Column(String, index=True)
    # Username of the client making the request
    username = Column(String, index=True, nullable=True)
    # Should match Reservation.form_id or another unique field
    unique_id = Column(String, index=True)
    timestamp = Column(DateTime(timezone=True))


class Conversion(Base):
    """Conversion data from hotel PMS.

    Represents a single reservation event from the PMS XML with all its metadata.
    Each row links to one reservation from the PMS system. A reservation can have
    multiple room reservations (stored in ConversionRoom table).

    Linked to reservations via advertising tracking data (fbclid, gclid, etc.)
    stored in the advertisingCampagne field. The tracking data transferred by the
    PMS is, however, somewhat truncated, so we also need to match on guest
    name/email and other metadata.

    Attribution flags:
    - directly_attributable: True if matched by ID (reservation_id is set), meaning
      this conversion is directly responsible for this reservation
    - guest_matched: True if matched only by guest details (customer_id/hashed_customer_id set),
      meaning the same person made this request but the reservation may not be directly attributable
    """

    __tablename__ = "conversions"

    id = Column(Integer, primary_key=True)

    # Link to reservation (nullable since matching may not always work)
    reservation_id = Column(
        Integer, ForeignKey("reservations.id"), nullable=True, index=True
    )
    customer_id = Column(Integer, ForeignKey("customers.id"), nullable=True, index=True)
    hashed_customer_id = Column(
        Integer, ForeignKey("hashed_customers.id"), nullable=True, index=True
    )

    # Reservation metadata from XML
    hotel_id = Column(String, index=True)  # hotelID attribute
    guest_id = Column(String, nullable=True, index=True)  # PMS guest ID, FK to conversion_guests
    pms_reservation_id = Column(String, index=True)  # id attribute from reservation
    reservation_number = Column(String)  # number attribute
    reservation_date = Column(Date)  # date attribute (when reservation was made)
    creation_time = Column(DateTime(timezone=True))  # creationTime attribute
    reservation_type = Column(String)  # type attribute (e.g., "reservation")
    booking_channel = Column(String)  # bookingChannel attribute

    # Advertising/tracking data - used for matching to existing reservations
    advertising_medium = Column(String, index=True)  # advertisingMedium (e.g., "99TALES")
    advertising_partner = Column(String, index=True)  # advertisingPartner (e.g., "cpc", "website")
    advertising_campagne = Column(String, index=True)  # advertisingCampagne (contains fbclid/gclid)

    # Attribution flags - track how this conversion was matched
    directly_attributable = Column(Boolean, default=False)  # Matched by ID (high confidence)
    guest_matched = Column(Boolean, default=False)  # Matched by guest details only

    # Metadata
    created_at = Column(DateTime(timezone=True))  # When this record was imported
    updated_at = Column(DateTime(timezone=True))  # When this record was last updated

    # Composite foreign key constraint for ConversionGuest (hotel_id, guest_id)
    __table_args__ = (
        ForeignKeyConstraint(
            ["hotel_id", "guest_id"],
            ["conversion_guests.hotel_id", "conversion_guests.guest_id"],
            ondelete="SET NULL",
        ),
    )

    # Relationships
    reservation = relationship("Reservation", backref="conversions")
    customer = relationship("Customer", backref="conversions")
    hashed_customer = relationship("HashedCustomer", backref="conversions")
    guest = relationship("ConversionGuest", back_populates="conversions")
    conversion_rooms = relationship(
        "ConversionRoom", back_populates="conversion", cascade="all, delete-orphan"
    )
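

# Illustrative sketch (added; the query shape and function name are assumptions):
# how the attribution flags above can be used to pull directly attributed
# conversions for one hotel.
async def _example_attributed_conversions(session: AsyncSession, hotel_id: str):
    from sqlalchemy import select

    stmt = (
        select(Conversion)
        .where(Conversion.hotel_id == hotel_id)
        .where(Conversion.directly_attributable.is_(True))
    )
    result = await session.execute(stmt)
    return list(result.scalars())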


class ConversionRoom(Base):
    """Room reservation data from hotel PMS.

    Represents a single room reservation within a conversion/PMS reservation.
    One conversion can have multiple room reservations (e.g., customer books 3 rooms).

    Daily sales are stored as a JSON blob with an extracted total_revenue field
    for efficient querying.
    """

    __tablename__ = "conversion_rooms"

    id = Column(Integer, primary_key=True)

    # Link to the parent conversion/PMS reservation
    conversion_id = Column(
        Integer, ForeignKey("conversions.id"), nullable=False, index=True
    )

    # Identifier for this room reservation (for upserts)
    # Composite: pms_reservation_id + room_number
    # Note: Not globally unique - same room number can exist across different hotels
    pms_hotel_reservation_id = Column(String, index=True)

    # Room reservation details
    arrival_date = Column(Date, index=True)  # arrival attribute
    departure_date = Column(Date, index=True)  # departure attribute
    room_status = Column(String)  # status attribute (e.g., "reserved", "departed")
    room_type = Column(String)  # roomType attribute (e.g., "VDS", "EZR")
    room_number = Column(String, index=True)  # roomNumber attribute
    num_adults = Column(Integer)  # adults attribute
    rate_plan_code = Column(String)  # ratePlanCode attribute
    connected_room_type = Column(String)  # connectedRoomType attribute

    # Daily sales data stored as JSON
    # Format: [
    #   {"date": "2021-10-09", "revenueTotal": "13.6", "revenueOther": "13.6"},
    #   {"date": "2021-10-10", "revenueTotal": "306.1", "revenueLogis": "254", ...},
    #   ...
    # ]
    daily_sales = Column(JSON, nullable=True)  # JSON array of daily sales

    # Extracted total revenue for efficient querying
    # (sum of all revenueTotal values in daily_sales)
    total_revenue = Column(Double, nullable=True)

    # Metadata
    created_at = Column(DateTime(timezone=True))  # When this record was imported
    updated_at = Column(DateTime(timezone=True))  # When this record was last updated

    # Relationships
    conversion = relationship("Conversion", back_populates="conversion_rooms")


class HotelInventory(Base):
    """Room and category definitions synchronized via AlpineBits."""

    __tablename__ = "hotel_inventory"

    id = Column(Integer, primary_key=True)
    hotel_id = Column(
        String(50),
        ForeignKey("hotels.hotel_id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    inv_type_code = Column(String(8), nullable=False, index=True)
    inv_code = Column(String(16), nullable=True, index=True)
    room_name = Column(String(200), nullable=True)
    max_occupancy = Column(Integer, nullable=True)
    source = Column(String(20), nullable=False)
    first_seen = Column(DateTime(timezone=True), nullable=False)
    last_updated = Column(DateTime(timezone=True), nullable=False)

    hotel = relationship("Hotel", back_populates="inventory_items")
    availability = relationship(
        "RoomAvailability",
        back_populates="inventory_item",
        cascade="all, delete-orphan",
        passive_deletes=True,
    )

    __table_args__ = (
        Index(
            "uq_hotel_inventory_unique_key",
            "hotel_id",
            "inv_type_code",
            func.coalesce(inv_code, ""),
            unique=True,
        ),
    )


class RoomAvailability(Base):
    """Daily availability counts for inventory items."""

    __tablename__ = "room_availability"

    id = Column(Integer, primary_key=True)
    inventory_id = Column(
        Integer,
        ForeignKey("hotel_inventory.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    date = Column(Date, nullable=False, index=True)
    count_type_2 = Column(Integer, nullable=True)
    count_type_6 = Column(Integer, nullable=True)
    count_type_9 = Column(Integer, nullable=True)
    is_closing_season = Column(Boolean, nullable=False, default=False)
    last_updated = Column(DateTime(timezone=True), nullable=False)
    update_type = Column(String(20), nullable=False)

    inventory_item = relationship("HotelInventory", back_populates="availability")

    __table_args__ = (
        UniqueConstraint("inventory_id", "date", name="uq_room_availability_unique_key"),
    )


class Hotel(Base):
    """Hotel configuration (migrated from alpine_bits_auth in config.yaml)."""

    __tablename__ = "hotels"

    id = Column(Integer, primary_key=True)

    # Core identification
    hotel_id = Column(String(50), unique=True, nullable=False, index=True)
    hotel_name = Column(String(200), nullable=False)

    # AlpineBits authentication
    username = Column(String(100), unique=True, nullable=False, index=True)
    password_hash = Column(String(200), nullable=False)  # bcrypt

    # Advertising accounts
    meta_account_id = Column(String(50), nullable=True)
    google_account_id = Column(String(50), nullable=True)

    # Push endpoint (optional)
    push_endpoint_url = Column(String(500), nullable=True)
    push_endpoint_token = Column(String(200), nullable=True)
    push_endpoint_username = Column(String(100), nullable=True)

    # Metadata
    created_at = Column(DateTime(timezone=True), nullable=False)
    updated_at = Column(DateTime(timezone=True), nullable=False)
    is_active = Column(Boolean, default=True, nullable=False, index=True)

    # Relationships
    webhook_endpoints = relationship("WebhookEndpoint", back_populates="hotel")
    inventory_items = relationship(
        "HotelInventory", back_populates="hotel", cascade="all, delete-orphan"
    )


class WebhookEndpoint(Base):
    """Webhook configurations per hotel (supports multiple webhook types per hotel)."""

    __tablename__ = "webhook_endpoints"

    id = Column(Integer, primary_key=True)

    # Hotel association
    hotel_id = Column(String(50), ForeignKey("hotels.hotel_id"), nullable=False, index=True)

    # Webhook configuration
    webhook_secret = Column(String(64), unique=True, nullable=False, index=True)
    webhook_type = Column(String(50), nullable=False)  # 'wix_form', 'generic', etc.

    # Metadata
    description = Column(String(200), nullable=True)  # Human-readable label
    is_enabled = Column(Boolean, default=True, nullable=False)
    created_at = Column(DateTime(timezone=True), nullable=False)

    # Relationships
    hotel = relationship("Hotel", back_populates="webhook_endpoints")
    webhook_requests = relationship("WebhookRequest", back_populates="webhook_endpoint")

    __table_args__ = (
        Index("idx_webhook_endpoint_hotel_type", "hotel_id", "webhook_type"),
    )


class WebhookRequest(Base):
    """Tracks incoming webhooks for deduplication and retry handling."""

    __tablename__ = "webhook_requests"

    id = Column(Integer, primary_key=True)

    # Request identification
    payload_hash = Column(String(64), unique=True, nullable=False, index=True)  # SHA256
    webhook_endpoint_id = Column(
        Integer, ForeignKey("webhook_endpoints.id"), nullable=True, index=True
    )
    hotel_id = Column(String(50), ForeignKey("hotels.hotel_id"), nullable=True, index=True)

    # Processing tracking
    # Status values ('pending', 'processing', 'completed', 'failed') come from the
    # WebhookStatus enum
    status = Column(
        String(20), nullable=False, default=WebhookStatus.PENDING.value, index=True
    )
    processing_started_at = Column(DateTime(timezone=True), nullable=True)
    processing_completed_at = Column(DateTime(timezone=True), nullable=True)

    # Retry handling
    retry_count = Column(Integer, default=0)
    last_error = Column(String(2000), nullable=True)

    # Payload storage
    payload_json = Column(JSON, nullable=True)  # NULL after purge, kept for retries
    purged_at = Column(DateTime(timezone=True), nullable=True)  # When JSON was purged

    # Metadata
    created_at = Column(DateTime(timezone=True), nullable=False, index=True)
    source_ip = Column(String(45), nullable=True)
    user_agent = Column(String(500), nullable=True)

    # Result tracking
    created_customer_id = Column(Integer, ForeignKey("customers.id"), nullable=True)
    created_reservation_id = Column(Integer, ForeignKey("reservations.id"), nullable=True)

    # Relationships
    webhook_endpoint = relationship("WebhookEndpoint", back_populates="webhook_requests")
    hotel = relationship("Hotel")
    customer = relationship("Customer")
    reservation = relationship("Reservation")

    __table_args__ = (
        Index("idx_webhook_status_created", "status", "created_at"),
        Index("idx_webhook_hotel_created", "hotel_id", "created_at"),
        Index("idx_webhook_purge_candidate", "status", "purged_at", "created_at"),
    )
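

# Illustrative sketch (added; the canonicalisation details are an assumption):
# WebhookRequest.payload_hash is a SHA256 digest used for deduplication, so the
# same JSON payload must always hash to the same 64-character hex string.
def _example_payload_hash(payload: dict) -> str:
    import json

    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()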