diff --git a/src/alpine_bits_python/db.py b/src/alpine_bits_python/db.py index e7964b8..1bca0ce 100644 --- a/src/alpine_bits_python/db.py +++ b/src/alpine_bits_python/db.py @@ -1,11 +1,26 @@ import asyncio import hashlib import os -from typing import Any, AsyncGenerator, Callable, TypeVar +from collections.abc import AsyncGenerator, Callable +from typing import TypeVar -from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String, JSON +from sqlalchemy import ( + JSON, + Boolean, + Column, + Date, + DateTime, + ForeignKey, + Integer, + String, +) from sqlalchemy.exc import DBAPIError -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine, async_sessionmaker +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) from sqlalchemy.orm import declarative_base, relationship from .logging_config import get_logger @@ -95,9 +110,7 @@ def create_database_engine(config=None, echo=False) -> AsyncEngine: # Create engine with connect_args to set search_path for PostgreSQL connect_args = {} if schema_name and "postgresql" in database_url: - connect_args = { - "server_settings": {"search_path": f"{schema_name},public"} - } + connect_args = {"server_settings": {"search_path": f"{schema_name},public"}} _LOGGER.info("Setting PostgreSQL search_path to: %s,public", schema_name) return create_async_engine(database_url, echo=echo, connect_args=connect_args) @@ -120,13 +133,12 @@ class ResilientAsyncSession: Args: async_sessionmaker_: Factory for creating async sessions engine: The SQLAlchemy async engine for connection recovery + """ self.async_sessionmaker = async_sessionmaker_ self.engine = engine - async def execute_with_retry( - self, func: Callable[..., T], *args, **kwargs - ) -> T: + async def execute_with_retry(self, func: Callable[..., T], *args, **kwargs) -> T: """Execute a function with automatic retry on connection errors. Args: @@ -139,6 +151,7 @@ class ResilientAsyncSession: Raises: The original exception if all retries are exhausted + """ last_error = None @@ -169,7 +182,7 @@ class ResilientAsyncSession: # Wait before retry (exponential backoff) if attempt < MAX_RETRIES - 1: - wait_time = RETRY_DELAY * (2 ** attempt) + wait_time = RETRY_DELAY * (2**attempt) await asyncio.sleep(wait_time) else: # Not a connection-related error, re-raise immediately @@ -201,6 +214,7 @@ class SessionMaker: Args: async_sessionmaker_: SQLAlchemy async_sessionmaker factory + """ self.async_sessionmaker = async_sessionmaker_ @@ -210,13 +224,14 @@ class SessionMaker: Returns: A new AsyncSession instance ready for use. Caller is responsible for managing the session lifecycle (closing when done). + """ return self.async_sessionmaker() async def get_resilient_session( resilient_session: "ResilientAsyncSession", -) -> AsyncGenerator[AsyncSession, None]: +) -> AsyncGenerator[AsyncSession]: """Dependency for FastAPI that provides a resilient async session. This generator creates a new session with automatic retry capability @@ -227,6 +242,7 @@ async def get_resilient_session( Yields: AsyncSession instance for database operations + """ async with resilient_session.async_sessionmaker() as session: yield session @@ -356,10 +372,19 @@ class Reservation(Base): # Table for tracking acknowledged requests by client class AckedRequest(Base): + """Tracks which Reservations the Client has already seen via ReadAction. + + Clients can report successfull transfers via ReportNotifAction. This gets stored in this table. + This prevents re-sending the same reservation multiple times to the client. + + """ + __tablename__ = "acked_requests" id = Column(Integer, primary_key=True) client_id = Column(String, index=True) - username = Column(String, index=True, nullable=True) # Username of the client making the request + username = Column( + String, index=True, nullable=True + ) # Username of the client making the request unique_id = Column( String, index=True ) # Should match Reservation.form_id or another unique field @@ -371,10 +396,13 @@ class Conversion(Base): Represents a single reservation event from the PMS XML with all its metadata. Each row links to one reservation from the PMS system. A reservation can have - multiple room reservations (stored in RoomReservation table). + multiple room reservations (stored in ConversionRoom table). Linked to reservations via advertising tracking data (fbclid, gclid, etc) stored in advertisingCampagne field. + The tracking data transferered by the PMS is however somewhat shorter. + We therefore also need to match on guest name/email and other metadata. + """ __tablename__ = "conversions" @@ -423,12 +451,12 @@ class Conversion(Base): reservation = relationship("Reservation", backref="conversions") customer = relationship("Customer", backref="conversions") hashed_customer = relationship("HashedCustomer", backref="conversions") - room_reservations = relationship( - "RoomReservation", back_populates="conversion", cascade="all, delete-orphan" + conversion_rooms = relationship( + "ConversionRoom", back_populates="conversion", cascade="all, delete-orphan" ) -class RoomReservation(Base): +class ConversionRoom(Base): """Room reservation data from hotel PMS. Represents a single room reservation within a conversion/PMS reservation. @@ -438,7 +466,7 @@ class RoomReservation(Base): for efficient querying. """ - __tablename__ = "room_reservations" + __tablename__ = "conversion_rooms" id = Column(Integer, primary_key=True) # Link to the parent conversion/PMS reservation @@ -478,4 +506,4 @@ class RoomReservation(Base): updated_at = Column(DateTime(timezone=True)) # When this record was last updated # Relationships - conversion = relationship("Conversion", back_populates="room_reservations") + conversion = relationship("Conversion", back_populates="conversion_rooms")