Files
alpinebits_python/src/alpine_bits_python/db.py
Jonas Linter 10dcbae5ad Renamed table
2025-11-18 10:09:06 +01:00

510 lines
18 KiB
Python

import asyncio
import hashlib
import os
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import TypeVar

from sqlalchemy import (
    JSON,
    Boolean,
    Column,
    Date,
    DateTime,
    ForeignKey,
    Integer,
    String,
)
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import declarative_base, relationship

from .logging_config import get_logger
# Module-level logger obtained from the project's logging configuration.
_LOGGER = get_logger(__name__)

# Declarative base class that every ORM model in this module inherits from.
Base = declarative_base()

# Type variable for the result of the async callables run through
# ResilientAsyncSession.execute_with_retry.
T = TypeVar("T")

# Total number of attempts made by ResilientAsyncSession.execute_with_retry
# (the retry loop iterates over range(MAX_RETRIES), so this counts the first
# try as well).
MAX_RETRIES = 3

# Base delay between attempts in seconds; the retry loop multiplies it by
# 2**attempt, so the wait doubles after every failed attempt.
RETRY_DELAY = 0.5

# Async SQLAlchemy setup
def get_database_url(config=None):
    """Resolve the database URL to connect to.

    Lookup order (the first non-empty value wins):
      1. ``config["database"]["url"]``
      2. the ``DATABASE_URL`` environment variable
      3. the built-in default ``sqlite+aiosqlite:///alpinebits.db``

    Args:
        config: Configuration mapping, or None. A missing, empty or ``None``
            ``database`` section is treated as "not configured".

    Returns:
        The database URL string.
    """
    # Fetch the section first so that an explicit ``database: null`` entry
    # falls through to the fallbacks instead of raising TypeError on ``in``.
    section = config["database"] if config and "database" in config else None
    db_url = section["url"] if section and "url" in section else None

    return (
        db_url
        or os.environ.get("DATABASE_URL")
        or "sqlite+aiosqlite:///alpinebits.db"
    )
def get_database_schema(config=None):
    """Get the database schema name from config or the environment.

    Args:
        config: Configuration mapping, or None. A missing, empty or ``None``
            ``database`` section is treated as "not configured".

    Returns:
        ``config["database"]["schema"]`` when that key is present (returned
        as-is, even if empty); otherwise the value of the ``DATABASE_SCHEMA``
        environment variable, or None if that is not set either.
    """
    # Fetch the section first so that an explicit ``database: null`` entry
    # falls back to the environment instead of raising TypeError on ``in``.
    section = config["database"] if config and "database" in config else None
    if section and "schema" in section:
        return section["schema"]
    return os.environ.get("DATABASE_SCHEMA")
def configure_schema(schema_name=None):
    """Assign a schema name to every table registered on ``Base.metadata``.

    Intended to be called before tables are created or migrations are run.
    When ``schema_name`` is falsy nothing is changed.

    Args:
        schema_name: Name of the schema to use (e.g., "alpinebits")
    """
    if not schema_name:
        return

    # Stamp the schema onto each Table object already known to the metadata.
    for mapped_table in Base.metadata.tables.values():
        mapped_table.schema = schema_name
def create_database_engine(config=None, echo=False) -> AsyncEngine:
    """Build the async engine, applying the configured schema if there is one.

    Steps:
      1. Resolve the database URL via ``get_database_url``.
      2. Resolve the schema name via ``get_database_schema``.
      3. If a schema is configured, apply it to all models via
         ``configure_schema``.
      4. If a schema is configured and the URL contains "postgresql", pass a
         ``server_settings`` connect argument that sets ``search_path`` to
         ``<schema>,public``.

    Args:
        config: Configuration dictionary
        echo: Whether to echo SQL statements (default: False)

    Returns:
        Configured AsyncEngine instance
    """
    database_url = get_database_url(config)
    schema_name = get_database_schema(config)

    # Stays empty unless a schema is configured for a PostgreSQL URL.
    connect_args = {}

    if schema_name:
        configure_schema(schema_name)
        _LOGGER.info("Configured database schema: %s", schema_name)

        if "postgresql" in database_url:
            server_settings = {"search_path": f"{schema_name},public"}
            connect_args = {"server_settings": server_settings}
            _LOGGER.info("Setting PostgreSQL search_path to: %s,public", schema_name)

    return create_async_engine(database_url, echo=echo, connect_args=connect_args)
