2 Commits

Author          SHA1          Message                         Date
Jonas Linter    f3978381df    Matching guests works nicely    2025-11-17 14:25:53 +01:00
Jonas Linter    24067847b4    Done but not really complete    2025-11-17 10:32:26 +01:00
8 changed files with 339507 additions and 229 deletions

.gitignore (vendored): 6 additions

@@ -31,6 +31,12 @@ config/postgres.yaml
 # ignore db
 alpinebits.db
+
+# ignore sql
+*.sql
+*.csv
+
 # test output files
 test_output.txt
 output.xml

config/alpinebits.log (new file): 338636 additions

File diff suppressed because it is too large.


@@ -2,54 +2,55 @@
 # Use annotatedyaml for secrets and environment-specific overrides
 database:
-  url: "sqlite+aiosqlite:///alpinebits.db"  # For local dev, use SQLite. For prod, override with PostgreSQL URL.
-  # url: "postgresql+asyncpg://user:password@host:port/dbname" # Example for Postgres
-  # schema: "alpinebits" # Optional: PostgreSQL schema name (default: public)
+  url: "postgresql+asyncpg://meta_user:meta_password@localhost:5555/meta_insights"
+  schema: "alpinebits"

 # AlpineBits Python config
 # Use annotatedyaml for secrets and environment-specific overrides
+logger:
+  level: "INFO"  # Set to DEBUG for more verbose output
+  file: "config/alpinebits.log"  # Log file path, or null for console only
+
 server:
   codecontext: "ADVERTISING"
   code: 70597314
   companyname: "99tales Gmbh"
   res_id_source_context: "99tales"

-logger:
-  level: "INFO"  # Set to DEBUG for more verbose output
-  file: "alpinebits.log"  # Log file path, or null for console only
-
 alpine_bits_auth:
   - hotel_id: "39054_001"
     hotel_name: "Bemelmans Post"
     username: "bemelman"
     password: !secret BEMELMANS_PASSWORD
-    meta_account: null  # Optional: Meta advertising account ID
-    google_account: null  # Optional: Google advertising account ID
+    meta_account: "238334370765317"
+    google_account: "7581209925"  # Optional: Meta advertising account ID
   - hotel_id: "135"
     hotel_name: "Testhotel"
     username: "sebastian"
     password: !secret BOB_PASSWORD
-    meta_account: null  # Optional: Meta advertising account ID
-    google_account: null  # Optional: Google advertising account ID
   - hotel_id: "39052_001"
     hotel_name: "Jagthof Kaltern"
     username: "jagthof"
     password: !secret JAGTHOF_PASSWORD
-    meta_account: null  # Optional: Meta advertising account ID
-    google_account: null  # Optional: Google advertising account ID
+    meta_account: "948363300784757"
+    google_account: "1951919786"  # Optional: Meta advertising account ID
   - hotel_id: "39040_001"
     hotel_name: "Residence Erika"
     username: "erika"
     password: !secret ERIKA_PASSWORD
-    meta_account: null  # Optional: Meta advertising account ID
-    google_account: null  # Optional: Google advertising account ID
+    google_account: "6604634947"

 api_tokens:
   - tLTI8wXF1OVEvUX7kdZRhSW3Qr5feBCz0mHo-kbnEp0

 # Email configuration (SMTP service config - kept for when port is unblocked)
 email:
   # SMTP server configuration
@@ -81,18 +82,18 @@ notifications:
    #- type: "email"
    #  address: "jonas@vaius.ai"
    - type: "pushover"
-      priority: 1  # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency
+      priority: 0  # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency

   # Daily report configuration (applies to all recipients)
   daily_report:
-    enabled: true  # Set to true to enable daily reports
+    enabled: false  # Set to true to enable daily reports
     send_time: "08:00"  # Time to send daily report (24h format, local time)
     include_stats: true  # Include reservation/customer stats
     include_errors: true  # Include error summary

   # Error alert configuration (applies to all recipients)
   error_alerts:
-    enabled: true  # Set to true to enable error alerts
+    enabled: false  # Set to true to enable error alerts
     # Alert is sent immediately if threshold is reached
     error_threshold: 5  # Send immediate alert after N errors
     # Otherwise, alert is sent after buffer time expires
@@ -103,3 +104,5 @@ notifications:
   log_levels:
     - "ERROR"
     - "CRITICAL"


@@ -43,7 +43,7 @@ from .config_loader import load_config
 from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode
 from .conversion_service import ConversionService
 from .customer_service import CustomerService
-from .db import Base, ResilientAsyncSession, create_database_engine
+from .db import Base, ResilientAsyncSession, SessionMaker, create_database_engine
 from .db import Customer as DBCustomer
 from .db import Reservation as DBReservation
 from .email_monitoring import ReservationStatsCollector

@@ -295,9 +295,13 @@ async def lifespan(app: FastAPI):
     # Create resilient session wrapper for automatic connection recovery
     resilient_session = ResilientAsyncSession(AsyncSessionLocal, engine)

+    # Create SessionMaker for concurrent processing
+    session_maker = SessionMaker(AsyncSessionLocal)
+
     app.state.engine = engine
     app.state.async_sessionmaker = AsyncSessionLocal
     app.state.resilient_session = resilient_session
+    app.state.session_maker = session_maker
     app.state.config = config
     app.state.alpine_bits_server = AlpineBitsServer(config)
     app.state.event_dispatcher = event_dispatcher

@@ -409,6 +413,17 @@ async def get_async_session(request: Request):
         yield session


+def get_session_maker(request: Request) -> SessionMaker:
+    """Get the SessionMaker for creating independent database sessions.
+
+    This dependency provides a SessionMaker that can be used to create
+    multiple independent sessions for concurrent processing tasks.
+    Useful for processing large datasets concurrently where each task
+    needs its own database transaction context.
+    """
+    return request.app.state.session_maker
+
+
 def get_resilient_session(request: Request) -> ResilientAsyncSession:
     """Get the resilient session manager from app state.

@@ -1256,6 +1271,7 @@ async def handle_xml_upload(
     filename: str,
     credentials_tupel: tuple = Depends(validate_basic_auth),
     db_session=Depends(get_async_session),
+    session_maker: SessionMaker = Depends(get_session_maker),
 ):
     """Endpoint for receiving XML files for conversion processing via PUT.

@@ -1344,7 +1360,9 @@ async def handle_xml_upload(
     )

     # Process the conversion XML and save to database
-    conversion_service = ConversionService(db_session)
+    # Use SessionMaker for concurrent processing of large XML files
+    # This allows multiple reservations to be processed in parallel with independent sessions
+    conversion_service = ConversionService(session_maker)
     processing_stats = await conversion_service.process_conversion_xml(xml_content)
     _LOGGER.info(
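
The handler now hands ConversionService a SessionMaker instead of a single shared session. The service's own diff is suppressed in this view, so the following is only a sketch of the fan-out pattern the comments describe, assuming the SessionMaker.create_session API from the db.py diff below; process_one and the returned stats shape are hypothetical:

import asyncio

from .db import SessionMaker  # as introduced in the db.py diff below


async def process_reservations_concurrently(
    session_maker: SessionMaker,
    reservations: list[dict],
    max_concurrency: int = 10,
) -> dict:
    """Process reservations in parallel, one independent session per task."""
    semaphore = asyncio.Semaphore(max_concurrency)  # bound the number of open DB sessions

    async def process_one(reservation: dict) -> bool:
        async with semaphore:
            session = await session_maker.create_session()
            try:
                # ... upsert the reservation and its rooms via this session ...
                await session.commit()
                return True
            except Exception:
                await session.rollback()
                return False
            finally:
                await session.close()  # SessionMaker leaves lifecycle to the caller

    results = await asyncio.gather(*(process_one(r) for r in reservations))
    return {"processed": sum(results), "failed": len(results) - sum(results)}

Each task committing or rolling back its own session is what makes the parallelism safe: a failure in one reservation cannot poison the transaction of another.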

File diff suppressed because it is too large.


@@ -3,7 +3,7 @@ import hashlib
 import os
 from typing import Any, AsyncGenerator, Callable, TypeVar

-from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String
+from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String, JSON
 from sqlalchemy.exc import DBAPIError
 from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine, async_sessionmaker
 from sqlalchemy.orm import declarative_base, relationship

@@ -187,6 +187,33 @@ class ResilientAsyncSession:
                     raise last_error


+class SessionMaker:
+    """Factory for creating independent AsyncSession instances.
+
+    This class enables concurrent processing by allowing each task to create
+    and manage its own database session. Useful for processing large datasets
+    where concurrent execution is desired but each concurrent task needs its own
+    database transaction context.
+    """
+
+    def __init__(self, async_sessionmaker_: async_sessionmaker[AsyncSession]):
+        """Initialize the SessionMaker.
+
+        Args:
+            async_sessionmaker_: SQLAlchemy async_sessionmaker factory
+        """
+        self.async_sessionmaker = async_sessionmaker_
+
+    async def create_session(self) -> AsyncSession:
+        """Create a new independent AsyncSession.
+
+        Returns:
+            A new AsyncSession instance ready for use. Caller is responsible
+            for managing the session lifecycle (closing when done).
+        """
+        return self.async_sessionmaker()
+
+
 async def get_resilient_session(
     resilient_session: "ResilientAsyncSession",
 ) -> AsyncGenerator[AsyncSession, None]:

@@ -337,11 +364,14 @@ class AckedRequest(Base):

 class Conversion(Base):
-    """Conversion/daily sales data from hotel PMS.
+    """Conversion data from hotel PMS.

-    Tracks actual sales revenue for reservations. Each row represents one day
-    of a reservation stay. Linked to reservations via advertising tracking data
-    (fbclid, gclid, etc) stored in advertisingCampagne field.
+    Represents a single reservation event from the PMS XML with all its metadata.
+    Each row links to one reservation from the PMS system. A reservation can have
+    multiple room reservations (stored in RoomReservation table).
+
+    Linked to reservations via advertising tracking data (fbclid, gclid, etc)
+    stored in advertisingCampagne field.
     """

     __tablename__ = "conversions"

@@ -382,30 +412,67 @@ class Conversion(Base):
         String, index=True
     )  # advertisingCampagne (contains fbclid/gclid)

-    # Room reservation details
-    arrival_date = Column(Date)
-    departure_date = Column(Date)
-    room_status = Column(String)  # status attribute (e.g., "reserved", "checked-in")
-    room_type = Column(String)  # roomType attribute
-    room_number = Column(String)  # roomNumber attribute
-    num_adults = Column(Integer)  # adults attribute
-    rate_plan_code = Column(String)  # ratePlanCode attribute
-
-    # Daily sales data (one row per day)
-    sale_date = Column(Date, index=True)  # date attribute from dailySale
-    revenue_total = Column(
-        String
-    )  # revenueTotal - keeping as string to preserve decimals
-    revenue_logis = Column(String)  # revenueLogis (accommodation)
-    revenue_board = Column(String)  # revenueBoard (meal plan)
-    revenue_fb = Column(String)  # revenueFB (food & beverage)
-    revenue_spa = Column(String)  # revenueSpa
-    revenue_other = Column(String)  # revenueOther
-
     # Metadata
     created_at = Column(DateTime(timezone=True))  # When this record was imported
+    updated_at = Column(DateTime(timezone=True))  # When this record was last updated

     # Relationships
     reservation = relationship("Reservation", backref="conversions")
     customer = relationship("Customer", backref="conversions")
     hashed_customer = relationship("HashedCustomer", backref="conversions")
+    room_reservations = relationship(
+        "RoomReservation", back_populates="conversion", cascade="all, delete-orphan"
+    )
+
+
+class RoomReservation(Base):
+    """Room reservation data from hotel PMS.
+
+    Represents a single room reservation within a conversion/PMS reservation.
+    One conversion can have multiple room reservations (e.g., customer books 3 rooms).
+    Daily sales are stored as a JSON blob with an extracted total_revenue field
+    for efficient querying.
+    """
+
+    __tablename__ = "room_reservations"
+
+    id = Column(Integer, primary_key=True)
+
+    # Link to the parent conversion/PMS reservation
+    conversion_id = Column(
+        Integer, ForeignKey("conversions.id"), nullable=False, index=True
+    )
+
+    # Identifier for this room reservation (for upserts)
+    # Composite: pms_reservation_id + room_number
+    # Note: Not globally unique - same room number can exist across different hotels
+    pms_hotel_reservation_id = Column(String, index=True)
+
+    # Room reservation details
+    arrival_date = Column(Date, index=True)  # arrival attribute
+    departure_date = Column(Date, index=True)  # departure attribute
+    room_status = Column(String)  # status attribute (e.g., "reserved", "departed")
+    room_type = Column(String)  # roomType attribute (e.g., "VDS", "EZR")
+    room_number = Column(String, index=True)  # roomNumber attribute
+    num_adults = Column(Integer)  # adults attribute
+    rate_plan_code = Column(String)  # ratePlanCode attribute
+    connected_room_type = Column(String)  # connectedRoomType attribute
+
+    # Daily sales data stored as JSON
+    # Format: [
+    #   {"date": "2021-10-09", "revenueTotal": "13.6", "revenueOther": "13.6"},
+    #   {"date": "2021-10-10", "revenueTotal": "306.1", "revenueLogis": "254", ...},
+    #   ...
+    # ]
+    daily_sales = Column(JSON, nullable=True)  # JSON array of daily sales
+
+    # Extracted total revenue for efficient querying (sum of all revenue_total in daily_sales)
+    # Kept as string to preserve decimal precision
+    total_revenue = Column(String, nullable=True)
+
+    # Metadata
+    created_at = Column(DateTime(timezone=True))  # When this record was imported
+    updated_at = Column(DateTime(timezone=True))  # When this record was last updated

+    # Relationships
+    conversion = relationship("Conversion", back_populates="room_reservations")
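
The model comments above fix the daily_sales JSON shape and state that total_revenue is the extracted sum, kept as a string to preserve decimal precision. A minimal sketch of that extraction under those assumptions (extract_total_revenue is a hypothetical helper, not code from this commit):

from decimal import Decimal


def extract_total_revenue(daily_sales: list[dict]) -> str:
    """Sum revenueTotal across daily sales, preserving decimal precision.

    Entries without a revenueTotal key count as zero. Decimal avoids the
    float rounding that motivated storing revenue as strings in the first place.
    """
    total = sum(
        (Decimal(day.get("revenueTotal", "0")) for day in daily_sales),
        Decimal("0"),
    )
    return str(total)


daily_sales = [
    {"date": "2021-10-09", "revenueTotal": "13.6", "revenueOther": "13.6"},
    {"date": "2021-10-10", "revenueTotal": "306.1", "revenueLogis": "254"},
]
assert extract_total_revenue(daily_sales) == "319.7"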


@@ -308,41 +308,35 @@ async def _backfill_acked_requests_username(engine: AsyncEngine, config: dict[st
     _LOGGER.info("Backfill complete: %d acknowledgements updated with username", total_updated)


-async def migrate_add_guest_fields_to_conversions(engine: AsyncEngine) -> None:
-    """Migration: Add guest information fields to conversions table.
-
-    This migration adds guest details from the PMS XML for improved matching:
-    - guest_first_name: First name of the guest
-    - guest_last_name: Last name of the guest
-    - guest_email: Email address of the guest
-    - guest_country_code: Country code of the guest
-
-    These fields are indexed to support efficient matching when the same
-    fbclid/gclid matches multiple reservations.
-
-    Safe to run multiple times - will skip if columns already exist.
-    """
-    _LOGGER.info("Running migration: add_guest_fields_to_conversions")
-
-    added_count = 0
-
-    # Add each column if it doesn't exist
-    if await add_column_if_not_exists(engine, "conversions", "guest_first_name", "VARCHAR"):
-        added_count += 1
-    if await add_column_if_not_exists(engine, "conversions", "guest_last_name", "VARCHAR"):
-        added_count += 1
-    if await add_column_if_not_exists(engine, "conversions", "guest_email", "VARCHAR"):
-        added_count += 1
-    if await add_column_if_not_exists(engine, "conversions", "guest_country_code", "VARCHAR"):
-        added_count += 1
-
-    if added_count > 0:
-        _LOGGER.info("Migration add_guest_fields_to_conversions: Added %d columns", added_count)
-    else:
-        _LOGGER.info("Migration add_guest_fields_to_conversions: No changes needed (already applied)")
+async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
+    """Migration: Normalize conversions and room reservations structure.
+
+    This migration redesigns the conversion data structure:
+    - conversions: One row per PMS reservation (with guest/advertising metadata)
+    - room_reservations: One row per room reservation (linked to conversion)
+    - daily_sales: JSON array of daily sales within each room reservation
+    - total_revenue: Extracted sum of all daily sales for efficiency
+
+    Old structure: One row per daily sale (denormalized, lots of duplication)
+    New structure: One row per room reservation, daily sales as JSON with extracted total
+
+    This allows:
+    - Upserts on room reservations (same room doesn't get duplicated)
+    - Better tracking of room data separate from daily sales data
+    - Efficient querying via extracted total_revenue field
+    - All daily sales details preserved in JSON for analysis
+
+    The tables are created via Base.metadata.create_all() at startup.
+
+    Safe to run multiple times - idempotent.
+    """
+    _LOGGER.info("Running migration: normalize_conversions")
+    _LOGGER.info(
+        "Conversion data structure redesigned: "
+        "conversions (1 per PMS reservation) + "
+        "room_reservations (1 per room, daily_sales as JSON). "
+        "Tables created/updated via Base.metadata.create_all()"
+    )


 async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None:

@@ -362,7 +356,7 @@ async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None
     await migrate_add_room_types(engine)
     await migrate_add_advertising_account_ids(engine, config)
     await migrate_add_username_to_acked_requests(engine, config)
-    await migrate_add_guest_fields_to_conversions(engine)
+    await migrate_normalize_conversions(engine)

     _LOGGER.info("Database migrations completed successfully")


@@ -6,9 +6,9 @@ import os

 import uvicorn

 if __name__ == "__main__":
-    db_path = "alpinebits.db"  # Adjust path if needed
-    if os.path.exists(db_path):
-        os.remove(db_path)
+    # db_path = "alpinebits.db"  # Adjust path if needed
+    # if os.path.exists(db_path):
+    #     os.remove(db_path)
     uvicorn.run(
         "alpine_bits_python.api:app",