"""Tests for ConversionService using realistic test data.

This test module:

1. Uses the CSV import tests to populate the in-memory database with
   realistic customer/reservation data
2. Runs the XML conversion import endpoint with conversions_test_data.xml
3. Asserts baseline match counts to detect regressions in matching logic

The test data is designed to test realistic matching scenarios:

- Matching by advertising campaign data (fbclid/gclid)
- Matching by guest name and email using hashed data
- Handling unmatched conversions
- Processing daily sales revenue data
- Testing hashed matching logic and edge cases
"""

import hashlib
from pathlib import Path

import pytest
import pytest_asyncio
from sqlalchemy import select
from sqlalchemy.exc import MultipleResultsFound
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import selectinload

from alpine_bits_python.conversion_service import ConversionService
from alpine_bits_python.csv_import import CSVImporter
from alpine_bits_python.db import (
    Base,
    Conversion,
    ConversionGuest,
    ConversionRoom,
    Customer,
    HashedCustomer,
    Reservation,
)

# Hotel code passed to the CSV importer by the data-driven tests.
TEST_HOTEL_CODE = "39054_001"

# BASELINE VALUES:
# These values are established from test runs with conversions_test_data.xml
# + leads_export.csv. If these change, it indicates a change in matching logic
# that needs review. Update these values only when intentionally changing the
# matching behavior.
#
# Current test data contains:
# - CSV import: 576 total rows, 535 created reservations, 41 duplicates skipped
# - XML conversions: 252 reservations with 2905 daily sales records across
#   539 room records
EXPECTED_TOTAL_RESERVATIONS = 252
EXPECTED_TOTAL_DAILY_SALES = 2905
EXPECTED_TOTAL_ROOMS = 539
# Note: Currently no matches by tracking ID because XML data uses different
# formats. This is expected with the test data. Real PMS data would have
# higher match rates.
EXPECTED_MATCHED_TO_RESERVATION = 19


def _sha256_normalized(value: str) -> str:
    """Return the SHA-256 hex digest of *value* lower-cased and stripped."""
    return hashlib.sha256(value.lower().strip().encode("utf-8")).hexdigest()


def _require_test_files(test_data_dir: Path):
    """Return ``(csv_file, xml_file)``, skipping the test if either is missing."""
    csv_file = test_data_dir / "leads_export.csv"
    xml_file = test_data_dir / "conversions_test_data.xml"
    # Skip (rather than fail) when the data files are not present.
    for data_file in (csv_file, xml_file):
        if not data_file.exists():
            pytest.skip(f"Test data file not found: {data_file}")
    return csv_file, xml_file


async def _import_leads_csv(session, config, csv_file: Path):
    """Import *csv_file* through ``CSVImporter`` and return the import stats."""
    importer = CSVImporter(session, config)
    return await importer.import_csv_file(
        csv_file_path=str(csv_file),
        hotel_code=TEST_HOTEL_CODE,
        dryrun=False,
    )


async def _process_conversions_xml(session, xml_file: Path):
    """Feed the stripped contents of *xml_file* to ``ConversionService``.

    Returns the stats mapping produced by ``process_conversion_xml``.
    """
    # File already has proper XML structure, just use it as-is
    xml_content = xml_file.read_text(encoding="utf-8").strip()
    conversion_service = ConversionService(session)
    return await conversion_service.process_conversion_xml(xml_content)


async def _load_reservations_with_hashed_customer(session, reservation_id):
    """Load the reservation with its customer and hashed customer eagerly."""
    result = await session.execute(
        select(Reservation)
        .where(Reservation.id == reservation_id)
        .options(
            selectinload(Reservation.customer).selectinload(Customer.hashed_version)
        )
    )
    return result.scalars().all()


@pytest_asyncio.fixture
async def test_db_engine():
    """Create an in-memory SQLite database for testing."""
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        echo=False,
    )

    # Create tables
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    yield engine

    # Cleanup
    await engine.dispose()