class ResilientAsyncSession:
    """Wrapper around a session factory that retries on connection errors.

    ``execute_with_retry`` runs a callable inside a fresh session. When a
    ``DBAPIError`` whose message matches one of ``_RETRYABLE_MARKERS`` is
    raised, the engine's connection pool is disposed and the callable is run
    again in a new session, up to ``MAX_RETRIES`` attempts in total.
    """

    # Lower-case substrings of the error message that mark a DBAPIError as a
    # recoverable connection problem (relation/OID lookup failure or lost
    # connection).
    _RETRYABLE_MARKERS = (
        "could not open relation",
        "lost connection",
        "connection closed",
        "connection refused",
    )

    def __init__(
        self,
        async_sessionmaker_: async_sessionmaker[AsyncSession],
        engine: AsyncEngine,
    ):
        """Initialize the resilient session wrapper.

        Args:
            async_sessionmaker_: Factory for creating async sessions
            engine: The SQLAlchemy async engine whose pool is disposed on
                connection errors
        """
        self.async_sessionmaker = async_sessionmaker_
        self.engine = engine

    async def execute_with_retry(
        self, func: Callable[..., Awaitable[T]], *args, **kwargs
    ) -> T:
        """Execute a function with automatic retry on connection errors.

        A new session is opened for every attempt and passed to ``func`` as
        its first positional argument; callers must not pass a session
        themselves.

        Args:
            func: Async function that takes a session as first argument
            *args: Positional arguments passed to func after the session
            **kwargs: Keyword arguments to pass to func

        Returns:
            Result of the function call

        Raises:
            DBAPIError: Immediately if the error is not connection-related,
                or the last connection error once all attempts are exhausted.
            Exception: Any other exception raised by ``func`` propagates
                unchanged without a retry.
        """
        last_error = None

        for attempt in range(MAX_RETRIES):
            try:
                async with self.async_sessionmaker() as session:
                    return await func(session, *args, **kwargs)
            except DBAPIError as e:
                error_msg = str(e).lower()
                if not any(marker in error_msg for marker in self._RETRYABLE_MARKERS):
                    # Not a connection-related error, re-raise immediately
                    raise

                last_error = e
                retries_left = attempt < MAX_RETRIES - 1

                if retries_left:
                    _LOGGER.warning(
                        "Connection error on attempt %d/%d: %s. Disposing pool and retrying...",
                        attempt + 1,
                        MAX_RETRIES,
                        e.__class__.__name__,
                    )
                else:
                    # Do not announce a retry that will not happen.
                    _LOGGER.warning(
                        "Connection error on attempt %d/%d: %s. Disposing pool; no retries left.",
                        attempt + 1,
                        MAX_RETRIES,
                        e.__class__.__name__,
                    )

                # Dispose the entire connection pool to force new connections
                # (also on the last attempt, so later calls start clean).
                await self.engine.dispose()

                # Wait before retry (exponential backoff)
                if retries_left:
                    await asyncio.sleep(RETRY_DELAY * (2**attempt))

        # All retries exhausted
        _LOGGER.error(
            "Failed to execute query after %d retries: %s",
            MAX_RETRIES,
            last_error.__class__.__name__,
        )
        raise last_error
class SessionMaker:
    """Hands out independent AsyncSession objects from a shared factory.

    Meant for concurrent processing: each task asks for its own session so
    that it works in its own database transaction context.
    """

    def __init__(self, async_sessionmaker_: async_sessionmaker[AsyncSession]):
        """Store the session factory.

        Args:
            async_sessionmaker_: SQLAlchemy async_sessionmaker factory
        """
        self.async_sessionmaker = async_sessionmaker_

    async def create_session(self) -> AsyncSession:
        """Create a new independent AsyncSession.

        Returns:
            A new AsyncSession produced by the stored factory. The session is
            not closed here; the caller owns its lifecycle.
        """
        session_factory = self.async_sessionmaker
        return session_factory()
