2 Commits

Author         SHA1         Message                         Date
Jonas Linter   f3978381df   Matching guests works nicely    2025-11-17 14:25:53 +01:00
Jonas Linter   24067847b4   Done but not really complete    2025-11-17 10:32:26 +01:00
8 changed files with 339507 additions and 229 deletions

.gitignore (vendored): 6 additions

@@ -31,6 +31,12 @@ config/postgres.yaml
 # ignore db
 alpinebits.db
 # ignore sql
 *.sql
+*.csv
+
+# test output files
+test_output.txt
+output.xml

config/alpinebits.log (new file): 338636 additions
(File diff suppressed because it is too large.)


@@ -2,54 +2,55 @@
 # Use annotatedyaml for secrets and environment-specific overrides
 database:
-  url: "sqlite+aiosqlite:///alpinebits.db" # For local dev, use SQLite. For prod, override with PostgreSQL URL.
-  # url: "postgresql+asyncpg://user:password@host:port/dbname" # Example for Postgres
-  # schema: "alpinebits" # Optional: PostgreSQL schema name (default: public)
+  url: "postgresql+asyncpg://meta_user:meta_password@localhost:5555/meta_insights"
+  schema: "alpinebits"

 # AlpineBits Python config
 # Use annotatedyaml for secrets and environment-specific overrides
+logger:
+  level: "INFO" # Set to DEBUG for more verbose output
+  file: "config/alpinebits.log" # Log file path, or null for console only
+
 server:
   codecontext: "ADVERTISING"
   code: 70597314
   companyname: "99tales Gmbh"
   res_id_source_context: "99tales"

-logger:
-  level: "INFO" # Set to DEBUG for more verbose output
-  file: "alpinebits.log" # Log file path, or null for console only
-
 alpine_bits_auth:
   - hotel_id: "39054_001"
     hotel_name: "Bemelmans Post"
     username: "bemelman"
     password: !secret BEMELMANS_PASSWORD
-    meta_account: null # Optional: Meta advertising account ID
-    google_account: null # Optional: Google advertising account ID
+    meta_account: "238334370765317"
+    google_account: "7581209925" # Optional: Google advertising account ID
   - hotel_id: "135"
     hotel_name: "Testhotel"
     username: "sebastian"
     password: !secret BOB_PASSWORD
     meta_account: null # Optional: Meta advertising account ID
     google_account: null # Optional: Google advertising account ID
   - hotel_id: "39052_001"
     hotel_name: "Jagthof Kaltern"
     username: "jagthof"
     password: !secret JAGTHOF_PASSWORD
-    meta_account: null # Optional: Meta advertising account ID
-    google_account: null # Optional: Google advertising account ID
+    meta_account: "948363300784757"
+    google_account: "1951919786" # Optional: Google advertising account ID
   - hotel_id: "39040_001"
     hotel_name: "Residence Erika"
     username: "erika"
     password: !secret ERIKA_PASSWORD
     meta_account: null # Optional: Meta advertising account ID
-    google_account: null # Optional: Google advertising account ID
+    google_account: "6604634947"

 api_tokens:
   - tLTI8wXF1OVEvUX7kdZRhSW3Qr5feBCz0mHo-kbnEp0

 # Email configuration (SMTP service config - kept for when port is unblocked)
 email:
   # SMTP server configuration
@@ -81,18 +82,18 @@ notifications:
     #- type: "email"
     #  address: "jonas@vaius.ai"
     - type: "pushover"
-      priority: 1 # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency
+      priority: 0 # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency

   # Daily report configuration (applies to all recipients)
   daily_report:
-    enabled: true # Set to true to enable daily reports
+    enabled: false # Set to true to enable daily reports
     send_time: "08:00" # Time to send daily report (24h format, local time)
     include_stats: true # Include reservation/customer stats
     include_errors: true # Include error summary

   # Error alert configuration (applies to all recipients)
   error_alerts:
-    enabled: true # Set to true to enable error alerts
+    enabled: false # Set to true to enable error alerts
     # Alert is sent immediately if threshold is reached
     error_threshold: 5 # Send immediate alert after N errors
     # Otherwise, alert is sent after buffer time expires
@@ -103,3 +104,5 @@ notifications:
   log_levels:
     - "ERROR"
     - "CRITICAL"


@@ -43,7 +43,7 @@ from .config_loader import load_config
 from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpStatusCode
 from .conversion_service import ConversionService
 from .customer_service import CustomerService
-from .db import Base, ResilientAsyncSession, create_database_engine
+from .db import Base, ResilientAsyncSession, SessionMaker, create_database_engine
 from .db import Customer as DBCustomer
 from .db import Reservation as DBReservation
 from .email_monitoring import ReservationStatsCollector
@@ -295,9 +295,13 @@ async def lifespan(app: FastAPI):
     # Create resilient session wrapper for automatic connection recovery
     resilient_session = ResilientAsyncSession(AsyncSessionLocal, engine)

+    # Create SessionMaker for concurrent processing
+    session_maker = SessionMaker(AsyncSessionLocal)
+
     app.state.engine = engine
     app.state.async_sessionmaker = AsyncSessionLocal
     app.state.resilient_session = resilient_session
+    app.state.session_maker = session_maker
     app.state.config = config
     app.state.alpine_bits_server = AlpineBitsServer(config)
     app.state.event_dispatcher = event_dispatcher
@@ -409,6 +413,17 @@ async def get_async_session(request: Request):
         yield session


+def get_session_maker(request: Request) -> SessionMaker:
+    """Get the SessionMaker for creating independent database sessions.
+
+    This dependency provides a SessionMaker that can be used to create
+    multiple independent sessions for concurrent processing tasks.
+    Useful for processing large datasets concurrently where each task
+    needs its own database transaction context.
+    """
+    return request.app.state.session_maker
+
+
 def get_resilient_session(request: Request) -> ResilientAsyncSession:
     """Get the resilient session manager from app state.
@@ -1256,6 +1271,7 @@ async def handle_xml_upload(
     filename: str,
     credentials_tupel: tuple = Depends(validate_basic_auth),
     db_session=Depends(get_async_session),
+    session_maker: SessionMaker = Depends(get_session_maker),
 ):
     """Endpoint for receiving XML files for conversion processing via PUT.
@@ -1344,7 +1360,9 @@ async def handle_xml_upload(
         )

     # Process the conversion XML and save to database
-    conversion_service = ConversionService(db_session)
+    # Use SessionMaker for concurrent processing of large XML files
+    # This allows multiple reservations to be processed in parallel with independent sessions
+    conversion_service = ConversionService(session_maker)
     processing_stats = await conversion_service.process_conversion_xml(xml_content)

     _LOGGER.info(
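
The comments above explain why the upload endpoint now hands ConversionService a SessionMaker rather than a single session. A minimal sketch of the pattern this enables, with a hypothetical process_one helper (not the actual ConversionService internals):

```python
# Hedged sketch: each concurrent task creates and owns its own session,
# so a failure in one reservation doesn't poison the others' transactions.
import asyncio

async def process_all(session_maker, reservations: list[dict]) -> None:
    async def process_one(payload: dict) -> None:
        session = await session_maker.create_session()
        try:
            ...  # insert/update rows for this reservation (omitted)
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

    await asyncio.gather(*(process_one(r) for r in reservations))
```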

File diff suppressed because it is too large Load Diff


@@ -3,7 +3,7 @@ import hashlib
 import os
 from typing import Any, AsyncGenerator, Callable, TypeVar

-from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String
+from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String, JSON
 from sqlalchemy.exc import DBAPIError
 from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine, async_sessionmaker
 from sqlalchemy.orm import declarative_base, relationship
@@ -187,6 +187,33 @@ class ResilientAsyncSession:
             raise last_error


+class SessionMaker:
+    """Factory for creating independent AsyncSession instances.
+
+    This class enables concurrent processing by allowing each task to create
+    and manage its own database session. Useful for processing large datasets
+    where concurrent execution is desired but each concurrent task needs its own
+    database transaction context.
+    """
+
+    def __init__(self, async_sessionmaker_: async_sessionmaker[AsyncSession]):
+        """Initialize the SessionMaker.
+
+        Args:
+            async_sessionmaker_: SQLAlchemy async_sessionmaker factory
+        """
+        self.async_sessionmaker = async_sessionmaker_
+
+    async def create_session(self) -> AsyncSession:
+        """Create a new independent AsyncSession.
+
+        Returns:
+            A new AsyncSession instance ready for use. Caller is responsible
+            for managing the session lifecycle (closing when done).
+        """
+        return self.async_sessionmaker()
+
+
 async def get_resilient_session(
     resilient_session: "ResilientAsyncSession",
 ) -> AsyncGenerator[AsyncSession, None]:
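
Since create_session returns a plain AsyncSession and leaves the lifecycle to the caller, using the session as an async context manager is one safe pattern. A sketch, with an illustrative Conversion query as the workload:

```python
# Hedged sketch: AsyncSession supports "async with", which closes the
# session on exit; the query itself is only an example workload.
from sqlalchemy import select

async def count_conversions(session_maker: SessionMaker) -> int:
    session = await session_maker.create_session()
    async with session:  # guarantees session.close() on exit
        result = await session.execute(select(Conversion))
        return len(result.scalars().all())
```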
@@ -337,11 +364,14 @@ class AckedRequest(Base):
 class Conversion(Base):
-    """Conversion/daily sales data from hotel PMS.
+    """Conversion data from hotel PMS.

-    Tracks actual sales revenue for reservations. Each row represents one day
-    of a reservation stay. Linked to reservations via advertising tracking data
-    (fbclid, gclid, etc) stored in advertisingCampagne field.
+    Represents a single reservation event from the PMS XML with all its metadata.
+    Each row links to one reservation from the PMS system. A reservation can have
+    multiple room reservations (stored in RoomReservation table).
+
+    Linked to reservations via advertising tracking data (fbclid, gclid, etc)
+    stored in advertisingCampagne field.
     """

     __tablename__ = "conversions"
@@ -382,30 +412,67 @@ class Conversion(Base):
String, index=True
) # advertisingCampagne (contains fbclid/gclid)
# Room reservation details
arrival_date = Column(Date)
departure_date = Column(Date)
room_status = Column(String) # status attribute (e.g., "reserved", "checked-in")
room_type = Column(String) # roomType attribute
room_number = Column(String) # roomNumber attribute
num_adults = Column(Integer) # adults attribute
rate_plan_code = Column(String) # ratePlanCode attribute
# Daily sales data (one row per day)
sale_date = Column(Date, index=True) # date attribute from dailySale
revenue_total = Column(
String
) # revenueTotal - keeping as string to preserve decimals
revenue_logis = Column(String) # revenueLogis (accommodation)
revenue_board = Column(String) # revenueBoard (meal plan)
revenue_fb = Column(String) # revenueFB (food & beverage)
revenue_spa = Column(String) # revenueSpa
revenue_other = Column(String) # revenueOther
# Metadata
created_at = Column(DateTime(timezone=True)) # When this record was imported
updated_at = Column(DateTime(timezone=True)) # When this record was last updated
# Relationships
reservation = relationship("Reservation", backref="conversions")
customer = relationship("Customer", backref="conversions")
hashed_customer = relationship("HashedCustomer", backref="conversions")
room_reservations = relationship(
"RoomReservation", back_populates="conversion", cascade="all, delete-orphan"
)
class RoomReservation(Base):
"""Room reservation data from hotel PMS.
Represents a single room reservation within a conversion/PMS reservation.
One conversion can have multiple room reservations (e.g., customer books 3 rooms).
Daily sales are stored as a JSON blob with an extracted total_revenue field
for efficient querying.
"""
__tablename__ = "room_reservations"
id = Column(Integer, primary_key=True)
# Link to the parent conversion/PMS reservation
conversion_id = Column(
Integer, ForeignKey("conversions.id"), nullable=False, index=True
)
# Identifier for this room reservation (for upserts)
# Composite: pms_reservation_id + room_number
# Note: Not globally unique - same room number can exist across different hotels
pms_hotel_reservation_id = Column(String, index=True)
# Room reservation details
arrival_date = Column(Date, index=True) # arrival attribute
departure_date = Column(Date, index=True) # departure attribute
room_status = Column(String) # status attribute (e.g., "reserved", "departed")
room_type = Column(String) # roomType attribute (e.g., "VDS", "EZR")
room_number = Column(String, index=True) # roomNumber attribute
num_adults = Column(Integer) # adults attribute
rate_plan_code = Column(String) # ratePlanCode attribute
connected_room_type = Column(String) # connectedRoomType attribute
# Daily sales data stored as JSON
# Format: [
# {"date": "2021-10-09", "revenueTotal": "13.6", "revenueOther": "13.6"},
# {"date": "2021-10-10", "revenueTotal": "306.1", "revenueLogis": "254", ...},
# ...
# ]
daily_sales = Column(JSON, nullable=True) # JSON array of daily sales
# Extracted total revenue for efficient querying (sum of all revenue_total in daily_sales)
# Kept as string to preserve decimal precision
total_revenue = Column(String, nullable=True)
# Metadata
created_at = Column(DateTime(timezone=True)) # When this record was imported
updated_at = Column(DateTime(timezone=True)) # When this record was last updated
# Relationships
conversion = relationship("Conversion", back_populates="room_reservations")
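
The total_revenue column duplicates a value derivable from daily_sales, so it has to be computed at import time. A sketch of that extraction, using a hypothetical helper name and Decimal to preserve the precision the comments call out:

```python
# Hedged sketch: sum revenueTotal across the daily_sales JSON entries.
from decimal import Decimal

def compute_total_revenue(daily_sales: list[dict]) -> str:
    total = sum(
        (Decimal(day["revenueTotal"]) for day in daily_sales if day.get("revenueTotal")),
        Decimal("0"),
    )
    return str(total)  # stored as string to preserve decimal precision

# e.g. [{"revenueTotal": "13.6"}, {"revenueTotal": "306.1"}] -> "319.7"
```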


@@ -308,41 +308,35 @@ async def _backfill_acked_requests_username(engine: AsyncEngine, config: dict[st
     _LOGGER.info("Backfill complete: %d acknowledgements updated with username", total_updated)


-async def migrate_add_guest_fields_to_conversions(engine: AsyncEngine) -> None:
-    """Migration: Add guest information fields to conversions table.
+async def migrate_normalize_conversions(engine: AsyncEngine) -> None:
+    """Migration: Normalize conversions and room reservations structure.

-    This migration adds guest details from the PMS XML for improved matching:
-    - guest_first_name: First name of the guest
-    - guest_last_name: Last name of the guest
-    - guest_email: Email address of the guest
-    - guest_country_code: Country code of the guest
+    This migration redesigns the conversion data structure:
+    - conversions: One row per PMS reservation (with guest/advertising metadata)
+    - room_reservations: One row per room reservation (linked to conversion)
+    - daily_sales: JSON array of daily sales within each room reservation
+    - total_revenue: Extracted sum of all daily sales for efficiency

-    These fields are indexed to support efficient matching when the same
-    fbclid/gclid matches multiple reservations.
+    Old structure: One row per daily sale (denormalized, lots of duplication)
+    New structure: One row per room reservation, daily sales as JSON with extracted total

-    Safe to run multiple times - will skip if columns already exist.
-    """
-    _LOGGER.info("Running migration: add_guest_fields_to_conversions")
-
-    added_count = 0
-
-    # Add each column if it doesn't exist
-    if await add_column_if_not_exists(engine, "conversions", "guest_first_name", "VARCHAR"):
-        added_count += 1
-    if await add_column_if_not_exists(engine, "conversions", "guest_last_name", "VARCHAR"):
-        added_count += 1
-    if await add_column_if_not_exists(engine, "conversions", "guest_email", "VARCHAR"):
-        added_count += 1
-    if await add_column_if_not_exists(engine, "conversions", "guest_country_code", "VARCHAR"):
-        added_count += 1
-
-    if added_count > 0:
-        _LOGGER.info("Migration add_guest_fields_to_conversions: Added %d columns", added_count)
-    else:
-        _LOGGER.info("Migration add_guest_fields_to_conversions: No changes needed (already applied)")
+    This allows:
+    - Upserts on room reservations (same room doesn't get duplicated)
+    - Better tracking of room data separate from daily sales data
+    - Efficient querying via extracted total_revenue field
+    - All daily sales details preserved in JSON for analysis
+
+    The tables are created via Base.metadata.create_all() at startup.
+
+    Safe to run multiple times - idempotent.
+    """
+    _LOGGER.info("Running migration: normalize_conversions")
+    _LOGGER.info(
+        "Conversion data structure redesigned: "
+        "conversions (1 per PMS reservation) + "
+        "room_reservations (1 per room, daily_sales as JSON). "
+        "Tables created/updated via Base.metadata.create_all()"
+    )


 async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None = None) -> None:
@@ -362,7 +356,7 @@ async def run_all_migrations(engine: AsyncEngine, config: dict[str, Any] | None
     await migrate_add_room_types(engine)
     await migrate_add_advertising_account_ids(engine, config)
     await migrate_add_username_to_acked_requests(engine, config)
-    await migrate_add_guest_fields_to_conversions(engine)
+    await migrate_normalize_conversions(engine)

     _LOGGER.info("Database migrations completed successfully")


@@ -6,9 +6,9 @@ import os

 import uvicorn

 if __name__ == "__main__":
-    db_path = "alpinebits.db"  # Adjust path if needed
-    if os.path.exists(db_path):
-        os.remove(db_path)
+    # db_path = "alpinebits.db"  # Adjust path if needed
+    # if os.path.exists(db_path):
+    #     os.remove(db_path)

     uvicorn.run(
         "alpine_bits_python.api:app",