2 Commits

Author SHA1 Message Date
Jonas Linter
82d486645b Complete seperation 2025-11-19 15:10:38 +01:00
Jonas Linter
d88a53327f Migration done 2025-11-19 15:01:16 +01:00
4 changed files with 306 additions and 46 deletions

View File

@@ -0,0 +1,65 @@
"""add hashed_customer_id to reservations with cascade delete
Revision ID: 08fe946414d8
Revises: 70b2579d1d96
Create Date: 2025-11-19 14:57:27.178924
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '08fe946414d8'
down_revision: Union[str, Sequence[str], None] = '70b2579d1d96'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('hashed_customers', 'customer_id',
existing_type=sa.INTEGER(),
nullable=True)
op.drop_constraint(op.f('hashed_customers_customer_id_fkey'), 'hashed_customers', type_='foreignkey')
op.create_foreign_key(None, 'hashed_customers', 'customers', ['customer_id'], ['id'], ondelete='SET NULL')
op.drop_constraint(op.f('reservations_customer_id_fkey'), 'reservations', type_='foreignkey')
op.create_foreign_key(None, 'reservations', 'customers', ['customer_id'], ['id'], ondelete='SET NULL')
# Add hashed_customer_id column to reservations with cascade delete
op.add_column('reservations', sa.Column('hashed_customer_id', sa.Integer(), nullable=True))
op.create_index(op.f('ix_reservations_hashed_customer_id'), 'reservations', ['hashed_customer_id'], unique=False)
op.create_foreign_key(None, 'reservations', 'hashed_customers', ['hashed_customer_id'], ['id'], ondelete='CASCADE')
# ### end Alembic commands ###
# Data migration: Populate hashed_customer_id from customer relationship
connection = op.get_bind()
update_stmt = sa.text("""
UPDATE reservations r
SET hashed_customer_id = hc.id
FROM hashed_customers hc
WHERE r.customer_id = hc.customer_id
AND hc.customer_id IS NOT NULL
""")
connection.execute(update_stmt)
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
# Drop the hashed_customer_id column and its constraints
op.drop_constraint(None, 'reservations', type_='foreignkey')
op.drop_index(op.f('ix_reservations_hashed_customer_id'), table_name='reservations')
op.drop_column('reservations', 'hashed_customer_id')
op.drop_constraint(None, 'reservations', type_='foreignkey')
op.create_foreign_key(op.f('reservations_customer_id_fkey'), 'reservations', 'customers', ['customer_id'], ['id'])
op.drop_constraint(None, 'hashed_customers', type_='foreignkey')
op.create_foreign_key(op.f('hashed_customers_customer_id_fkey'), 'hashed_customers', 'customers', ['customer_id'], ['id'])
op.alter_column('hashed_customers', 'customer_id',
existing_type=sa.INTEGER(),
nullable=False)
# ### end Alembic commands ###

View File

@@ -203,19 +203,32 @@ class ConversionService:
if self.session_maker: if self.session_maker:
await session.close() await session.close()
# Process active reservations # Process active reservations in two phases:
# Phase 1: Create/update all conversion records
# Phase 2: Match them to existing reservations/customers
reservations = root.findall("reservation") reservations = root.findall("reservation")
stats["total_reservations"] = len(reservations) stats["total_reservations"] = len(reservations)
if not reservations: if not reservations:
return stats return stats
# Use concurrent processing if supported, otherwise sequential # Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point)
if self.supports_concurrent: if self.supports_concurrent:
await self._process_reservations_concurrent(reservations, stats) await self._process_reservations_concurrent(reservations, stats)
else: else:
await self._process_reservations_sequential(reservations, stats) await self._process_reservations_sequential(reservations, stats)
# Collect PMS reservation IDs from Phase 1 for Phase 2
pms_reservation_ids = [res.get("id") for res in reservations]
# Phase 2: Match all conversions using database data only
# No XML parsing, no re-hashing - complete separation of concerns
# This also enables matching historical data that wasn't just created
if self.supports_concurrent:
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
else:
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
return stats return stats
async def _load_reservation_cache(self) -> None: async def _load_reservation_cache(self) -> None:
@@ -253,7 +266,8 @@ class ConversionService:
# Load all reservations with their hashed customers in one query # Load all reservations with their hashed customers in one query
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
query = select(Reservation).options( query = select(Reservation).options(
selectinload(Reservation.customer).selectinload(Customer.hashed_version) selectinload(Reservation.customer).selectinload(Customer.hashed_version),
selectinload(Reservation.hashed_customer)
) )
result = await session.execute(query) result = await session.execute(query)
reservations = result.scalars().all() reservations = result.scalars().all()
@@ -265,9 +279,11 @@ class ConversionService:
hotel_code = reservation.hotel_code hotel_code = reservation.hotel_code
if hotel_code not in self._reservation_cache: if hotel_code not in self._reservation_cache:
self._reservation_cache[hotel_code] = [] self._reservation_cache[hotel_code] = []
# Cache the hashed customer instead of raw customer # Cache the hashed customer - prefer direct relationship, fall back to customer relationship
hashed_customer = None hashed_customer = None
if reservation.customer and reservation.customer.hashed_version: if reservation.hashed_customer:
hashed_customer = reservation.hashed_customer
elif reservation.customer and reservation.customer.hashed_version:
hashed_customer = reservation.customer.hashed_version hashed_customer = reservation.customer.hashed_version
self._reservation_cache[hotel_code].append( self._reservation_cache[hotel_code].append(
(reservation, hashed_customer) (reservation, hashed_customer)
@@ -329,6 +345,7 @@ class ConversionService:
) -> None: ) -> None:
"""Safely process a single reservation with semaphore and transaction management. """Safely process a single reservation with semaphore and transaction management.
Phase 1: Creation/update only (no matching).
In concurrent mode, creates its own session from SessionMaker. In concurrent mode, creates its own session from SessionMaker.
In sequential mode, uses the shared session. In sequential mode, uses the shared session.
@@ -348,26 +365,16 @@ class ConversionService:
session = self.session session = self.session
try: try:
# Process reservation with this task's session # Phase 1: Create/update conversion record (no matching)
reservation_stats = await self._process_reservation( reservation_stats = await self._create_or_update_conversion(
reservation_elem, session reservation_elem, session
) )
stats["total_daily_sales"] += reservation_stats["daily_sales_count"] stats["total_daily_sales"] += reservation_stats["daily_sales_count"]
stats["matched_to_reservation"] += reservation_stats.get(
"matched_to_reservation", 0
)
stats["matched_to_customer"] += reservation_stats.get(
"matched_to_customer", 0
)
stats["matched_to_hashed_customer"] += reservation_stats.get(
"matched_to_hashed_customer", 0
)
stats["unmatched"] += reservation_stats.get("unmatched", 0)
# Commit this task's transaction # Commit this task's transaction
await session.commit() await session.commit()
_LOGGER.debug( _LOGGER.debug(
"Successfully processed and committed reservation %s", "Successfully created/updated conversion for reservation %s",
pms_reservation_id, pms_reservation_id,
) )
@@ -417,27 +424,23 @@ class ConversionService:
pms_reservation_id, pms_reservation_id,
) )
async def _process_reservation( async def _create_or_update_conversion(
self, reservation_elem: ET.Element, session: AsyncSession | None = None self, reservation_elem: ET.Element, session: AsyncSession | None = None
) -> dict[str, int]: ) -> dict[str, int]:
"""Process a single reservation element and its daily sales. """Create or update a conversion record from XML (Phase 1: no matching).
Args: Args:
reservation_elem: XML element to process reservation_elem: XML element to process
session: AsyncSession to use. If None, uses self.session. session: AsyncSession to use. If None, uses self.session.
In concurrent mode, each task passes its own session. In concurrent mode, each task passes its own session.
Returns statistics about what was matched. Returns statistics about daily sales processed.
""" """
if session is None: if session is None:
session = self.session session = self.session
stats = { stats = {
"daily_sales_count": 0, "daily_sales_count": 0,
"matched_to_reservation": 0,
"matched_to_customer": 0,
"matched_to_hashed_customer": 0,
"unmatched": 0,
} }
# Extract reservation metadata # Extract reservation metadata
@@ -757,26 +760,6 @@ class ConversionService:
num_adults, num_adults,
) )
# Now that conversion, conversion_guest, and conversion_room records exist,
# perform matching using hashed guest data
match_stats = await self._match_conversion(
conversion,
guest_first_name,
guest_last_name,
guest_email,
advertising_campagne,
advertising_partner,
hotel_id,
reservation_date,
session,
)
# Update stats
stats["matched_to_reservation"] = match_stats["matched_to_reservation"]
stats["matched_to_customer"] = match_stats["matched_to_customer"]
stats["matched_to_hashed_customer"] = match_stats["matched_to_hashed_customer"]
stats["unmatched"] = match_stats["unmatched"]
return stats return stats
async def _match_conversion( async def _match_conversion(
@@ -1305,3 +1288,200 @@ class ConversionService:
# No single clear match found # No single clear match found
return None return None
async def _match_conversions_from_db_sequential(
self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None:
"""Phase 2: Match conversions sequentially using database data only."""
semaphore = asyncio.Semaphore(1) # Process one at a time
async with asyncio.TaskGroup() as tg:
for pms_id in pms_reservation_ids:
tg.create_task(
self._match_conversion_from_db_safe(pms_id, semaphore, stats)
)
async def _match_conversions_from_db_concurrent(
self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None:
"""Phase 2: Match conversions concurrently using database data only.
Each concurrent task gets its own independent database session
from the SessionMaker.
"""
if not self.session_maker:
_LOGGER.error(
"Concurrent matching requested but SessionMaker not available. "
"Falling back to sequential matching."
)
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
return
semaphore = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
async with asyncio.TaskGroup() as tg:
for pms_id in pms_reservation_ids:
tg.create_task(
self._match_conversion_from_db_safe(pms_id, semaphore, stats)
)
async def _match_conversion_from_db_safe(
self,
pms_reservation_id: str,
semaphore: asyncio.Semaphore,
stats: dict[str, int],
) -> None:
"""Phase 2: Safely match a conversion using only database data with transaction management.
In concurrent mode, creates its own session from SessionMaker.
In sequential mode, uses the shared session.
Args:
pms_reservation_id: PMS reservation ID to match
semaphore: Semaphore to limit concurrent operations
stats: Shared stats dictionary (thread-safe due to GIL)
"""
async with semaphore:
# In concurrent mode, create a new session for this task
if self.session_maker:
session = await self.session_maker.create_session()
else:
session = self.session
try:
# Phase 2: Match conversion using only database data
match_stats = await self._match_conversion_using_db_data(
pms_reservation_id, session
)
stats["matched_to_reservation"] += match_stats.get(
"matched_to_reservation", 0
)
stats["matched_to_customer"] += match_stats.get(
"matched_to_customer", 0
)
stats["matched_to_hashed_customer"] += match_stats.get(
"matched_to_hashed_customer", 0
)
stats["unmatched"] += match_stats.get("unmatched", 0)
# Commit this task's transaction
await session.commit()
_LOGGER.debug(
"Successfully matched conversion for reservation %s",
pms_reservation_id,
)
except Exception as e:
# Rollback this task's transaction
await session.rollback()
_LOGGER.exception(
"Error matching conversion for reservation %s: %s",
pms_reservation_id,
e,
)
stats["errors"] += 1
finally:
# Close the session if it was created by SessionMaker
if self.session_maker:
await session.close()
async def _match_conversion_using_db_data(
self,
pms_reservation_id: str,
session: AsyncSession | None = None,
) -> dict[str, int]:
"""Phase 2: Match a conversion using only persisted database data.
This method reads both the conversion and conversion_guest from the database
and uses their stored hashed data to match to existing reservations/customers.
No XML parsing, no re-hashing - complete separation of concerns.
This enables:
- Matching historical data that wasn't just created
- Re-running matching logic independently
- Consistent hashing (using already-hashed data from DB)
Args:
pms_reservation_id: PMS reservation ID to match
session: AsyncSession to use
Returns:
Dictionary with match statistics
"""
if session is None:
session = self.session
stats = {
"matched_to_reservation": 0,
"matched_to_customer": 0,
"matched_to_hashed_customer": 0,
"unmatched": 0,
}
# Get the conversion from the database with related data
result = await session.execute(
select(Conversion)
.where(Conversion.pms_reservation_id == pms_reservation_id)
.options(selectinload(Conversion.guest))
)
conversion = result.scalar_one_or_none()
if not conversion:
_LOGGER.warning(
"Conversion not found for pms_reservation_id=%s during matching phase",
pms_reservation_id,
)
return stats
# Get conversion_guest if it exists (has the hashed data)
conversion_guest = conversion.guest
# Extract hashed data from conversion_guest (already hashed)
hashed_first_name = None
hashed_last_name = None
hashed_email = None
if conversion_guest:
hashed_first_name = conversion_guest.hashed_first_name
hashed_last_name = conversion_guest.hashed_last_name
hashed_email = conversion_guest.hashed_email
# Perform matching using already-hashed data from database
match_result = await self._find_matching_entities(
conversion.advertising_campagne,
conversion.hotel_id,
conversion.reservation_date,
hashed_first_name,
hashed_last_name,
hashed_email,
conversion.advertising_partner,
session,
)
matched_reservation = match_result["reservation"]
matched_customer = match_result["customer"]
matched_hashed_customer = match_result["hashed_customer"]
# Update the conversion with matched entities if found
if matched_reservation or matched_customer or matched_hashed_customer:
conversion.reservation_id = (
matched_reservation.id if matched_reservation else None
)
conversion.customer_id = (
matched_customer.id if matched_customer else None
)
conversion.hashed_customer_id = (
matched_hashed_customer.id if matched_hashed_customer else None
)
conversion.updated_at = datetime.now()
# Update stats
if matched_reservation:
stats["matched_to_reservation"] = 1
if matched_customer:
stats["matched_to_customer"] = 1
if matched_hashed_customer:
stats["matched_to_hashed_customer"] = 1
if not any([matched_reservation, matched_customer, matched_hashed_customer]):
stats["unmatched"] = 1
return stats

View File

@@ -478,6 +478,7 @@ class Reservation(Base):
__tablename__ = "reservations" __tablename__ = "reservations"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
customer_id = Column(Integer, ForeignKey("customers.id", ondelete="SET NULL")) customer_id = Column(Integer, ForeignKey("customers.id", ondelete="SET NULL"))
hashed_customer_id = Column(Integer, ForeignKey("hashed_customers.id", ondelete="CASCADE"))
unique_id = Column(String, unique=True) unique_id = Column(String, unique=True)
md5_unique_id = Column(String(32), unique=True) # max length 32 guaranteed md5_unique_id = Column(String(32), unique=True) # max length 32 guaranteed
start_date = Column(Date) start_date = Column(Date)
@@ -507,6 +508,7 @@ class Reservation(Base):
room_classification_code = Column(String) room_classification_code = Column(String)
room_type = Column(String) room_type = Column(String)
customer = relationship("Customer", back_populates="reservations") customer = relationship("Customer", back_populates="reservations")
hashed_customer = relationship("HashedCustomer", backref="reservations")
# Table for tracking acknowledged requests by client # Table for tracking acknowledged requests by client

View File

@@ -7,7 +7,7 @@ from typing import Optional
from sqlalchemy import and_, select from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from .db import AckedRequest, Customer, Reservation from .db import AckedRequest, Customer, HashedCustomer, Reservation
from .schemas import ReservationData from .schemas import ReservationData
@@ -63,6 +63,19 @@ class ReservationService:
reservation = self._convert_reservation_data_to_db( reservation = self._convert_reservation_data_to_db(
reservation_data, customer_id reservation_data, customer_id
) )
# Automatically populate hashed_customer_id from the customer
# Since hashed_customer is always created when a customer is created,
# we can get it by querying for the hashed_customer with matching customer_id
hashed_customer_result = await self.session.execute(
select(HashedCustomer).where(
HashedCustomer.customer_id == customer_id
)
)
hashed_customer = hashed_customer_result.scalar_one_or_none()
if hashed_customer:
reservation.hashed_customer_id = hashed_customer.id
self.session.add(reservation) self.session.add(reservation)
if auto_commit: if auto_commit: