643 lines
24 KiB
Python
643 lines
24 KiB
Python
"""Tests for ConversionService using realistic test data.
|
|
|
|
This test module:
|
|
1. Uses the CSV import tests to populate the in-memory database with realistic customer/reservation data
|
|
2. Runs the XML conversion import endpoint with conversions_test_data.xml
|
|
3. Asserts baseline match counts to detect regressions in matching logic
|
|
|
|
The test data is designed to test realistic matching scenarios:
|
|
- Matching by advertising campaign data (fbclid/gclid)
|
|
- Matching by guest name and email using hashed data
|
|
- Handling unmatched conversions
|
|
- Processing daily sales revenue data
|
|
- Testing hashed matching logic and edge cases
|
|
"""
|
|
|
|
import hashlib
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
|
|
from alpine_bits_python.conversion_service import ConversionService
|
|
from alpine_bits_python.csv_import import CSVImporter
|
|
from alpine_bits_python.db import (
|
|
Base,
|
|
Conversion,
|
|
ConversionGuest,
|
|
ConversionRoom,
|
|
Customer,
|
|
HashedCustomer,
|
|
Reservation,
|
|
)
|
|
|
|
|
|
@pytest_asyncio.fixture
async def test_db_engine():
    """Yield an async engine backed by an in-memory SQLite database.

    All tables declared on ``Base.metadata`` are created before the engine is
    handed to the test, and the engine is disposed once the test is done.
    """
    db_engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)

    # Schema setup: create every table known to the declarative base.
    async with db_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    yield db_engine

    # Teardown
    await db_engine.dispose()
|
|
|
|
|
|
@pytest_asyncio.fixture
async def test_db_session(test_db_engine):
    """Yield an ``AsyncSession`` bound to the ``test_db_engine`` fixture.

    The session is opened via ``async with`` so it is closed when the test
    finishes. ``expire_on_commit`` is disabled on the session factory.
    """
    session_factory = async_sessionmaker(
        test_db_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )

    async with session_factory() as db_session:
        yield db_session
|
|
|
|
|
|
@pytest.fixture
def test_config():
    """Return the configuration dictionary used by the tests.

    It contains a ``server`` section, one ``alpine_bits_auth`` entry for the
    "bemelmans" hotel, and the default hotel code/name.
    """
    server_settings = {
        "codecontext": "ADVERTISING",
        "code": "70597314",
        "companyname": "99tales Gmbh",
        "res_id_source_context": "99tales",
    }
    hotel_credentials = {
        "hotel_id": "bemelmans",
        "hotel_name": "Bemelmans Apartments",
        "username": "bemelmans_user",
        "password": "testpass",
    }
    return {
        "server": server_settings,
        "alpine_bits_auth": [hotel_credentials],
        "default_hotel_code": "bemelmans",
        "default_hotel_name": "Bemelmans Apartments",
    }
|
|
|
|
|
|
@pytest.fixture
def test_data_dir():
    """Return the ``test_data`` directory located next to this test module."""
    module_dir = Path(__file__).parent
    return module_dir.joinpath("test_data")
