Compare commits
2 Commits
6c46c566ed
...
82d486645b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
82d486645b | ||
|
|
d88a53327f |
@@ -0,0 +1,65 @@
|
||||
"""add hashed_customer_id to reservations with cascade delete
|
||||
|
||||
Revision ID: 08fe946414d8
|
||||
Revises: 70b2579d1d96
|
||||
Create Date: 2025-11-19 14:57:27.178924
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '08fe946414d8'
|
||||
down_revision: Union[str, Sequence[str], None] = '70b2579d1d96'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Upgrade schema.

    Steps, in order:

    1. Make ``hashed_customers.customer_id`` nullable and switch its foreign
       key to ``ON DELETE SET NULL``.
    2. Switch the ``reservations.customer_id`` foreign key to
       ``ON DELETE SET NULL``.
    3. Add the indexed ``reservations.hashed_customer_id`` column with an
       ``ON DELETE CASCADE`` foreign key to ``hashed_customers.id``.
    4. Backfill ``reservations.hashed_customer_id`` from the existing
       customer -> hashed customer link.
    """
    # Every foreign key is created with an explicit name so that downgrade()
    # (or any later migration) can drop it by name. The names follow the
    # ``<table>_<column>_fkey`` pattern of the constraints dropped below.
    # NOTE(review): that pattern is also what PostgreSQL assigns to unnamed
    # foreign keys - confirm no other naming convention is configured.
    op.alter_column(
        'hashed_customers',
        'customer_id',
        existing_type=sa.INTEGER(),
        nullable=True,
    )
    op.drop_constraint(
        op.f('hashed_customers_customer_id_fkey'),
        'hashed_customers',
        type_='foreignkey',
    )
    op.create_foreign_key(
        op.f('hashed_customers_customer_id_fkey'),
        'hashed_customers',
        'customers',
        ['customer_id'],
        ['id'],
        ondelete='SET NULL',
    )

    op.drop_constraint(
        op.f('reservations_customer_id_fkey'),
        'reservations',
        type_='foreignkey',
    )
    op.create_foreign_key(
        op.f('reservations_customer_id_fkey'),
        'reservations',
        'customers',
        ['customer_id'],
        ['id'],
        ondelete='SET NULL',
    )

    # Add hashed_customer_id column to reservations with cascade delete
    op.add_column(
        'reservations',
        sa.Column('hashed_customer_id', sa.Integer(), nullable=True),
    )
    op.create_index(
        op.f('ix_reservations_hashed_customer_id'),
        'reservations',
        ['hashed_customer_id'],
        unique=False,
    )
    op.create_foreign_key(
        op.f('reservations_hashed_customer_id_fkey'),
        'reservations',
        'hashed_customers',
        ['hashed_customer_id'],
        ['id'],
        ondelete='CASCADE',
    )

    # Data migration: populate hashed_customer_id from the customer
    # relationship. op.execute() is used instead of op.get_bind() so the
    # statement is also rendered when the migration runs in offline
    # (--sql) mode, where no live connection is available.
    op.execute(
        sa.text(
            """
            UPDATE reservations r
            SET hashed_customer_id = hc.id
            FROM hashed_customers hc
            WHERE r.customer_id = hc.customer_id
            AND hc.customer_id IS NOT NULL
            """
        )
    )
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Downgrade schema.

    Reverses upgrade(): removes ``reservations.hashed_customer_id`` (foreign
    key, index and column), restores the two ``customer_id`` foreign keys
    without an ``ON DELETE`` rule, and makes ``hashed_customers.customer_id``
    NOT NULL again.
    """
    # drop_constraint() needs a concrete name to emit DROP CONSTRAINT; the
    # autogenerated ``None`` placeholders could not work.
    # NOTE(review): the names below assume the ``<table>_<column>_fkey``
    # pattern (PostgreSQL's default for unnamed foreign keys, and the pattern
    # of the pre-existing constraints) - confirm against the live schema.

    # Drop the hashed_customer_id column and its constraints
    op.drop_constraint(
        op.f('reservations_hashed_customer_id_fkey'),
        'reservations',
        type_='foreignkey',
    )
    op.drop_index(
        op.f('ix_reservations_hashed_customer_id'),
        table_name='reservations',
    )
    op.drop_column('reservations', 'hashed_customer_id')

    # Restore reservations.customer_id -> customers.id without ON DELETE.
    op.drop_constraint(
        op.f('reservations_customer_id_fkey'),
        'reservations',
        type_='foreignkey',
    )
    op.create_foreign_key(
        op.f('reservations_customer_id_fkey'),
        'reservations',
        'customers',
        ['customer_id'],
        ['id'],
    )

    # Restore hashed_customers.customer_id -> customers.id without ON DELETE.
    op.drop_constraint(
        op.f('hashed_customers_customer_id_fkey'),
        'hashed_customers',
        type_='foreignkey',
    )
    op.create_foreign_key(
        op.f('hashed_customers_customer_id_fkey'),
        'hashed_customers',
        'customers',
        ['customer_id'],
        ['id'],
    )

    # NOTE(review): this fails if any hashed_customers row has a NULL
    # customer_id, which the SET NULL rule introduced by upgrade() can
    # produce. Such rows must be cleaned up before downgrading.
    op.alter_column(
        'hashed_customers',
        'customer_id',
        existing_type=sa.INTEGER(),
        nullable=False,
    )
|
||||
@@ -203,19 +203,32 @@ class ConversionService:
|
||||
if self.session_maker:
|
||||
await session.close()
|
||||
|
||||
# Process active reservations
|
||||
# Process active reservations in two phases:
|
||||
# Phase 1: Create/update all conversion records
|
||||
# Phase 2: Match them to existing reservations/customers
|
||||
reservations = root.findall("reservation")
|
||||
stats["total_reservations"] = len(reservations)
|
||||
|
||||
if not reservations:
|
||||
return stats
|
||||
|
||||
# Use concurrent processing if supported, otherwise sequential
|
||||
# Phase 1: Create/update all conversions (no matching, no XML parsing beyond this point)
|
||||
if self.supports_concurrent:
|
||||
await self._process_reservations_concurrent(reservations, stats)
|
||||
else:
|
||||
await self._process_reservations_sequential(reservations, stats)
|
||||
|
||||
# Collect PMS reservation IDs from Phase 1 for Phase 2
|
||||
pms_reservation_ids = [res.get("id") for res in reservations]
|
||||
|
||||
# Phase 2: Match all conversions using database data only
|
||||
# No XML parsing, no re-hashing - complete separation of concerns
|
||||
# This also enables matching historical data that wasn't just created
|
||||
if self.supports_concurrent:
|
||||
await self._match_conversions_from_db_concurrent(pms_reservation_ids, stats)
|
||||
else:
|
||||
await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
|
||||
|
||||
return stats
|
||||
|
||||
async def _load_reservation_cache(self) -> None:
|
||||
@@ -253,7 +266,8 @@ class ConversionService:
|
||||
# Load all reservations with their hashed customers in one query
|
||||
from sqlalchemy.orm import selectinload
|
||||
query = select(Reservation).options(
|
||||
selectinload(Reservation.customer).selectinload(Customer.hashed_version)
|
||||
selectinload(Reservation.customer).selectinload(Customer.hashed_version),
|
||||
selectinload(Reservation.hashed_customer)
|
||||
)
|
||||
result = await session.execute(query)
|
||||
reservations = result.scalars().all()
|
||||
@@ -265,9 +279,11 @@ class ConversionService:
|
||||
hotel_code = reservation.hotel_code
|
||||
if hotel_code not in self._reservation_cache:
|
||||
self._reservation_cache[hotel_code] = []
|
||||
# Cache the hashed customer instead of raw customer
|
||||
# Cache the hashed customer - prefer direct relationship, fall back to customer relationship
|
||||
hashed_customer = None
|
||||
if reservation.customer and reservation.customer.hashed_version:
|
||||
if reservation.hashed_customer:
|
||||
hashed_customer = reservation.hashed_customer
|
||||
elif reservation.customer and reservation.customer.hashed_version:
|
||||
hashed_customer = reservation.customer.hashed_version
|
||||
self._reservation_cache[hotel_code].append(
|
||||
(reservation, hashed_customer)
|
||||
@@ -329,6 +345,7 @@ class ConversionService:
|
||||
) -> None:
|
||||
"""Safely process a single reservation with semaphore and transaction management.
|
||||
|
||||
Phase 1: Creation/update only (no matching).
|
||||
In concurrent mode, creates its own session from SessionMaker.
|
||||
In sequential mode, uses the shared session.
|
||||
|
||||
@@ -348,26 +365,16 @@ class ConversionService:
|
||||
session = self.session
|
||||
|
||||
try:
|
||||
# Process reservation with this task's session
|
||||
reservation_stats = await self._process_reservation(
|
||||
# Phase 1: Create/update conversion record (no matching)
|
||||
reservation_stats = await self._create_or_update_conversion(
|
||||
reservation_elem, session
|
||||
)
|
||||
stats["total_daily_sales"] += reservation_stats["daily_sales_count"]
|
||||
stats["matched_to_reservation"] += reservation_stats.get(
|
||||
"matched_to_reservation", 0
|
||||
)
|
||||
stats["matched_to_customer"] += reservation_stats.get(
|
||||
"matched_to_customer", 0
|
||||
)
|
||||
stats["matched_to_hashed_customer"] += reservation_stats.get(
|
||||
"matched_to_hashed_customer", 0
|
||||
)
|
||||
stats["unmatched"] += reservation_stats.get("unmatched", 0)
|
||||
|
||||
# Commit this task's transaction
|
||||
await session.commit()
|
||||
_LOGGER.debug(
|
||||
"Successfully processed and committed reservation %s",
|
||||
"Successfully created/updated conversion for reservation %s",
|
||||
pms_reservation_id,
|
||||
)
|
||||
|
||||
@@ -417,27 +424,23 @@ class ConversionService:
|
||||
pms_reservation_id,
|
||||
)
|
||||
|
||||
async def _process_reservation(
|
||||
async def _create_or_update_conversion(
|
||||
self, reservation_elem: ET.Element, session: AsyncSession | None = None
|
||||
) -> dict[str, int]:
|
||||
"""Process a single reservation element and its daily sales.
|
||||
"""Create or update a conversion record from XML (Phase 1: no matching).
|
||||
|
||||
Args:
|
||||
reservation_elem: XML element to process
|
||||
session: AsyncSession to use. If None, uses self.session.
|
||||
In concurrent mode, each task passes its own session.
|
||||
|
||||
Returns statistics about what was matched.
|
||||
Returns statistics about daily sales processed.
|
||||
|
||||
"""
|
||||
if session is None:
|
||||
session = self.session
|
||||
stats = {
|
||||
"daily_sales_count": 0,
|
||||
"matched_to_reservation": 0,
|
||||
"matched_to_customer": 0,
|
||||
"matched_to_hashed_customer": 0,
|
||||
"unmatched": 0,
|
||||
}
|
||||
|
||||
# Extract reservation metadata
|
||||
@@ -757,26 +760,6 @@ class ConversionService:
|
||||
num_adults,
|
||||
)
|
||||
|
||||
# Now that conversion, conversion_guest, and conversion_room records exist,
|
||||
# perform matching using hashed guest data
|
||||
match_stats = await self._match_conversion(
|
||||
conversion,
|
||||
guest_first_name,
|
||||
guest_last_name,
|
||||
guest_email,
|
||||
advertising_campagne,
|
||||
advertising_partner,
|
||||
hotel_id,
|
||||
reservation_date,
|
||||
session,
|
||||
)
|
||||
|
||||
# Update stats
|
||||
stats["matched_to_reservation"] = match_stats["matched_to_reservation"]
|
||||
stats["matched_to_customer"] = match_stats["matched_to_customer"]
|
||||
stats["matched_to_hashed_customer"] = match_stats["matched_to_hashed_customer"]
|
||||
stats["unmatched"] = match_stats["unmatched"]
|
||||
|
||||
return stats
|
||||
|
||||
async def _match_conversion(
|
||||
@@ -1305,3 +1288,200 @@ class ConversionService:
|
||||
|
||||
# No single clear match found
|
||||
return None
|
||||
|
||||
async def _match_conversions_from_db_sequential(
|
||||
self, pms_reservation_ids: list[str], stats: dict[str, int]
|
||||
) -> None:
|
||||
"""Phase 2: Match conversions sequentially using database data only."""
|
||||
semaphore = asyncio.Semaphore(1) # Process one at a time
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
for pms_id in pms_reservation_ids:
|
||||
tg.create_task(
|
||||
self._match_conversion_from_db_safe(pms_id, semaphore, stats)
|
||||
)
|
||||
|
||||
async def _match_conversions_from_db_concurrent(
    self, pms_reservation_ids: list[str], stats: dict[str, int]
) -> None:
    """Phase 2: Match conversions concurrently using database data only.

    At most MAX_CONCURRENT_RESERVATIONS matching tasks run at the same time.
    When no SessionMaker is configured, an error is logged and the work is
    delegated to the sequential implementation instead.

    Args:
        pms_reservation_ids: PMS reservation IDs whose conversions are matched.
        stats: Shared statistics dictionary, forwarded to every matching task.

    """
    if self.session_maker:
        limiter = asyncio.Semaphore(MAX_CONCURRENT_RESERVATIONS)
        async with asyncio.TaskGroup() as group:
            for reservation_id in pms_reservation_ids:
                matching_job = self._match_conversion_from_db_safe(
                    reservation_id, limiter, stats
                )
                group.create_task(matching_job)
        return

    # No SessionMaker: fall back to one-at-a-time matching.
    _LOGGER.error(
        "Concurrent matching requested but SessionMaker not available. "
        "Falling back to sequential matching."
    )
    await self._match_conversions_from_db_sequential(pms_reservation_ids, stats)
|
||||
|
||||
async def _match_conversion_from_db_safe(
    self,
    pms_reservation_id: str,
    semaphore: asyncio.Semaphore,
    stats: dict[str, int],
) -> None:
    """Phase 2: Safely match a conversion using only database data with transaction management.

    When a SessionMaker is configured, a dedicated session is created for
    this call and closed afterwards; otherwise the shared ``self.session``
    is used and left open.

    Any exception raised while matching or committing is logged, the
    transaction is rolled back and ``stats["errors"]`` is incremented; the
    exception is not propagated.

    Args:
        pms_reservation_id: PMS reservation ID to match
        semaphore: Semaphore to limit concurrent operations
        stats: Shared stats dictionary, mutated in place. The increments
            contain no ``await``, so tasks running on the same event loop
            cannot interleave inside them.

    """
    async with semaphore:
        # Decide session ownership once so creation and cleanup always agree.
        owns_session = bool(self.session_maker)
        if owns_session:
            session = await self.session_maker.create_session()
        else:
            session = self.session

        try:
            # Phase 2: Match conversion using only database data
            match_stats = await self._match_conversion_using_db_data(
                pms_reservation_id, session
            )

            # Commit this task's transaction
            await session.commit()

            # Count the outcome only after the commit succeeded. Otherwise a
            # failed commit would be reported both as a match and as an
            # error, although the match was rolled back.
            for counter in (
                "matched_to_reservation",
                "matched_to_customer",
                "matched_to_hashed_customer",
                "unmatched",
            ):
                stats[counter] += match_stats.get(counter, 0)

            _LOGGER.debug(
                "Successfully matched conversion for reservation %s",
                pms_reservation_id,
            )

        except Exception as e:
            # Rollback this task's transaction
            await session.rollback()
            _LOGGER.exception(
                "Error matching conversion for reservation %s: %s",
                pms_reservation_id,
                e,
            )
            stats["errors"] += 1
        finally:
            # Close the session only if it was created for this call
            if owns_session:
                await session.close()
|
||||
|
||||
async def _match_conversion_using_db_data(
    self,
    pms_reservation_id: str,
    session: AsyncSession | None = None,
) -> dict[str, int]:
    """Phase 2: Match a conversion using only persisted database data.

    Loads the conversion (and its guest) for ``pms_reservation_id`` and
    passes the stored, already-hashed guest fields to
    ``_find_matching_entities``. No XML parsing and no re-hashing happens
    here.

    If anything matched, the conversion's ``reservation_id``,
    ``customer_id`` and ``hashed_customer_id`` are all overwritten (with
    ``None`` for entities that did not match) and ``updated_at`` is
    refreshed. If nothing matched, the conversion is left untouched. The
    session is not committed here.

    Args:
        pms_reservation_id: PMS reservation ID to match
        session: AsyncSession to use. If None, uses self.session.

    Returns:
        Dictionary with the keys ``matched_to_reservation``,
        ``matched_to_customer``, ``matched_to_hashed_customer`` and
        ``unmatched``, each 0 or 1. All values are 0 when no conversion
        exists for ``pms_reservation_id``.

    """
    # NOTE(review): the only visible import of selectinload is function-local
    # to _load_reservation_cache, so it is imported locally here as well to
    # avoid a NameError. Drop this line if the module imports it at top level.
    from sqlalchemy.orm import selectinload

    if session is None:
        session = self.session

    stats = {
        "matched_to_reservation": 0,
        "matched_to_customer": 0,
        "matched_to_hashed_customer": 0,
        "unmatched": 0,
    }

    # Get the conversion from the database with related data
    result = await session.execute(
        select(Conversion)
        .where(Conversion.pms_reservation_id == pms_reservation_id)
        .options(selectinload(Conversion.guest))
    )
    conversion = result.scalar_one_or_none()

    if not conversion:
        _LOGGER.warning(
            "Conversion not found for pms_reservation_id=%s during matching phase",
            pms_reservation_id,
        )
        return stats

    # The guest record (if any) carries the already-hashed identity fields.
    guest = conversion.guest
    hashed_first_name = guest.hashed_first_name if guest else None
    hashed_last_name = guest.hashed_last_name if guest else None
    hashed_email = guest.hashed_email if guest else None

    # Perform matching using already-hashed data from database
    match_result = await self._find_matching_entities(
        conversion.advertising_campagne,
        conversion.hotel_id,
        conversion.reservation_date,
        hashed_first_name,
        hashed_last_name,
        hashed_email,
        conversion.advertising_partner,
        session,
    )

    matched_reservation = match_result["reservation"]
    matched_customer = match_result["customer"]
    matched_hashed_customer = match_result["hashed_customer"]

    if not (matched_reservation or matched_customer or matched_hashed_customer):
        # Nothing matched: leave the conversion as it is.
        stats["unmatched"] = 1
        return stats

    # Update the conversion with matched entities
    conversion.reservation_id = (
        matched_reservation.id if matched_reservation else None
    )
    conversion.customer_id = matched_customer.id if matched_customer else None
    conversion.hashed_customer_id = (
        matched_hashed_customer.id if matched_hashed_customer else None
    )
    conversion.updated_at = datetime.now()

    # Update stats
    if matched_reservation:
        stats["matched_to_reservation"] = 1
    if matched_customer:
        stats["matched_to_customer"] = 1
    if matched_hashed_customer:
        stats["matched_to_hashed_customer"] = 1

    return stats
|
||||
|
||||
@@ -478,6 +478,7 @@ class Reservation(Base):
|
||||
__tablename__ = "reservations"
|
||||
id = Column(Integer, primary_key=True)
|
||||
customer_id = Column(Integer, ForeignKey("customers.id", ondelete="SET NULL"))
|
||||
hashed_customer_id = Column(Integer, ForeignKey("hashed_customers.id", ondelete="CASCADE"))
|
||||
unique_id = Column(String, unique=True)
|
||||
md5_unique_id = Column(String(32), unique=True) # max length 32 guaranteed
|
||||
start_date = Column(Date)
|
||||
@@ -507,6 +508,7 @@ class Reservation(Base):
|
||||
room_classification_code = Column(String)
|
||||
room_type = Column(String)
|
||||
customer = relationship("Customer", back_populates="reservations")
|
||||
hashed_customer = relationship("HashedCustomer", backref="reservations")
|
||||
|
||||
|
||||
# Table for tracking acknowledged requests by client
|
||||
|
||||
@@ -7,7 +7,7 @@ from typing import Optional
|
||||
from sqlalchemy import and_, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from .db import AckedRequest, Customer, Reservation
|
||||
from .db import AckedRequest, Customer, HashedCustomer, Reservation
|
||||
from .schemas import ReservationData
|
||||
|
||||
|
||||
@@ -63,6 +63,19 @@ class ReservationService:
|
||||
reservation = self._convert_reservation_data_to_db(
|
||||
reservation_data, customer_id
|
||||
)
|
||||
|
||||
# Automatically populate hashed_customer_id from the customer
|
||||
# Since hashed_customer is always created when a customer is created,
|
||||
# we can get it by querying for the hashed_customer with matching customer_id
|
||||
hashed_customer_result = await self.session.execute(
|
||||
select(HashedCustomer).where(
|
||||
HashedCustomer.customer_id == customer_id
|
||||
)
|
||||
)
|
||||
hashed_customer = hashed_customer_result.scalar_one_or_none()
|
||||
if hashed_customer:
|
||||
reservation.hashed_customer_id = hashed_customer.id
|
||||
|
||||
self.session.add(reservation)
|
||||
|
||||
if auto_commit:
|
||||
|
||||
Reference in New Issue
Block a user