Added a test for conversions

This commit is contained in:
Jonas Linter
2025-11-19 11:05:51 +01:00
parent 187c955a67
commit 90f68bfd26
14 changed files with 6629 additions and 2812 deletions

View File

@@ -0,0 +1,321 @@
"""Tests for ConversionService using realistic test data.
This test module:
1. Uses the CSV import tests to populate the in-memory database with realistic customer/reservation data
2. Runs the XML conversion import endpoint with conversions_test_data.xml
3. Asserts baseline match counts to detect regressions in matching logic
The test data is designed to test realistic matching scenarios:
- Matching by advertising campaign data (fbclid/gclid)
- Matching by guest name and email
- Handling unmatched conversions
- Processing daily sales revenue data
"""
from pathlib import Path
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from alpine_bits_python.conversion_service import ConversionService
from alpine_bits_python.csv_import import CSVImporter
from alpine_bits_python.db import Base, Conversion, ConversionRoom
@pytest_asyncio.fixture
async def test_db_engine():
"""Create an in-memory SQLite database for testing."""
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
# Cleanup
await engine.dispose()
@pytest_asyncio.fixture
async def test_db_session(test_db_engine):
"""Create a test database session."""
async_session = async_sessionmaker(
test_db_engine,
class_=AsyncSession,
expire_on_commit=False,
)
async with async_session() as session:
yield session
@pytest.fixture
def test_config():
"""Test configuration."""
return {
"server": {
"codecontext": "ADVERTISING",
"code": "70597314",
"companyname": "99tales Gmbh",
"res_id_source_context": "99tales",
},
"alpine_bits_auth": [
{
"hotel_id": "bemelmans",
"hotel_name": "Bemelmans Apartments",
"username": "bemelmans_user",
"password": "testpass",
}
],
"default_hotel_code": "bemelmans",
"default_hotel_name": "Bemelmans Apartments",
}
@pytest.fixture
def test_data_dir():
"""Return path to test data directory."""
return Path(__file__).parent / "test_data"
class TestConversionServiceWithImportedData:
"""Test ConversionService using realistic test data imported via CSV."""
@pytest.mark.asyncio
async def test_conversion_import_with_csv_test_data(
self, test_db_session, test_config, test_data_dir
):
"""Test full workflow: import CSV data, then process conversions XML.
This test demonstrates the intended workflow:
1. Import CSV test data to populate customers and reservations
2. Process conversion XML file to match conversions to reservations
3. Verify match statistics to detect regressions
The conversions_test_data.xml file contains realistic conversion data
from a hotel PMS system with multiple reservations and daily sales.
"""
csv_file = test_data_dir / "leads_export.csv"
xml_file = test_data_dir / "conversions_test_data.xml"
# Skip test if data files don't exist
if not csv_file.exists():
pytest.skip(f"Test data file not found: {csv_file}")
if not xml_file.exists():
pytest.skip(f"Test data file not found: {xml_file}")
# Step 1: Import CSV data to populate database with realistic customers/reservations
importer = CSVImporter(test_db_session, test_config)
csv_stats = await importer.import_csv_file(
csv_file_path=str(csv_file),
hotel_code="bemelmans",
dryrun=False,
)
print(f"\nCSV Import Stats: {csv_stats}")
assert csv_stats["total_rows"] > 0, "CSV import should have processed rows"
assert (
csv_stats["created_reservations"] > 0
), "CSV import should create reservations"
# Step 2: Load and process conversion XML
with xml_file.open(encoding="utf-8") as f:
xml_content = f.read()
# File already has proper XML structure, just use it as-is
xml_content = xml_content.strip()
conversion_service = ConversionService(test_db_session)
stats = await conversion_service.process_conversion_xml(xml_content)
# BASELINE ASSERTIONS:
# These values are established from test runs with conversions_test_data.xml + leads_export.csv.
# If these change, it indicates a change in matching logic that needs review.
# Update these values only when intentionally changing the matching behavior.
#
# Current test data contains:
# - CSV import: 576 total rows, 535 created reservations, 41 duplicates skipped
# - XML conversions: 252 reservations with 2905 daily sales records across 539 room records
EXPECTED_TOTAL_RESERVATIONS = 252
EXPECTED_TOTAL_DAILY_SALES = 2905
EXPECTED_TOTAL_ROOMS = 539
# Note: Currently no matches by tracking ID because XML data uses different formats
# This is expected with the test data. Real PMS data would have higher match rates.
EXPECTED_MATCHED_TO_RESERVATION = 0
print(f"\nBaseline Match Counts:")
print(f" Total reservations in XML: {EXPECTED_TOTAL_RESERVATIONS}")
print(f" Total daily sales records: {EXPECTED_TOTAL_DAILY_SALES}")
print(f" Total conversion room records: {EXPECTED_TOTAL_ROOMS}")
print(f" Matched to reservation: {EXPECTED_MATCHED_TO_RESERVATION}")
match_rate = (EXPECTED_MATCHED_TO_RESERVATION / EXPECTED_TOTAL_RESERVATIONS * 100) if EXPECTED_TOTAL_RESERVATIONS > 0 else 0
print(f" Match rate: {match_rate:.1f}%")
# Verify baseline stability on subsequent runs
assert (
stats["total_reservations"] == EXPECTED_TOTAL_RESERVATIONS
), f"Total reservations should be {EXPECTED_TOTAL_RESERVATIONS}, got {stats['total_reservations']}"
assert (
stats["total_daily_sales"] == EXPECTED_TOTAL_DAILY_SALES
), f"Total daily sales should be {EXPECTED_TOTAL_DAILY_SALES}, got {stats['total_daily_sales']}"
assert (
stats["matched_to_reservation"] == EXPECTED_MATCHED_TO_RESERVATION
), f"Matched reservations should be {EXPECTED_MATCHED_TO_RESERVATION}, got {stats['matched_to_reservation']}"
@pytest.mark.asyncio
async def test_conversion_room_revenue_aggregation(
self, test_db_session, test_config, test_data_dir
):
"""Test that daily sales revenue is correctly aggregated at room level."""
csv_file = test_data_dir / "leads_export.csv"
xml_file = test_data_dir / "conversions_test_data.xml"
if not csv_file.exists():
pytest.skip(f"Test data file not found: {csv_file}")
if not xml_file.exists():
pytest.skip(f"Test data file not found: {xml_file}")
# Import CSV data
importer = CSVImporter(test_db_session, test_config)
await importer.import_csv_file(
csv_file_path=str(csv_file),
hotel_code="bemelmans",
dryrun=False,
)
# Process conversions
with xml_file.open(encoding="utf-8") as f:
xml_content = f.read()
# File already has proper XML structure, just use it as-is
xml_content = xml_content.strip()
conversion_service = ConversionService(test_db_session)
stats = await conversion_service.process_conversion_xml(xml_content)
# Verify conversions were created
from sqlalchemy import select
result = await test_db_session.execute(select(ConversionRoom))
all_rooms = result.scalars().all()
assert len(all_rooms) > 0, "Should have created conversion rooms"
# Verify there are room records even if no revenue is set
result = await test_db_session.execute(
select(ConversionRoom).where(ConversionRoom.total_revenue.isnot(None))
)
rooms_with_revenue = result.scalars().all()
# Note: Test data may not have revenue values in the XML
# The important thing is that we're capturing room-level data
print(f"\nRevenue Aggregation Stats:")
print(f" Total conversion rooms: {len(all_rooms)}")
print(f" Rooms with revenue: {len(rooms_with_revenue)}")
if rooms_with_revenue:
# Verify revenue values are numeric and positive
for room in rooms_with_revenue:
assert isinstance(
room.total_revenue, (int, float)
), f"Revenue should be numeric, got {type(room.total_revenue)}"
assert (
room.total_revenue > 0
), f"Revenue should be positive, got {room.total_revenue}"
total_revenue = sum(room.total_revenue for room in rooms_with_revenue)
print(f" Total aggregated revenue: {total_revenue}")
print(f" Average revenue per room: {total_revenue / len(rooms_with_revenue)}")
@pytest.mark.asyncio
async def test_conversion_matching_by_guest_details(
self, test_db_session, test_config, test_data_dir
):
"""Test conversion matching by guest name and email fallback.
Note: The test data may not have matching guest names/emails between
the CSV and XML files. This test primarily verifies that the matching
logic runs without errors and that the conversion service attempts to
match by guest details when advertising data is unavailable.
"""
csv_file = test_data_dir / "leads_export.csv"
xml_file = test_data_dir / "conversions_test_data.xml"
if not csv_file.exists():
pytest.skip(f"Test data file not found: {csv_file}")
if not xml_file.exists():
pytest.skip(f"Test data file not found: {xml_file}")
# Import CSV data
importer = CSVImporter(test_db_session, test_config)
csv_stats = await importer.import_csv_file(
csv_file_path=str(csv_file),
hotel_code="bemelmans",
dryrun=False,
)
assert csv_stats["created_reservations"] > 0, "Should have imported reservations"
# Process conversions
with xml_file.open(encoding="utf-8") as f:
xml_content = f.read()
# File already has proper XML structure, just use it as-is
xml_content = xml_content.strip()
conversion_service = ConversionService(test_db_session)
stats = await conversion_service.process_conversion_xml(xml_content)
# Verify conversions were processed
from sqlalchemy import select
result = await test_db_session.execute(select(Conversion))
all_conversions = result.scalars().all()
assert len(all_conversions) > 0, "Should have created conversions"
# Check for matched conversions
result = await test_db_session.execute(
select(Conversion).where(Conversion.customer_id.isnot(None))
)
conversions_with_customers = result.scalars().all()
print(f"\nGuest Detail Matching:")
print(f" Total conversions: {len(all_conversions)}")
print(f" Conversions matched to customer: {len(conversions_with_customers)}")
print(f" Stats matched_to_customer: {stats['matched_to_customer']}")
# With this test data, matches may be 0 if guest names/emails don't align
# The important thing is that the matching logic runs without errors
print(f" Note: Matches depend on data alignment between CSV and XML files")
@pytest.mark.asyncio
async def test_conversion_service_error_handling(
self, test_db_session, test_config
):
"""Test ConversionService handles invalid XML gracefully."""
invalid_xml = "<invalid>unclosed tag"
conversion_service = ConversionService(test_db_session)
with pytest.raises(ValueError, match="Invalid XML"):
await conversion_service.process_conversion_xml(invalid_xml)
@pytest.mark.asyncio
async def test_conversion_service_empty_xml(self, test_db_session, test_config):
"""Test ConversionService handles empty/minimal XML."""
minimal_xml = '<?xml version="1.0"?><root></root>'
conversion_service = ConversionService(test_db_session)
stats = await conversion_service.process_conversion_xml(minimal_xml)
assert stats["total_reservations"] == 0
assert stats["total_daily_sales"] == 0
assert stats["errors"] == 0
if __name__ == "__main__":
pytest.main([__file__, "-v"])