|
|
|
|
|
|
class TestConversionServiceWithImportedData:
    """Test ConversionService using realistic test data imported via CSV."""

    # ------------------------------------------------------------------
    # Shared helpers (not collected by pytest: names do not start with "test")
    # ------------------------------------------------------------------

    @staticmethod
    def _require_data_files(test_data_dir):
        """Return ``(csv_file, xml_file)``; skip the test if either is missing.

        The CSV file is checked before the XML file, so the skip message names
        the first missing file in that order.
        """
        csv_file = test_data_dir / "leads_export.csv"
        xml_file = test_data_dir / "conversions_test_data.xml"
        for data_file in (csv_file, xml_file):
            if not data_file.exists():
                pytest.skip(f"Test data file not found: {data_file}")
        return csv_file, xml_file

    @staticmethod
    async def _import_csv(session, config, csv_file):
        """Import ``csv_file`` for hotel "bemelmans" and return the import stats."""
        importer = CSVImporter(session, config)
        return await importer.import_csv_file(
            csv_file_path=str(csv_file),
            hotel_code="bemelmans",
            dryrun=False,
        )

    @staticmethod
    async def _process_conversions(session, xml_file):
        """Feed the contents of ``xml_file`` to ConversionService; return its stats."""
        # The file already has proper XML structure; only surrounding
        # whitespace is stripped before processing.
        xml_content = xml_file.read_text(encoding="utf-8").strip()
        conversion_service = ConversionService(session)
        return await conversion_service.process_conversion_xml(xml_content)

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_conversion_import_with_csv_test_data(
        self, test_db_session, test_config, test_data_dir
    ):
        """Test full workflow: import CSV data, then process conversions XML.

        This test demonstrates the intended workflow:
        1. Import CSV test data to populate customers and reservations
        2. Process conversion XML file to match conversions to reservations
        3. Verify match statistics to detect regressions

        The conversions_test_data.xml file contains realistic conversion data
        from a hotel PMS system with multiple reservations and daily sales.
        """
        # Skip test if data files don't exist
        csv_file, xml_file = self._require_data_files(test_data_dir)

        # Step 1: Import CSV data to populate database with realistic
        # customers/reservations
        csv_stats = await self._import_csv(test_db_session, test_config, csv_file)

        print(f"\nCSV Import Stats: {csv_stats}")
        assert csv_stats["total_rows"] > 0, "CSV import should have processed rows"
        assert (
            csv_stats["created_reservations"] > 0
        ), "CSV import should create reservations"

        # Step 2: Load and process conversion XML
        stats = await self._process_conversions(test_db_session, xml_file)

        # BASELINE ASSERTIONS:
        # These values are established from test runs with
        # conversions_test_data.xml + leads_export.csv.
        # If these change, it indicates a change in matching logic that needs review.
        # Update these values only when intentionally changing the matching behavior.
        #
        # Current test data contains:
        # - CSV import: 576 total rows, 535 created reservations, 41 duplicates skipped
        # - XML conversions: 252 reservations with 2905 daily sales records
        #   across 539 room records
        EXPECTED_TOTAL_RESERVATIONS = 252
        EXPECTED_TOTAL_DAILY_SALES = 2905
        # NOTE(review): the room baseline is only reported below, never
        # asserted - confirm whether it should be checked against the database.
        EXPECTED_TOTAL_ROOMS = 539
        # Note: Currently no matches by tracking ID because XML data uses
        # different formats. This is expected with the test data. Real PMS data
        # would have higher match rates.
        EXPECTED_MATCHED_TO_RESERVATION = 0

        if EXPECTED_TOTAL_RESERVATIONS > 0:
            match_rate = (
                EXPECTED_MATCHED_TO_RESERVATION / EXPECTED_TOTAL_RESERVATIONS * 100
            )
        else:
            match_rate = 0

        print("\nBaseline Match Counts:")
        print(f" Total reservations in XML: {EXPECTED_TOTAL_RESERVATIONS}")
        print(f" Total daily sales records: {EXPECTED_TOTAL_DAILY_SALES}")
        print(f" Total conversion room records: {EXPECTED_TOTAL_ROOMS}")
        print(f" Matched to reservation: {EXPECTED_MATCHED_TO_RESERVATION}")
        print(f" Match rate: {match_rate:.1f}%")

        # Verify baseline stability on subsequent runs
        assert stats["total_reservations"] == EXPECTED_TOTAL_RESERVATIONS, (
            f"Total reservations should be {EXPECTED_TOTAL_RESERVATIONS}, "
            f"got {stats['total_reservations']}"
        )
        assert stats["total_daily_sales"] == EXPECTED_TOTAL_DAILY_SALES, (
            f"Total daily sales should be {EXPECTED_TOTAL_DAILY_SALES}, "
            f"got {stats['total_daily_sales']}"
        )
        assert stats["matched_to_reservation"] == EXPECTED_MATCHED_TO_RESERVATION, (
            f"Matched reservations should be {EXPECTED_MATCHED_TO_RESERVATION}, "
            f"got {stats['matched_to_reservation']}"
        )

    @pytest.mark.asyncio
    async def test_conversion_room_revenue_aggregation(
        self, test_db_session, test_config, test_data_dir
    ):
        """Test that daily sales revenue is correctly aggregated at room level."""
        csv_file, xml_file = self._require_data_files(test_data_dir)

        # Import CSV data, then process conversions. The returned statistics
        # are not needed here; this test inspects the stored rows directly.
        await self._import_csv(test_db_session, test_config, csv_file)
        await self._process_conversions(test_db_session, xml_file)

        # Verify conversions were created
        result = await test_db_session.execute(select(ConversionRoom))
        all_rooms = result.scalars().all()
        assert len(all_rooms) > 0, "Should have created conversion rooms"

        # Verify there are room records even if no revenue is set
        result = await test_db_session.execute(
            select(ConversionRoom).where(ConversionRoom.total_revenue.isnot(None))
        )
        rooms_with_revenue = result.scalars().all()

        # Note: Test data may not have revenue values in the XML
        # The important thing is that we're capturing room-level data
        print("\nRevenue Aggregation Stats:")
        print(f" Total conversion rooms: {len(all_rooms)}")
        print(f" Rooms with revenue: {len(rooms_with_revenue)}")

        if rooms_with_revenue:
            # Verify revenue values are numeric and positive
            for room in rooms_with_revenue:
                assert isinstance(
                    room.total_revenue, (int, float)
                ), f"Revenue should be numeric, got {type(room.total_revenue)}"
                assert (
                    room.total_revenue > 0
                ), f"Revenue should be positive, got {room.total_revenue}"

            total_revenue = sum(room.total_revenue for room in rooms_with_revenue)
            print(f" Total aggregated revenue: {total_revenue}")
            print(
                f" Average revenue per room: {total_revenue / len(rooms_with_revenue)}"
            )

    @pytest.mark.asyncio
    async def test_conversion_matching_by_guest_details(
        self, test_db_session, test_config, test_data_dir
    ):
        """Test conversion matching by guest name and email fallback.

        Note: The test data may not have matching guest names/emails between
        the CSV and XML files. This test primarily verifies that the matching
        logic runs without errors and that the conversion service attempts to
        match by guest details when advertising data is unavailable.
        """
        csv_file, xml_file = self._require_data_files(test_data_dir)

        # Import CSV data
        csv_stats = await self._import_csv(test_db_session, test_config, csv_file)
        assert csv_stats["created_reservations"] > 0, "Should have imported reservations"

        # Process conversions
        stats = await self._process_conversions(test_db_session, xml_file)

        # Verify conversions were processed
        result = await test_db_session.execute(select(Conversion))
        all_conversions = result.scalars().all()
        assert len(all_conversions) > 0, "Should have created conversions"

        # Check for matched conversions
        result = await test_db_session.execute(
            select(Conversion).where(Conversion.customer_id.isnot(None))
        )
        conversions_with_customers = result.scalars().all()

        print("\nGuest Detail Matching:")
        print(f" Total conversions: {len(all_conversions)}")
        print(f" Conversions matched to customer: {len(conversions_with_customers)}")
        print(f" Stats matched_to_customer: {stats['matched_to_customer']}")

        # With this test data, matches may be 0 if guest names/emails don't align
        # The important thing is that the matching logic runs without errors
        print(" Note: Matches depend on data alignment between CSV and XML files")

    @pytest.mark.asyncio
    async def test_conversion_service_error_handling(
        self, test_db_session, test_config
    ):
        """Test ConversionService handles invalid XML gracefully."""
        invalid_xml = "<invalid>unclosed tag"

        conversion_service = ConversionService(test_db_session)

        with pytest.raises(ValueError, match="Invalid XML"):
            await conversion_service.process_conversion_xml(invalid_xml)

    @pytest.mark.asyncio
    async def test_conversion_service_empty_xml(self, test_db_session, test_config):
        """Test ConversionService handles empty/minimal XML."""
        minimal_xml = '<?xml version="1.0"?><root></root>'

        conversion_service = ConversionService(test_db_session)
        stats = await conversion_service.process_conversion_xml(minimal_xml)

        assert stats["total_reservations"] == 0
        assert stats["total_daily_sales"] == 0
        assert stats["errors"] == 0
