Added a test for conversions
This commit is contained in:
321
tests/test_conversion_service.py
Normal file
321
tests/test_conversion_service.py
Normal file
@@ -0,0 +1,321 @@
|
||||
"""Tests for ConversionService using realistic test data.
|
||||
|
||||
This test module:
|
||||
1. Uses the CSV import tests to populate the in-memory database with realistic customer/reservation data
|
||||
2. Runs the XML conversion import endpoint with conversions_test_data.xml
|
||||
3. Asserts baseline match counts to detect regressions in matching logic
|
||||
|
||||
The test data is designed to test realistic matching scenarios:
|
||||
- Matching by advertising campaign data (fbclid/gclid)
|
||||
- Matching by guest name and email
|
||||
- Handling unmatched conversions
|
||||
- Processing daily sales revenue data
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from alpine_bits_python.conversion_service import ConversionService
|
||||
from alpine_bits_python.csv_import import CSVImporter
|
||||
from alpine_bits_python.db import Base, Conversion, ConversionRoom
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_db_engine():
|
||||
"""Create an in-memory SQLite database for testing."""
|
||||
engine = create_async_engine(
|
||||
"sqlite+aiosqlite:///:memory:",
|
||||
echo=False,
|
||||
)
|
||||
|
||||
# Create tables
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
yield engine
|
||||
|
||||
# Cleanup
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_db_session(test_db_engine):
|
||||
"""Create a test database session."""
|
||||
async_session = async_sessionmaker(
|
||||
test_db_engine,
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False,
|
||||
)
|
||||
|
||||
async with async_session() as session:
|
||||
yield session
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_config():
|
||||
"""Test configuration."""
|
||||
return {
|
||||
"server": {
|
||||
"codecontext": "ADVERTISING",
|
||||
"code": "70597314",
|
||||
"companyname": "99tales Gmbh",
|
||||
"res_id_source_context": "99tales",
|
||||
},
|
||||
"alpine_bits_auth": [
|
||||
{
|
||||
"hotel_id": "bemelmans",
|
||||
"hotel_name": "Bemelmans Apartments",
|
||||
"username": "bemelmans_user",
|
||||
"password": "testpass",
|
||||
}
|
||||
],
|
||||
"default_hotel_code": "bemelmans",
|
||||
"default_hotel_name": "Bemelmans Apartments",
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_data_dir():
|
||||
"""Return path to test data directory."""
|
||||
return Path(__file__).parent / "test_data"
|
||||
|
||||
|
||||
class TestConversionServiceWithImportedData:
|
||||
"""Test ConversionService using realistic test data imported via CSV."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_conversion_import_with_csv_test_data(
|
||||
self, test_db_session, test_config, test_data_dir
|
||||
):
|
||||
"""Test full workflow: import CSV data, then process conversions XML.
|
||||
|
||||
This test demonstrates the intended workflow:
|
||||
1. Import CSV test data to populate customers and reservations
|
||||
2. Process conversion XML file to match conversions to reservations
|
||||
3. Verify match statistics to detect regressions
|
||||
|
||||
The conversions_test_data.xml file contains realistic conversion data
|
||||
from a hotel PMS system with multiple reservations and daily sales.
|
||||
"""
|
||||
csv_file = test_data_dir / "leads_export.csv"
|
||||
xml_file = test_data_dir / "conversions_test_data.xml"
|
||||
|
||||
# Skip test if data files don't exist
|
||||
if not csv_file.exists():
|
||||
pytest.skip(f"Test data file not found: {csv_file}")
|
||||
if not xml_file.exists():
|
||||
pytest.skip(f"Test data file not found: {xml_file}")
|
||||
|
||||
# Step 1: Import CSV data to populate database with realistic customers/reservations
|
||||
importer = CSVImporter(test_db_session, test_config)
|
||||
csv_stats = await importer.import_csv_file(
|
||||
csv_file_path=str(csv_file),
|
||||
hotel_code="bemelmans",
|
||||
dryrun=False,
|
||||
)
|
||||
|
||||
print(f"\nCSV Import Stats: {csv_stats}")
|
||||
assert csv_stats["total_rows"] > 0, "CSV import should have processed rows"
|
||||
assert (
|
||||
csv_stats["created_reservations"] > 0
|
||||
), "CSV import should create reservations"
|
||||
|
||||
# Step 2: Load and process conversion XML
|
||||
with xml_file.open(encoding="utf-8") as f:
|
||||
xml_content = f.read()
|
||||
|
||||
# File already has proper XML structure, just use it as-is
|
||||
xml_content = xml_content.strip()
|
||||
|
||||
conversion_service = ConversionService(test_db_session)
|
||||
stats = await conversion_service.process_conversion_xml(xml_content)
|
||||
|
||||
# BASELINE ASSERTIONS:
|
||||
# These values are established from test runs with conversions_test_data.xml + leads_export.csv.
|
||||
# If these change, it indicates a change in matching logic that needs review.
|
||||
# Update these values only when intentionally changing the matching behavior.
|
||||
#
|
||||
# Current test data contains:
|
||||
# - CSV import: 576 total rows, 535 created reservations, 41 duplicates skipped
|
||||
# - XML conversions: 252 reservations with 2905 daily sales records across 539 room records
|
||||
EXPECTED_TOTAL_RESERVATIONS = 252
|
||||
EXPECTED_TOTAL_DAILY_SALES = 2905
|
||||
EXPECTED_TOTAL_ROOMS = 539
|
||||
# Note: Currently no matches by tracking ID because XML data uses different formats
|
||||
# This is expected with the test data. Real PMS data would have higher match rates.
|
||||
EXPECTED_MATCHED_TO_RESERVATION = 0
|
||||
|
||||
print(f"\nBaseline Match Counts:")
|
||||
print(f" Total reservations in XML: {EXPECTED_TOTAL_RESERVATIONS}")
|
||||
print(f" Total daily sales records: {EXPECTED_TOTAL_DAILY_SALES}")
|
||||
print(f" Total conversion room records: {EXPECTED_TOTAL_ROOMS}")
|
||||
print(f" Matched to reservation: {EXPECTED_MATCHED_TO_RESERVATION}")
|
||||
match_rate = (EXPECTED_MATCHED_TO_RESERVATION / EXPECTED_TOTAL_RESERVATIONS * 100) if EXPECTED_TOTAL_RESERVATIONS > 0 else 0
|
||||
print(f" Match rate: {match_rate:.1f}%")
|
||||
|
||||
# Verify baseline stability on subsequent runs
|
||||
assert (
|
||||
stats["total_reservations"] == EXPECTED_TOTAL_RESERVATIONS
|
||||
), f"Total reservations should be {EXPECTED_TOTAL_RESERVATIONS}, got {stats['total_reservations']}"
|
||||
assert (
|
||||
stats["total_daily_sales"] == EXPECTED_TOTAL_DAILY_SALES
|
||||
), f"Total daily sales should be {EXPECTED_TOTAL_DAILY_SALES}, got {stats['total_daily_sales']}"
|
||||
assert (
|
||||
stats["matched_to_reservation"] == EXPECTED_MATCHED_TO_RESERVATION
|
||||
), f"Matched reservations should be {EXPECTED_MATCHED_TO_RESERVATION}, got {stats['matched_to_reservation']}"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_conversion_room_revenue_aggregation(
|
||||
self, test_db_session, test_config, test_data_dir
|
||||
):
|
||||
"""Test that daily sales revenue is correctly aggregated at room level."""
|
||||
csv_file = test_data_dir / "leads_export.csv"
|
||||
xml_file = test_data_dir / "conversions_test_data.xml"
|
||||
|
||||
if not csv_file.exists():
|
||||
pytest.skip(f"Test data file not found: {csv_file}")
|
||||
if not xml_file.exists():
|
||||
pytest.skip(f"Test data file not found: {xml_file}")
|
||||
|
||||
# Import CSV data
|
||||
importer = CSVImporter(test_db_session, test_config)
|
||||
await importer.import_csv_file(
|
||||
csv_file_path=str(csv_file),
|
||||
hotel_code="bemelmans",
|
||||
dryrun=False,
|
||||
)
|
||||
|
||||
# Process conversions
|
||||
with xml_file.open(encoding="utf-8") as f:
|
||||
xml_content = f.read()
|
||||
|
||||
# File already has proper XML structure, just use it as-is
|
||||
xml_content = xml_content.strip()
|
||||
|
||||
conversion_service = ConversionService(test_db_session)
|
||||
stats = await conversion_service.process_conversion_xml(xml_content)
|
||||
|
||||
# Verify conversions were created
|
||||
from sqlalchemy import select
|
||||
|
||||
result = await test_db_session.execute(select(ConversionRoom))
|
||||
all_rooms = result.scalars().all()
|
||||
assert len(all_rooms) > 0, "Should have created conversion rooms"
|
||||
|
||||
# Verify there are room records even if no revenue is set
|
||||
result = await test_db_session.execute(
|
||||
select(ConversionRoom).where(ConversionRoom.total_revenue.isnot(None))
|
||||
)
|
||||
rooms_with_revenue = result.scalars().all()
|
||||
|
||||
# Note: Test data may not have revenue values in the XML
|
||||
# The important thing is that we're capturing room-level data
|
||||
print(f"\nRevenue Aggregation Stats:")
|
||||
print(f" Total conversion rooms: {len(all_rooms)}")
|
||||
print(f" Rooms with revenue: {len(rooms_with_revenue)}")
|
||||
|
||||
if rooms_with_revenue:
|
||||
# Verify revenue values are numeric and positive
|
||||
for room in rooms_with_revenue:
|
||||
assert isinstance(
|
||||
room.total_revenue, (int, float)
|
||||
), f"Revenue should be numeric, got {type(room.total_revenue)}"
|
||||
assert (
|
||||
room.total_revenue > 0
|
||||
), f"Revenue should be positive, got {room.total_revenue}"
|
||||
|
||||
total_revenue = sum(room.total_revenue for room in rooms_with_revenue)
|
||||
print(f" Total aggregated revenue: {total_revenue}")
|
||||
print(f" Average revenue per room: {total_revenue / len(rooms_with_revenue)}")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_conversion_matching_by_guest_details(
|
||||
self, test_db_session, test_config, test_data_dir
|
||||
):
|
||||
"""Test conversion matching by guest name and email fallback.
|
||||
|
||||
Note: The test data may not have matching guest names/emails between
|
||||
the CSV and XML files. This test primarily verifies that the matching
|
||||
logic runs without errors and that the conversion service attempts to
|
||||
match by guest details when advertising data is unavailable.
|
||||
"""
|
||||
csv_file = test_data_dir / "leads_export.csv"
|
||||
xml_file = test_data_dir / "conversions_test_data.xml"
|
||||
|
||||
if not csv_file.exists():
|
||||
pytest.skip(f"Test data file not found: {csv_file}")
|
||||
if not xml_file.exists():
|
||||
pytest.skip(f"Test data file not found: {xml_file}")
|
||||
|
||||
# Import CSV data
|
||||
importer = CSVImporter(test_db_session, test_config)
|
||||
csv_stats = await importer.import_csv_file(
|
||||
csv_file_path=str(csv_file),
|
||||
hotel_code="bemelmans",
|
||||
dryrun=False,
|
||||
)
|
||||
|
||||
assert csv_stats["created_reservations"] > 0, "Should have imported reservations"
|
||||
|
||||
# Process conversions
|
||||
with xml_file.open(encoding="utf-8") as f:
|
||||
xml_content = f.read()
|
||||
|
||||
# File already has proper XML structure, just use it as-is
|
||||
xml_content = xml_content.strip()
|
||||
|
||||
conversion_service = ConversionService(test_db_session)
|
||||
stats = await conversion_service.process_conversion_xml(xml_content)
|
||||
|
||||
# Verify conversions were processed
|
||||
from sqlalchemy import select
|
||||
|
||||
result = await test_db_session.execute(select(Conversion))
|
||||
all_conversions = result.scalars().all()
|
||||
assert len(all_conversions) > 0, "Should have created conversions"
|
||||
|
||||
# Check for matched conversions
|
||||
result = await test_db_session.execute(
|
||||
select(Conversion).where(Conversion.customer_id.isnot(None))
|
||||
)
|
||||
conversions_with_customers = result.scalars().all()
|
||||
|
||||
print(f"\nGuest Detail Matching:")
|
||||
print(f" Total conversions: {len(all_conversions)}")
|
||||
print(f" Conversions matched to customer: {len(conversions_with_customers)}")
|
||||
print(f" Stats matched_to_customer: {stats['matched_to_customer']}")
|
||||
|
||||
# With this test data, matches may be 0 if guest names/emails don't align
|
||||
# The important thing is that the matching logic runs without errors
|
||||
print(f" Note: Matches depend on data alignment between CSV and XML files")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_conversion_service_error_handling(
|
||||
self, test_db_session, test_config
|
||||
):
|
||||
"""Test ConversionService handles invalid XML gracefully."""
|
||||
invalid_xml = "<invalid>unclosed tag"
|
||||
|
||||
conversion_service = ConversionService(test_db_session)
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid XML"):
|
||||
await conversion_service.process_conversion_xml(invalid_xml)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_conversion_service_empty_xml(self, test_db_session, test_config):
|
||||
"""Test ConversionService handles empty/minimal XML."""
|
||||
minimal_xml = '<?xml version="1.0"?><root></root>'
|
||||
|
||||
conversion_service = ConversionService(test_db_session)
|
||||
stats = await conversion_service.process_conversion_xml(minimal_xml)
|
||||
|
||||
assert stats["total_reservations"] == 0
|
||||
assert stats["total_daily_sales"] == 0
|
||||
assert stats["errors"] == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
6308
tests/test_data/conversions_test_data.xml
Normal file
6308
tests/test_data/conversions_test_data.xml
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user