async def get_resilient_session(
    resilient_session: "ResilientAsyncSession",
) -> AsyncGenerator[AsyncSession]:
    """Yield a session opened from a ResilientAsyncSession's factory.

    Written as an async generator so it can serve as a FastAPI dependency.
    The session is opened with ``async with`` and therefore closed when the
    generator is finalized. The yielded session is a plain session: this
    function itself performs no retries.

    Args:
        resilient_session: ResilientAsyncSession instance from app state

    Yields:
        AsyncSession instance for database operations
    """
    session_factory = resilient_session.async_sessionmaker
    async with session_factory() as db_session:
        yield db_session
class Customer(Base):
    """Customer contact record stored in the ``customers`` table.

    Linked to its reservations through the ``reservations`` relationship.
    ``create_hashed_customer`` derives the SHA256-hashed counterpart of the
    personal data held here.
    """

    __tablename__ = "customers"

    id = Column(Integer, primary_key=True)
    given_name = Column(String)
    contact_id = Column(String, unique=True)
    surname = Column(String)
    name_prefix = Column(String)
    email_address = Column(String)
    phone = Column(String)
    email_newsletter = Column(Boolean)
    address_line = Column(String)
    city_name = Column(String)
    postal_code = Column(String)
    country_code = Column(String)
    gender = Column(String)
    birth_date = Column(String)
    language = Column(String)
    address_catalog = Column(Boolean)  # Added for XML
    name_title = Column(String)  # Added for XML

    reservations = relationship("Reservation", back_populates="customer")

    def __repr__(self):
        """Return a debug representation with the main identifying fields."""
        return (
            f"Customer(id={self.id}, contact_id={self.contact_id}, "
            f"email={self.email_address}, given_name={self.given_name}, "
            f"surname={self.surname}, phone={self.phone}, "
            f"city={self.city_name}, postal_code={self.postal_code}, "
            f"country_code={self.country_code})"
        )

    @staticmethod
    def _normalize_and_hash(value):
        """Normalize a value and return its SHA256 hex digest.

        Normalization lower-cases the value and strips surrounding
        whitespace. Values that look like phone numbers (starting with "+",
        or consisting only of digits once spaces, dashes and parentheses are
        removed) additionally have those separator characters removed.

        Args:
            value: Raw value to hash; converted with ``str()``.

        Returns:
            64-character hex digest, or None if the value is falsy or blank.
        """
        if not value:
            return None

        # Normalize: lowercase, strip whitespace
        normalized = str(value).lower().strip()
        if not normalized:
            # Whitespace-only input carries no data; do not hash "".
            return None

        # Detect phone numbers on the separator-free form so that numbers
        # written with parentheses, e.g. "(0471) 123-456", are recognised too.
        compact = normalized.translate(str.maketrans("", "", " -()"))
        if normalized.startswith("+") or compact.isdigit():
            normalized = compact

        # SHA256 hash
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def create_hashed_customer(self):
        """Create a HashedCustomer instance from this Customer.

        ``customer_id`` and ``contact_id`` are copied unhashed; every other
        field is passed through ``_normalize_and_hash``. ``created_at`` is
        not populated here. The new object is only constructed, not added to
        any session.

        Returns:
            A new HashedCustomer instance.
        """
        return HashedCustomer(
            customer_id=self.id,
            contact_id=self.contact_id,
            hashed_email=self._normalize_and_hash(self.email_address),
            hashed_phone=self._normalize_and_hash(self.phone),
            hashed_given_name=self._normalize_and_hash(self.given_name),
            hashed_surname=self._normalize_and_hash(self.surname),
            hashed_city=self._normalize_and_hash(self.city_name),
            hashed_postal_code=self._normalize_and_hash(self.postal_code),
            hashed_country_code=self._normalize_and_hash(self.country_code),
            hashed_gender=self._normalize_and_hash(self.gender),
            hashed_birth_date=self._normalize_and_hash(self.birth_date),
        )
class HashedCustomer(Base):
    """Hashed customer data for Meta Conversion API.

    Stores SHA256 hashed versions of customer PII according to Meta's requirements.
    This allows sending conversion events without exposing raw customer data.

    Each row references one Customer through ``customer_id``, which is
    declared unique and non-nullable.
    """

    __tablename__ = "hashed_customers"

    id = Column(Integer, primary_key=True)
    # Owning customer; unique + NOT NULL, so at most one hashed row per customer.
    customer_id = Column(
        Integer, ForeignKey("customers.id"), unique=True, nullable=False
    )
    contact_id = Column(String, unique=True)  # Keep unhashed for reference
    hashed_email = Column(String(64))  # SHA256 produces 64 hex chars
    hashed_phone = Column(String(64))
    hashed_given_name = Column(String(64))
    hashed_surname = Column(String(64))
    hashed_city = Column(String(64))
    hashed_postal_code = Column(String(64))
    hashed_country_code = Column(String(64))
    hashed_gender = Column(String(64))
    hashed_birth_date = Column(String(64))
    # Timezone-aware timestamp; no default is declared on the column.
    created_at = Column(DateTime(timezone=True))

    # The backref adds a ``hashed_version`` attribute on Customer.
    customer = relationship("Customer", backref="hashed_version")
class Reservation(Base):
    """Reservation record stored in the ``reservations`` table.

    Holds the stay details (dates, guests, offer), the marketing/tracking
    parameters (UTM fields, click IDs, advertising account IDs), and the
    hotel and room-type information. Optionally linked to a Customer.
    """

    __tablename__ = "reservations"

    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey("customers.id"))
    unique_id = Column(String, unique=True)
    md5_unique_id = Column(String(32), unique=True)  # max length 32 guaranteed
    start_date = Column(Date)
    end_date = Column(Date)
    num_adults = Column(Integer)
    num_children = Column(Integer)
    children_ages = Column(String)  # comma-separated
    offer = Column(String)
    # Timezone-aware timestamp; no default is declared on the column.
    created_at = Column(DateTime(timezone=True))
    # Add all UTM fields and user comment for XML
    utm_source = Column(String)
    utm_medium = Column(String)
    utm_campaign = Column(String)
    utm_term = Column(String)
    utm_content = Column(String)
    user_comment = Column(String)
    fbclid = Column(String)
    gclid = Column(String)
    # Advertising account IDs (stored conditionally based on fbclid/gclid presence)
    meta_account_id = Column(String)
    google_account_id = Column(String)
    # Add hotel_code and hotel_name for XML
    hotel_code = Column(String)
    hotel_name = Column(String)
    # RoomTypes fields (optional)
    room_type_code = Column(String)
    room_classification_code = Column(String)
    room_type = Column(String)

    # Counterpart of Customer.reservations.
    customer = relationship("Customer", back_populates="reservations")
# Table for tracking acknowledged requests by client
class AckedRequest(Base):
    """Tracks which Reservations the Client has already seen via ReadAction.

    Clients can report successful transfers via ReportNotifAction. This gets
    stored in this table.
    This prevents re-sending the same reservation multiple times to the client.
    """

    __tablename__ = "acked_requests"

    id = Column(Integer, primary_key=True)
    client_id = Column(String, index=True)
    username = Column(
        String, index=True, nullable=True
    )  # Username of the client making the request
    # NOTE(review): the original comment refers to Reservation.form_id, but the
    # Reservation model in this module declares no such column; presumably this
    # holds Reservation.unique_id or md5_unique_id - confirm against the caller.
    unique_id = Column(
        String, index=True
    )  # Should match Reservation.form_id or another unique field
    # Timezone-aware timestamp; no default is declared on the column.
    timestamp = Column(DateTime(timezone=True))