|
|
|
|
|
|
class TestHashedMatchingLogic:
    """Test the hashed matching logic used in ConversionService."""

    # ------------------------------------------------------------------
    # Shared helpers (not collected by pytest: names do not start with "test")
    # ------------------------------------------------------------------

    @staticmethod
    def _sha256(value):
        """Return the hex SHA-256 digest of ``value`` lower-cased and stripped."""
        return hashlib.sha256(value.lower().strip().encode("utf-8")).hexdigest()

    @staticmethod
    async def _load_reservations_with_customer(session, reservation_id):
        """Load the reservation with its customer and hashed customer eagerly.

        Returns the list produced by ``scalars().all()`` so it can be passed
        straight to the matching method under test.
        """
        from sqlalchemy.orm import selectinload

        result = await session.execute(
            select(Reservation)
            .where(Reservation.id == reservation_id)
            .options(
                selectinload(Reservation.customer).selectinload(Customer.hashed_version)
            )
        )
        return result.scalars().all()

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_no_match_without_hashed_customer(self, test_db_session):
        """Test that matching fails gracefully when customer has no hashed version."""
        # Create a customer WITHOUT hashed data
        customer = Customer(
            given_name="Bob",
            surname="Jones",
            email_address="bob@example.com",
            contact_id="test_contact_3",
        )
        test_db_session.add(customer)
        await test_db_session.commit()

        # Create a reservation
        reservation = Reservation(
            customer_id=customer.id,
            unique_id="res_3",
            hotel_code="test_hotel",
        )
        test_db_session.add(reservation)
        await test_db_session.commit()

        # Test the matching logic
        service = ConversionService(test_db_session)

        # Eagerly load reservations
        reservations = await self._load_reservations_with_customer(
            test_db_session, reservation.id
        )

        hashed_email = self._sha256("bob@example.com")

        matched = service._match_reservations_by_guest_details(
            reservations,
            guest_first_name=None,
            guest_last_name=None,
            guest_email=hashed_email,
        )

        # Should not match because customer has no hashed version
        assert matched is None, "Should not match without hashed customer"

    @pytest.mark.asyncio
    async def test_conversion_guest_hashed_fields_are_populated(
        self, test_db_session
    ):
        """Test that ConversionGuest properly stores hashed versions of guest data."""
        # Create a conversion guest
        conversion_guest = ConversionGuest.create_from_conversion_data(
            hotel_id="test_hotel",
            guest_id="guest_123",
            guest_first_name="Margaret",
            guest_last_name="Brown",
            guest_email="margaret@example.com",
            guest_country_code="GB",
            guest_birth_date=None,
            now=None,
        )
        test_db_session.add(conversion_guest)
        await test_db_session.flush()

        # Verify hashed fields are populated
        assert conversion_guest.hashed_first_name is not None
        assert conversion_guest.hashed_last_name is not None
        assert conversion_guest.hashed_email is not None

        # Verify hashes are correct (SHA256 of the lower-cased, stripped value)
        expected_hashed_first = self._sha256("margaret")
        expected_hashed_last = self._sha256("brown")
        expected_hashed_email = self._sha256("margaret@example.com")

        assert conversion_guest.hashed_first_name == expected_hashed_first
        assert conversion_guest.hashed_last_name == expected_hashed_last
        assert conversion_guest.hashed_email == expected_hashed_email

    @pytest.mark.asyncio
    async def test_conversion_records_created_before_matching(
        self, test_db_session, test_config
    ):
        """Test that conversion records exist before matching occurs."""
        # Create customer and reservation for matching
        customer = Customer(
            given_name="David",
            surname="Miller",
            email_address="david@example.com",
            contact_id="test_contact_6",
        )
        test_db_session.add(customer)
        await test_db_session.flush()

        hashed_customer = customer.create_hashed_customer()
        test_db_session.add(hashed_customer)
        await test_db_session.flush()

        reservation = Reservation(
            customer_id=customer.id,
            unique_id="res_6",
            hotel_code="hotel_1",
        )
        test_db_session.add(reservation)
        await test_db_session.commit()

        # Create conversion XML with matching hashed data. The XML declaration
        # must stay at the very start of the string.
        xml_content = """<?xml version="1.0"?>
<root>
    <reservation id="pms_123" hotelID="hotel_1" number="RES001" date="2025-01-15">
        <guest firstName="David" lastName="Miller" email="david@example.com"/>
        <roomReservations>
            <roomReservation roomNumber="101" arrival="2025-01-15" departure="2025-01-17" status="confirmed">
                <dailySales>
                    <dailySale date="2025-01-15" revenueTotal="100.00"/>
                </dailySales>
            </roomReservation>
        </roomReservations>
    </reservation>
</root>"""

        service = ConversionService(test_db_session)
        stats = await service.process_conversion_xml(xml_content)

        # Verify conversion was created
        result = await test_db_session.execute(
            select(Conversion).where(Conversion.pms_reservation_id == "pms_123")
        )
        conversion = result.scalar_one_or_none()

        assert conversion is not None, "Conversion should be created"
        assert conversion.hotel_id == "hotel_1"
        assert conversion.guest_first_name == "David"
        assert conversion.guest_last_name == "Miller"
        assert conversion.guest_email == "david@example.com"

        # Verify conversion_guest was created
        assert (
            conversion.conversion_guest_id is not None
        ), "ConversionGuest should be created"

        # Verify conversion_room was created
        room_result = await test_db_session.execute(
            select(ConversionRoom).where(ConversionRoom.conversion_id == conversion.id)
        )
        rooms = room_result.scalars().all()
        assert len(rooms) > 0, "ConversionRoom should be created"

        # Verify matching occurred (may or may not have matched depending on data)
        # The important thing is that the records exist
        assert stats["total_reservations"] == 1
        assert stats["total_daily_sales"] == 1

    @pytest.mark.asyncio
    async def test_hashed_customer_missing_fields_handled_gracefully(
        self, test_db_session
    ):
        """Test that matching handles customers with missing hashed fields gracefully."""
        # Create a customer
        customer = Customer(
            given_name="Eve",
            surname="Taylor",
            email_address="eve@example.com",
            contact_id="test_contact_7",
        )
        test_db_session.add(customer)
        await test_db_session.flush()

        # Create hashed customer but simulate missing fields by manually
        # setting them to None
        hashed_customer = HashedCustomer(
            customer_id=customer.id,
            contact_id="test_contact_7_hashed",
            hashed_email=None,  # Simulate missing hashed email
            hashed_given_name=None,  # Simulate missing hashed name
            hashed_surname=None,
        )
        test_db_session.add(hashed_customer)
        await test_db_session.flush()

        # Create reservation
        reservation = Reservation(
            customer_id=customer.id,
            unique_id="res_7",
            hotel_code="test_hotel",
        )
        test_db_session.add(reservation)
        await test_db_session.commit()

        # Test matching - should not crash even with missing hashed fields
        service = ConversionService(test_db_session)

        # Eagerly load reservations
        reservations = await self._load_reservations_with_customer(
            test_db_session, reservation.id
        )

        hashed_email = self._sha256("eve@example.com")

        matched = service._match_reservations_by_guest_details(
            reservations,
            guest_first_name=None,
            guest_last_name=None,
            guest_email=hashed_email,
        )

        # Should not match because hashed customer fields are None
        assert matched is None, "Should not match with missing hashed fields"

    @pytest.mark.asyncio
    async def test_duplicate_conversion_guests_with_same_hotel_and_guest_id(
        self, test_db_session
    ):
        """Test handling of duplicate ConversionGuest records with same (hotel_id, guest_id).

        This test reproduces the production issue where multiple ConversionGuest records
        can be created with the same (hotel_id, guest_id) combination, causing
        scalar_one_or_none() to fail with "Multiple rows were found when one or none was required".

        This can happen when:
        - Multiple conversions arrive for the same hotel and PMS guest within the same batch
        - The XML is processed multiple times
        - Race conditions in concurrent processing
        """
        # Imported locally, like the other helper imports in this module, so
        # the expected failure can be pinned to its exact exception type.
        from sqlalchemy.exc import MultipleResultsFound

        hotel_id = "test_hotel"
        guest_id = "guest_123"

        # Simulate the production scenario: multiple conversion guests with
        # same (hotel_id, guest_id)
        guest1 = ConversionGuest.create_from_conversion_data(
            hotel_id=hotel_id,
            guest_id=guest_id,
            guest_first_name="John",
            guest_last_name="Doe",
            guest_email="john@example.com",
            guest_country_code="US",
            guest_birth_date=None,
            now=None,
        )
        test_db_session.add(guest1)
        await test_db_session.flush()

        # Create a second guest with the SAME (hotel_id, guest_id)
        # This should not happen, but can occur in production
        guest2 = ConversionGuest.create_from_conversion_data(
            hotel_id=hotel_id,
            guest_id=guest_id,
            guest_first_name="Jane",  # Different first name
            guest_last_name="Doe",
            guest_email="jane@example.com",
            guest_country_code="US",
            guest_birth_date=None,
            now=None,
        )
        test_db_session.add(guest2)
        await test_db_session.commit()

        # The same lookup by (hotel_id, guest_id) is executed three times
        # below; each execution yields a fresh result to consume.
        duplicate_lookup = select(ConversionGuest).where(
            (ConversionGuest.hotel_id == hotel_id)
            & (ConversionGuest.guest_id == guest_id)
        )

        # Now try to query for the guest by (hotel_id, guest_id)
        # This should return multiple results
        result = await test_db_session.execute(duplicate_lookup)
        guests = result.scalars().all()

        # Verify we have duplicates (the production bug condition)
        assert len(guests) == 2, "Should have created duplicate conversion guests"

        # Verify that scalars().first() returns one of the guests (the fixed behavior)
        result2 = await test_db_session.execute(duplicate_lookup)
        first_guest = result2.scalars().first()
        assert (
            first_guest is not None
        ), "Should find at least one guest with scalars().first()"

        # The old code would have raised an error here with scalar_one_or_none()
        # when finding multiple results. Now it's fixed to use .first() instead.
        result3 = await test_db_session.execute(duplicate_lookup)
        with pytest.raises(MultipleResultsFound):
            result3.scalar_one_or_none()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|