@pytest_asyncio.fixture
async def test_db_session(test_db_engine):
    """Create a test database session."""
    async_session = async_sessionmaker(
        test_db_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    async with async_session() as session:
        yield session


@pytest.fixture
def test_config():
    """Test configuration."""
    return {
        "server": {
            "codecontext": "ADVERTISING",
            "code": "70597314",
            "companyname": "99tales Gmbh",
            "res_id_source_context": "99tales",
        },
        "alpine_bits_auth": [
            {
                "hotel_id": "39054_001",
                "hotel_name": "Bemelmans Apartments",
                "username": "bemelmans_user",
                "password": "testpass",
            }
        ],
        "default_hotel_code": "39054_001",
        "default_hotel_name": "Bemelmans Apartments",
    }


@pytest.fixture
def test_data_dir():
    """Return path to test data directory."""
    return Path(__file__).parent / "test_data"


class TestConversionServiceWithImportedData:
    """Test ConversionService using realistic test data imported via CSV."""

    @pytest.mark.asyncio
    async def test_conversion_import_with_csv_test_data(
        self, test_db_session, test_config, test_data_dir
    ):
        """Test full workflow: import CSV data, then process conversions XML.

        This test demonstrates the intended workflow:

        1. Import CSV test data to populate customers and reservations
        2. Process conversion XML file to match conversions to reservations
        3. Verify match statistics to detect regressions

        The conversions_test_data.xml file contains realistic conversion data
        from a hotel PMS system with multiple reservations and daily sales.
        """
        csv_file, xml_file = _require_test_files(test_data_dir)

        # Step 1: Import CSV data to populate database with realistic
        # customers/reservations
        csv_stats = await _import_leads_csv(test_db_session, test_config, csv_file)

        print(f"\nCSV Import Stats: {csv_stats}")
        assert csv_stats["total_rows"] > 0, "CSV import should have processed rows"
        assert (
            csv_stats["created_reservations"] > 0
        ), "CSV import should create reservations"

        # TODO: Need to check if reservations and customers are now actually
        # available in the db before proceeding

        # Step 2: Load and process conversion XML
        stats = await _process_conversions_xml(test_db_session, xml_file)

        print("\nBaseline Match Counts:")
        print(f" Total reservations in XML: {EXPECTED_TOTAL_RESERVATIONS}")
        print(f" Total daily sales records: {EXPECTED_TOTAL_DAILY_SALES}")
        print(f" Total conversion room records: {EXPECTED_TOTAL_ROOMS}")
        print(f" Matched to reservation: {EXPECTED_MATCHED_TO_RESERVATION}")
        match_rate = (
            (EXPECTED_MATCHED_TO_RESERVATION / EXPECTED_TOTAL_RESERVATIONS * 100)
            if EXPECTED_TOTAL_RESERVATIONS > 0
            else 0
        )
        print(f" Match rate: {match_rate:.1f}%")

        # Verify baseline stability on subsequent runs
        assert (
            stats["total_reservations"] == EXPECTED_TOTAL_RESERVATIONS
        ), f"Total reservations should be {EXPECTED_TOTAL_RESERVATIONS}, got {stats['total_reservations']}"
        assert (
            stats["total_daily_sales"] == EXPECTED_TOTAL_DAILY_SALES
        ), f"Total daily sales should be {EXPECTED_TOTAL_DAILY_SALES}, got {stats['total_daily_sales']}"
        assert (
            stats["matched_to_reservation"] == EXPECTED_MATCHED_TO_RESERVATION
        ), f"Matched reservations should be {EXPECTED_MATCHED_TO_RESERVATION}, got {stats['matched_to_reservation']}"

    @pytest.mark.asyncio
    async def test_conversion_room_revenue_aggregation(
        self, test_db_session, test_config, test_data_dir
    ):
        """Test that daily sales revenue is correctly aggregated at room level."""
        csv_file, xml_file = _require_test_files(test_data_dir)

        # Import CSV data, then process conversions; only the resulting
        # database rows are inspected here, not the returned stats.
        await _import_leads_csv(test_db_session, test_config, csv_file)
        await _process_conversions_xml(test_db_session, xml_file)

        # Verify conversions were created
        result = await test_db_session.execute(select(ConversionRoom))
        all_rooms = result.scalars().all()
        assert len(all_rooms) > 0, "Should have created conversion rooms"

        # Verify there are room records even if no revenue is set
        result = await test_db_session.execute(
            select(ConversionRoom).where(ConversionRoom.total_revenue.isnot(None))
        )
        rooms_with_revenue = result.scalars().all()

        # Note: Test data may not have revenue values in the XML
        # The important thing is that we're capturing room-level data
        print("\nRevenue Aggregation Stats:")
        print(f" Total conversion rooms: {len(all_rooms)}")
        print(f" Rooms with revenue: {len(rooms_with_revenue)}")

        if rooms_with_revenue:
            # Verify revenue values are numeric and positive
            for room in rooms_with_revenue:
                assert isinstance(
                    room.total_revenue, (int, float)
                ), f"Revenue should be numeric, got {type(room.total_revenue)}"
                assert (
                    room.total_revenue > 0
                ), f"Revenue should be positive, got {room.total_revenue}"

            total_revenue = sum(room.total_revenue for room in rooms_with_revenue)
            print(f" Total aggregated revenue: {total_revenue}")
            print(
                f" Average revenue per room: {total_revenue / len(rooms_with_revenue)}"
            )

    @pytest.mark.asyncio
    async def test_conversion_matching_by_guest_details(
        self, test_db_session, test_config, test_data_dir
    ):
        """Test conversion matching by guest name and email fallback.

        Note: The test data may not have matching guest names/emails between
        the CSV and XML files. This test primarily verifies that the matching
        logic runs without errors and that the conversion service attempts to
        match by guest details when advertising data is unavailable.
        """
        csv_file, xml_file = _require_test_files(test_data_dir)

        # Import CSV data
        csv_stats = await _import_leads_csv(test_db_session, test_config, csv_file)
        assert (
            csv_stats["created_reservations"] > 0
        ), "Should have imported reservations"

        # Process conversions
        stats = await _process_conversions_xml(test_db_session, xml_file)

        # Verify conversions were processed
        result = await test_db_session.execute(select(Conversion))
        all_conversions = result.scalars().all()
        assert len(all_conversions) > 0, "Should have created conversions"

        # Check for matched conversions
        result = await test_db_session.execute(
            select(Conversion).where(Conversion.customer_id.isnot(None))
        )
        conversions_with_customers = result.scalars().all()

        print("\nGuest Detail Matching:")
        print(f" Total conversions: {len(all_conversions)}")
        print(f" Conversions matched to customer: {len(conversions_with_customers)}")
        print(f" Stats matched_to_customer: {stats['matched_to_customer']}")

        # With this test data, matches may be 0 if guest names/emails don't align
        # The important thing is that the matching logic runs without errors
        print(" Note: Matches depend on data alignment between CSV and XML files")

    @pytest.mark.asyncio
    async def test_conversion_service_error_handling(
        self, test_db_session, test_config
    ):
        """Test ConversionService handles invalid XML gracefully."""
        # NOTE(review): presumably this literal originally contained an
        # unclosed opening tag; confirm the intended payload.
        invalid_xml = "unclosed tag"

        conversion_service = ConversionService(test_db_session)

        with pytest.raises(ValueError, match="Invalid XML"):
            await conversion_service.process_conversion_xml(invalid_xml)

    @pytest.mark.asyncio
    async def test_conversion_service_empty_xml(self, test_db_session, test_config):
        """Test ConversionService handles empty/minimal XML."""
        # NOTE(review): an empty string is not a well-formed XML document;
        # presumably a minimal root element was intended here - confirm.
        minimal_xml = ""

        conversion_service = ConversionService(test_db_session)
        stats = await conversion_service.process_conversion_xml(minimal_xml)

        assert stats["total_reservations"] == 0
        assert stats["total_daily_sales"] == 0
        assert stats["errors"] == 0


class TestHashedMatchingLogic:
    """Test the hashed matching logic used in ConversionService."""

    @pytest.mark.asyncio
    async def test_no_match_without_hashed_customer(self, test_db_session):
        """Test that matching fails gracefully when customer has no hashed version."""
        # Create a customer WITHOUT hashed data
        customer = Customer(
            given_name="Bob",
            surname="Jones",
            email_address="bob@example.com",
            contact_id="test_contact_3",
        )
        test_db_session.add(customer)
        await test_db_session.commit()

        # Create a reservation
        reservation = Reservation(
            customer_id=customer.id,
            unique_id="res_3",
            hotel_code="test_hotel",
        )
        test_db_session.add(reservation)
        await test_db_session.commit()

        # Test the matching logic
        service = ConversionService(test_db_session)

        # Eagerly load reservations
        reservations = await _load_reservations_with_hashed_customer(
            test_db_session, reservation.id
        )

        hashed_email = _sha256_normalized("bob@example.com")

        matched = service._match_reservations_by_guest_details(
            reservations,
            guest_first_name=None,
            guest_last_name=None,
            guest_email=hashed_email,
        )

        # Should not match because customer has no hashed version
        assert matched is None, "Should not match without hashed customer"

    @pytest.mark.asyncio
    async def test_conversion_guest_hashed_fields_are_populated(
        self, test_db_session
    ):
        """Test that ConversionGuest properly stores hashed versions of guest data."""
        # Create a conversion guest
        conversion_guest = ConversionGuest.create_from_conversion_data(
            hotel_id="test_hotel",
            guest_id="guest_123",
            guest_first_name="Margaret",
            guest_last_name="Brown",
            guest_email="margaret@example.com",
            guest_country_code="GB",
            guest_birth_date=None,
            now=None,
        )
        test_db_session.add(conversion_guest)
        await test_db_session.flush()

        # Verify hashed fields are populated
        assert conversion_guest.hashed_first_name is not None
        assert conversion_guest.hashed_last_name is not None
        assert conversion_guest.hashed_email is not None

        # Verify hashes are correct (SHA256)
        expected_hashed_first = _sha256_normalized("margaret")
        expected_hashed_last = _sha256_normalized("brown")
        expected_hashed_email = _sha256_normalized("margaret@example.com")

        assert conversion_guest.hashed_first_name == expected_hashed_first
        assert conversion_guest.hashed_last_name == expected_hashed_last
        assert conversion_guest.hashed_email == expected_hashed_email

    @pytest.mark.asyncio
    async def test_conversion_records_created_before_matching(
        self, test_db_session, test_config
    ):
        """Test that conversion records exist before matching occurs."""
        # Create customer and reservation for matching
        customer = Customer(
            given_name="David",
            surname="Miller",
            email_address="david@example.com",
            contact_id="test_contact_6",
        )
        test_db_session.add(customer)
        await test_db_session.flush()

        hashed_customer = customer.create_hashed_customer()
        test_db_session.add(hashed_customer)
        await test_db_session.flush()

        reservation = Reservation(
            customer_id=customer.id,
            unique_id="res_6",
            hotel_code="hotel_1",
        )
        test_db_session.add(reservation)
        await test_db_session.commit()

        # Create conversion XML with matching hashed data
        # NOTE(review): the payload below is blank, yet the assertions that
        # follow expect a reservation "pms_123" for "hotel_1" with guest
        # David Miller. Presumably the XML document was lost from this
        # literal; restore it before relying on this test.
        xml_content = f""" """

        service = ConversionService(test_db_session)
        stats = await service.process_conversion_xml(xml_content)

        # Verify conversion was created
        result = await test_db_session.execute(
            select(Conversion).where(Conversion.pms_reservation_id == "pms_123")
        )
        conversion = result.scalar_one_or_none()

        assert conversion is not None, "Conversion should be created"
        assert conversion.hotel_id == "hotel_1"
        assert conversion.guest_first_name == "David"
        assert conversion.guest_last_name == "Miller"
        assert conversion.guest_email == "david@example.com"

        # Verify conversion_guest was created
        assert (
            conversion.conversion_guest_id is not None
        ), "ConversionGuest should be created"

        # Verify conversion_room was created
        room_result = await test_db_session.execute(
            select(ConversionRoom).where(
                ConversionRoom.conversion_id == conversion.id
            )
        )
        rooms = room_result.scalars().all()
        assert len(rooms) > 0, "ConversionRoom should be created"

        # Verify matching occurred (may or may not have matched depending on data)
        # The important thing is that the records exist
        assert stats["total_reservations"] == 1
        assert stats["total_daily_sales"] == 1

    @pytest.mark.asyncio
    async def test_hashed_customer_missing_fields_handled_gracefully(
        self, test_db_session
    ):
        """Test that matching handles customers with missing hashed fields gracefully."""
        # Create a customer
        customer = Customer(
            given_name="Eve",
            surname="Taylor",
            email_address="eve@example.com",
            contact_id="test_contact_7",
        )
        test_db_session.add(customer)
        await test_db_session.flush()

        # Create hashed customer but simulate missing fields by manually
        # setting to None
        hashed_customer = HashedCustomer(
            customer_id=customer.id,
            contact_id="test_contact_7_hashed",
            hashed_email=None,  # Simulate missing hashed email
            hashed_given_name=None,  # Simulate missing hashed name
            hashed_surname=None,
        )
        test_db_session.add(hashed_customer)
        await test_db_session.flush()

        # Create reservation
        reservation = Reservation(
            customer_id=customer.id,
            unique_id="res_7",
            hotel_code="test_hotel",
        )
        test_db_session.add(reservation)
        await test_db_session.commit()

        # Test matching - should not crash even with missing hashed fields
        service = ConversionService(test_db_session)

        # Eagerly load reservations
        reservations = await _load_reservations_with_hashed_customer(
            test_db_session, reservation.id
        )

        hashed_email = _sha256_normalized("eve@example.com")

        matched = service._match_reservations_by_guest_details(
            reservations,
            guest_first_name=None,
            guest_last_name=None,
            guest_email=hashed_email,
        )

        # Should not match because hashed customer fields are None
        assert matched is None, "Should not match with missing hashed fields"

    @pytest.mark.asyncio
    async def test_duplicate_conversion_guests_with_same_hotel_and_guest_id(
        self, test_db_session
    ):
        """Test handling of duplicate ConversionGuest records with same (hotel_id, guest_id).

        This test reproduces the production issue where multiple
        ConversionGuest records can be created with the same
        (hotel_id, guest_id) combination, causing scalar_one_or_none() to fail
        with "Multiple rows were found when one or none was required".

        This can happen when:

        - Multiple conversions arrive for the same hotel and PMS guest within
          the same batch
        - The XML is processed multiple times
        - Race conditions in concurrent processing
        """
        hotel_id = "test_hotel"
        guest_id = "guest_123"

        # Simulate the production scenario: multiple conversion guests with
        # same (hotel_id, guest_id)
        guest1 = ConversionGuest.create_from_conversion_data(
            hotel_id=hotel_id,
            guest_id=guest_id,
            guest_first_name="John",
            guest_last_name="Doe",
            guest_email="john@example.com",
            guest_country_code="US",
            guest_birth_date=None,
            now=None,
        )
        test_db_session.add(guest1)
        await test_db_session.flush()

        # Create a second guest with the SAME (hotel_id, guest_id)
        # This should not happen, but can occur in production
        guest2 = ConversionGuest.create_from_conversion_data(
            hotel_id=hotel_id,
            guest_id=guest_id,
            guest_first_name="Jane",  # Different first name
            guest_last_name="Doe",
            guest_email="jane@example.com",
            guest_country_code="US",
            guest_birth_date=None,
            now=None,
        )
        test_db_session.add(guest2)
        await test_db_session.commit()

        # The same lookup by (hotel_id, guest_id) is executed three times
        # below, once per result-access style being demonstrated.
        guest_lookup = select(ConversionGuest).where(
            (ConversionGuest.hotel_id == hotel_id)
            & (ConversionGuest.guest_id == guest_id)
        )

        # Now try to query for the guest by (hotel_id, guest_id)
        # This should return multiple results
        result = await test_db_session.execute(guest_lookup)
        guests = result.scalars().all()

        # Verify we have duplicates (the production bug condition)
        assert len(guests) == 2, "Should have created duplicate conversion guests"

        # Verify that scalars().first() returns one of the guests
        # (the fixed behavior)
        result2 = await test_db_session.execute(guest_lookup)
        first_guest = result2.scalars().first()
        assert (
            first_guest is not None
        ), "Should find at least one guest with scalars().first()"

        # The old code would have raised an error here with scalar_one_or_none()
        # when finding multiple results. Now it's fixed to use .first() instead.
        result3 = await test_db_session.execute(guest_lookup)
        # Pin the specific SQLAlchemy error so an unrelated failure cannot
        # satisfy this check.
        with pytest.raises(MultipleResultsFound):
            result3.scalar_one_or_none()


if __name__ == "__main__":
    pytest.main([__file__, "-v"])