class Conversion(Base):
    """Conversion data from hotel PMS.

    Represents a single reservation event from the PMS XML with all its metadata.
    Each row links to one reservation from the PMS system. A reservation can have
    multiple room reservations (stored in ConversionRoom table).

    Linked to reservations via advertising tracking data (fbclid, gclid, etc)
    stored in the advertisingCampagne field.
    The tracking data transferred by the PMS is, however, somewhat shorter.
    We therefore also need to match on guest name/email and other metadata.
    """

    __tablename__ = "conversions"

    id = Column(Integer, primary_key=True)
    # Link to reservation (nullable since matching may not always work)
    reservation_id = Column(
        Integer, ForeignKey("reservations.id"), nullable=True, index=True
    )
    # Optional links to the matched customer and its hashed counterpart.
    customer_id = Column(Integer, ForeignKey("customers.id"), nullable=True, index=True)
    hashed_customer_id = Column(
        Integer, ForeignKey("hashed_customers.id"), nullable=True, index=True
    )
    # Reservation metadata from XML
    hotel_id = Column(String, index=True)  # hotelID attribute
    pms_reservation_id = Column(String, index=True)  # id attribute from reservation
    reservation_number = Column(String)  # number attribute
    reservation_date = Column(Date)  # date attribute (when reservation was made)
    creation_time = Column(DateTime(timezone=True))  # creationTime attribute
    reservation_type = Column(String)  # type attribute (e.g., "reservation")
    booking_channel = Column(String)  # bookingChannel attribute
    # Guest information from reservation XML - used for matching
    guest_first_name = Column(String, index=True)  # firstName from guest element
    guest_last_name = Column(String, index=True)  # lastName from guest element
    guest_email = Column(String, index=True)  # email from guest element
    guest_country_code = Column(String)  # countryCode from guest element
    # Advertising/tracking data - used for matching to existing reservations
    advertising_medium = Column(
        String, index=True
    )  # advertisingMedium (e.g., "99TALES")
    advertising_partner = Column(
        String, index=True
    )  # advertisingPartner (e.g., "cpc", "website")
    advertising_campagne = Column(
        String, index=True
    )  # advertisingCampagne (contains fbclid/gclid)
    # Metadata
    created_at = Column(DateTime(timezone=True))  # When this record was imported
    updated_at = Column(DateTime(timezone=True))  # When this record was last updated
    # Relationships
    # Each backref adds a ``conversions`` attribute on the target model.
    reservation = relationship("Reservation", backref="conversions")
    customer = relationship("Customer", backref="conversions")
    hashed_customer = relationship("HashedCustomer", backref="conversions")
    # Child rooms are configured with the "all, delete-orphan" cascade.
    conversion_rooms = relationship(
        "ConversionRoom", back_populates="conversion", cascade="all, delete-orphan"
    )
class ConversionRoom(Base):
    """Room reservation data from hotel PMS.

    Represents a single room reservation within a conversion/PMS reservation.
    One conversion can have multiple room reservations (e.g., customer books 3 rooms).

    Daily sales are stored as a JSON blob with an extracted total_revenue field
    for efficient querying.
    """

    __tablename__ = "conversion_rooms"

    id = Column(Integer, primary_key=True)
    # Link to the parent conversion/PMS reservation
    conversion_id = Column(
        Integer, ForeignKey("conversions.id"), nullable=False, index=True
    )
    # Identifier for this room reservation (for upserts)
    # Composite: pms_reservation_id + room_number
    # Note: Not globally unique - same room number can exist across different hotels
    pms_hotel_reservation_id = Column(String, index=True)
    # Room reservation details
    arrival_date = Column(Date, index=True)  # arrival attribute
    departure_date = Column(Date, index=True)  # departure attribute
    room_status = Column(String)  # status attribute (e.g., "reserved", "departed")
    room_type = Column(String)  # roomType attribute (e.g., "VDS", "EZR")
    room_number = Column(String, index=True)  # roomNumber attribute
    num_adults = Column(Integer)  # adults attribute
    rate_plan_code = Column(String)  # ratePlanCode attribute
    connected_room_type = Column(String)  # connectedRoomType attribute
    # Daily sales data stored as JSON
    # Format: [
    # {"date": "2021-10-09", "revenueTotal": "13.6", "revenueOther": "13.6"},
    # {"date": "2021-10-10", "revenueTotal": "306.1", "revenueLogis": "254", ...},
    # ...
    # ]
    daily_sales = Column(JSON, nullable=True)  # JSON array of daily sales
    # Extracted total revenue for efficient querying (sum of all revenueTotal
    # values in daily_sales)
    # Kept as string to preserve decimal precision
    total_revenue = Column(String, nullable=True)
    # Metadata
    created_at = Column(DateTime(timezone=True))  # When this record was imported
    updated_at = Column(DateTime(timezone=True))  # When this record was last updated
    # Relationships
    # Counterpart of Conversion.conversion_rooms.
    conversion = relationship("Conversion", back_populates="conversion_rooms")