991 lines
36 KiB
Python
991 lines
36 KiB
Python
"""Tests for API endpoints using FastAPI TestClient.
|
|
|
|
This module tests all FastAPI endpoints including:
|
|
- Health check endpoints
|
|
- Wix webhook endpoints
|
|
- AlpineBits server endpoint
|
|
- XML upload endpoint
|
|
- Authentication
|
|
- Rate limiting
|
|
"""
|
|
|
|
import base64
|
|
import gzip
|
|
import json
|
|
import uuid
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from alembic import command
|
|
from alembic.config import Config
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
|
|
|
from alpine_bits_python.api import app
|
|
from alpine_bits_python.const import HttpStatusCode
|
|
from alpine_bits_python.db import Base, Customer, Reservation
|
|
|
|
|
|
def run_alembic_migrations(connection):
|
|
"""Run Alembic migrations on a SQLAlchemy connection.
|
|
|
|
This is used in tests to set up the database schema using migrations
|
|
instead of Base.metadata.create_all().
|
|
"""
|
|
# Get path to alembic.ini
|
|
project_root = Path(__file__).parent.parent
|
|
alembic_ini_path = project_root / "alembic.ini"
|
|
|
|
# Create Alembic config
|
|
alembic_cfg = Config(str(alembic_ini_path))
|
|
|
|
# Override the database URL to use the test connection
|
|
# For SQLite, we can't use the in-memory connection URL directly,
|
|
# so we'll use Base.metadata.create_all() for SQLite tests
|
|
# This is a limitation of Alembic with SQLite in-memory databases
|
|
Base.metadata.create_all(bind=connection)
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def test_db_engine():
|
|
"""Create an in-memory SQLite database for testing."""
|
|
engine = create_async_engine(
|
|
"sqlite+aiosqlite:///:memory:",
|
|
echo=False,
|
|
)
|
|
|
|
# Create tables using Base.metadata.create_all for SQLite tests
|
|
# (Alembic doesn't work well with SQLite in-memory databases)
|
|
async with engine.begin() as conn:
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
|
|
yield engine
|
|
|
|
# Cleanup
|
|
await engine.dispose()
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def test_db_session(test_db_engine):
|
|
"""Create a test database session."""
|
|
async_session = async_sessionmaker(
|
|
test_db_engine,
|
|
class_=AsyncSession,
|
|
expire_on_commit=False,
|
|
)
|
|
|
|
async with async_session() as session:
|
|
yield session
|
|
|
|
|
|
@pytest.fixture
|
|
def test_config():
|
|
"""Test configuration."""
|
|
return {
|
|
"server": {
|
|
"codecontext": "ADVERTISING",
|
|
"code": "70597314",
|
|
"companyname": "99tales Gmbh",
|
|
"res_id_source_context": "99tales",
|
|
},
|
|
"alpine_bits_auth": [
|
|
{
|
|
"hotel_id": "HOTEL123",
|
|
"hotel_name": "Test Hotel",
|
|
"username": "testuser",
|
|
"password": "testpass",
|
|
},
|
|
{
|
|
"hotel_id": "135",
|
|
"hotel_name": "Another Hotel",
|
|
"username": "anotheruser",
|
|
"password": "anotherpass",
|
|
}
|
|
],
|
|
"default_hotel_code": "HOTEL123",
|
|
"default_hotel_name": "Test Hotel",
|
|
"database": {"url": "sqlite+aiosqlite:///:memory:"},
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def client(test_config):
|
|
"""Create a test client with mocked dependencies.
|
|
|
|
Each test gets a fresh TestClient instance to avoid database conflicts.
|
|
Mocks load_config to return test_config instead of production config.
|
|
"""
|
|
import asyncio # noqa: PLC0415
|
|
|
|
# Import locally to avoid circular imports
|
|
from alpine_bits_python.alpinebits_server import AlpineBitsServer # noqa: PLC0415
|
|
|
|
# Create a new in-memory database for each test
|
|
engine = create_async_engine(
|
|
"sqlite+aiosqlite:///:memory:",
|
|
echo=False,
|
|
)
|
|
|
|
# Create tables before TestClient starts (which triggers lifespan)
|
|
# This ensures tables exist when run_startup_tasks() runs
|
|
async def create_tables():
|
|
async with engine.begin() as conn:
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
|
|
asyncio.run(create_tables())
|
|
|
|
# Mock both load_config and create_database_engine
|
|
# This ensures the lifespan uses our test database instead of creating a new one
|
|
with patch("alpine_bits_python.api.load_config", return_value=test_config), \
|
|
patch("alpine_bits_python.api.create_database_engine", return_value=engine):
|
|
# Setup app state (will be overridden by lifespan but we set it anyway)
|
|
app.state.engine = engine
|
|
app.state.async_sessionmaker = async_sessionmaker(
|
|
engine, expire_on_commit=False
|
|
)
|
|
app.state.config = test_config
|
|
app.state.alpine_bits_server = AlpineBitsServer(test_config)
|
|
|
|
# TestClient will trigger lifespan events
|
|
# The mocked load_config will ensure test_config is used
|
|
# The mocked create_database_engine will ensure our test database is used
|
|
with TestClient(app) as test_client:
|
|
yield test_client
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_wix_form_data():
|
|
"""Sample Wix form submission data.
|
|
|
|
Each call generates unique IDs to avoid database conflicts.
|
|
"""
|
|
unique_id = uuid.uuid4().hex[:8]
|
|
return {
|
|
"data": {
|
|
"submissionId": f"test-submission-{unique_id}",
|
|
"submissionTime": "2025-10-07T05:48:41.855Z",
|
|
"contact": {
|
|
"name": {"first": "John", "last": "Doe"},
|
|
"email": f"john.doe.{unique_id}@example.com",
|
|
"phones": [{"e164Phone": "+1234567890"}],
|
|
"locale": "en-US",
|
|
"contactId": f"contact-{unique_id}",
|
|
},
|
|
"field:anrede": "Mr.",
|
|
"field:form_field_5a7b": True,
|
|
"field:date_picker_a7c8": "2024-12-25",
|
|
"field:date_picker_7e65": "2024-12-31",
|
|
"field:number_7cf5": "2",
|
|
"field:anzahl_kinder": "1",
|
|
"field:alter_kind_1": "8",
|
|
"field:angebot_auswaehlen": "Christmas Special",
|
|
"field:utm_source": "google",
|
|
"field:utm_medium": "cpc",
|
|
"field:utm_campaign": "winter2024",
|
|
"field:fbclid": "test_fbclid_123",
|
|
"field:long_answer_3524": "Late check-in please",
|
|
}
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def basic_auth_headers():
|
|
"""Create Basic Auth headers for testing."""
|
|
credentials = base64.b64encode(b"testuser:testpass").decode("utf-8")
|
|
return {"Authorization": f"Basic {credentials}"}
|
|
|
|
|
|
class TestHealthEndpoints:
|
|
"""Test health check and root endpoints."""
|
|
|
|
def test_root_endpoint(self, client):
|
|
"""Test GET / returns health status."""
|
|
response = client.get("/api/")
|
|
assert response.status_code == HttpStatusCode.OK
|
|
data = response.json()
|
|
assert data["message"] == "Wix Form Handler API is running"
|
|
assert "timestamp" in data
|
|
assert data["status"] == "healthy"
|
|
assert "rate_limits" in data
|
|
|
|
def test_health_check_endpoint(self, client):
|
|
"""Test GET /api/health returns healthy status."""
|
|
response = client.get("/api/health")
|
|
assert response.status_code == HttpStatusCode.OK
|
|
data = response.json()
|
|
assert data["status"] == "healthy"
|
|
assert data["service"] == "wix-form-handler"
|
|
assert data["version"] == "1.0.0"
|
|
assert "timestamp" in data
|
|
|
|
def test_landing_page(self, client):
|
|
"""Test GET / (landing page) returns HTML."""
|
|
response = client.get("/")
|
|
assert response.status_code == HttpStatusCode.OK
|
|
assert "text/html" in response.headers["content-type"]
|
|
assert "99tales" in response.text or "Construction" in response.text
|
|
|
|
|
|
class TestWixWebhookEndpoint:
|
|
"""Test Wix form webhook endpoint."""
|
|
|
|
def test_wix_webhook_success(self, client, sample_wix_form_data):
|
|
"""Test successful Wix form submission."""
|
|
response = client.post("/api/webhook/wix-form", json=sample_wix_form_data)
|
|
|
|
assert response.status_code == HttpStatusCode.OK
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert "timestamp" in data
|
|
|
|
def test_wix_webhook_creates_customer_and_reservation(
|
|
self, client, sample_wix_form_data
|
|
):
|
|
"""Test that webhook creates customer and reservation in database."""
|
|
response = client.post("/api/webhook/wix-form", json=sample_wix_form_data)
|
|
assert response.status_code == HttpStatusCode.OK
|
|
|
|
# Verify data was saved to database
|
|
# Use the client's app state engine, not a separate test_db_engine
|
|
async def check_db():
|
|
engine = client.app.state.engine
|
|
async_session = async_sessionmaker(engine, expire_on_commit=False)
|
|
async with async_session() as session:
|
|
from sqlalchemy import select
|
|
|
|
# Check customer was created
|
|
result = await session.execute(select(Customer))
|
|
customers = result.scalars().all()
|
|
assert len(customers) == 1
|
|
customer = customers[0]
|
|
assert customer.given_name == "John"
|
|
assert customer.surname == "Doe"
|
|
# Email address in sample_wix_form_data has unique ID appended
|
|
assert customer.email_address.startswith("john.doe.")
|
|
assert "@example.com" in customer.email_address
|
|
|
|
# Check reservation was created
|
|
result = await session.execute(select(Reservation))
|
|
reservations = result.scalars().all()
|
|
assert len(reservations) == 1
|
|
reservation = reservations[0]
|
|
assert reservation.customer_id == customer.id
|
|
assert reservation.num_adults == 2
|
|
assert reservation.num_children == 1
|
|
|
|
import asyncio
|
|
|
|
asyncio.run(check_db())
|
|
|
|
def test_wix_webhook_minimal_data(self, client):
|
|
"""Test webhook with minimal required data."""
|
|
minimal_data = {
|
|
"data": {
|
|
"submissionId": "minimal-123",
|
|
"submissionTime": "2025-01-10T12:00:00.000Z",
|
|
"contact": {
|
|
"name": {"first": "Jane", "last": "Smith"},
|
|
"email": "jane@example.com",
|
|
},
|
|
"field:date_picker_a7c8": "2025-01-15",
|
|
"field:date_picker_7e65": "2025-01-20",
|
|
}
|
|
}
|
|
|
|
response = client.post("/api/webhook/wix-form", json=minimal_data)
|
|
assert response.status_code == HttpStatusCode.OK
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
|
|
def test_wix_webhook_test_endpoint(self, client, sample_wix_form_data):
|
|
"""Test the test endpoint works identically."""
|
|
response = client.post("/api/webhook/wix-form", json=sample_wix_form_data)
|
|
assert response.status_code == HttpStatusCode.OK
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
|
|
@pytest.mark.parametrize(
|
|
"test_form_file",
|
|
[
|
|
Path(__file__).parent / "test_data" / f"test_form{i}.json"
|
|
for i in range(1, 6)
|
|
],
|
|
ids=lambda p: p.name,
|
|
)
|
|
def test_wix_webhook_test_endpoint_with_test_forms(self, client, test_form_file):
|
|
"""Test the test endpoint works with all test form data files."""
|
|
# Skip if file doesn't exist
|
|
if not test_form_file.exists():
|
|
pytest.skip(f"{test_form_file.name} not found")
|
|
|
|
# Load test form data
|
|
with test_form_file.open() as f:
|
|
form_data = json.load(f)
|
|
|
|
response = client.post("/api/webhook/wix-form", json=form_data)
|
|
assert response.status_code == HttpStatusCode.OK
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
|
|
def test_wix_webhook_updates_existing_customer(self, client):
|
|
"""Test that same contact_id updates customer instead of duplicate."""
|
|
# First submission
|
|
first_submission = {
|
|
"data": {
|
|
"submissionId": "test-submission-001",
|
|
"submissionTime": "2025-10-07T05:48:41.855Z",
|
|
"contact": {
|
|
"name": {"first": "John", "last": "Doe"},
|
|
"email": "john.doe@example.com",
|
|
"phones": [{"e164Phone": "+1234567890"}],
|
|
"locale": "en-US",
|
|
"contactId": "fixed-contact-id-123",
|
|
},
|
|
"field:anrede": "Mr.",
|
|
"field:date_picker_a7c8": "2024-12-25",
|
|
"field:date_picker_7e65": "2024-12-31",
|
|
"field:number_7cf5": "2",
|
|
"field:anzahl_kinder": "0",
|
|
}
|
|
}
|
|
|
|
response = client.post("/api/webhook/wix-form", json=first_submission)
|
|
assert response.status_code == HttpStatusCode.OK
|
|
|
|
# Second submission with same contact_id but different data
|
|
second_submission = {
|
|
"data": {
|
|
"submissionId": "test-submission-002",
|
|
"submissionTime": "2025-10-08T10:30:00.000Z",
|
|
"contact": {
|
|
"name": {"first": "John", "last": "Smith"}, # Changed last name
|
|
"email": "john.smith@example.com", # Changed email
|
|
"phones": [{"e164Phone": "+9876543210"}], # Changed phone
|
|
"locale": "de-DE", # Changed locale
|
|
"contactId": "fixed-contact-id-123", # Same contact_id
|
|
},
|
|
"field:anrede": "Dr.", # Changed prefix
|
|
"field:date_picker_a7c8": "2025-01-10",
|
|
"field:date_picker_7e65": "2025-01-15",
|
|
"field:number_7cf5": "4",
|
|
"field:anzahl_kinder": "2",
|
|
"field:alter_kind_1": "5",
|
|
"field:alter_kind_2": "10",
|
|
}
|
|
}
|
|
|
|
response = client.post("/api/webhook/wix-form", json=second_submission)
|
|
assert response.status_code == HttpStatusCode.OK
|
|
|
|
# Verify only one customer exists with updated information
|
|
async def check_db():
|
|
from sqlalchemy import select # noqa: PLC0415
|
|
|
|
engine = client.app.state.engine
|
|
async_session = async_sessionmaker(engine, expire_on_commit=False)
|
|
async with async_session() as session:
|
|
# Check only one customer exists
|
|
result = await session.execute(select(Customer))
|
|
customers = result.scalars().all()
|
|
assert len(customers) == 1, "Should have exactly one customer"
|
|
|
|
customer = customers[0]
|
|
# Verify customer was updated with new information
|
|
assert customer.given_name == "John"
|
|
assert customer.surname == "Smith", "Last name updated"
|
|
assert customer.email_address == "john.smith@example.com", (
|
|
"Email updated"
|
|
)
|
|
assert customer.phone == "+9876543210", "Phone updated"
|
|
assert customer.name_prefix == "Dr.", "Prefix updated"
|
|
assert customer.language == "de", "Language updated"
|
|
assert customer.contact_id == "fixed-contact-id-123"
|
|
|
|
# Check both reservations were created
|
|
result = await session.execute(select(Reservation))
|
|
reservations = result.scalars().all()
|
|
expected_reservations = 2
|
|
assert len(reservations) == expected_reservations
|
|
# Both reservations should be linked to the same customer
|
|
assert all(r.customer_id == customer.id for r in reservations)
|
|
|
|
import asyncio # noqa: PLC0415
|
|
|
|
asyncio.run(check_db())
|
|
|
|
|
|
class TestGenericWebhookEndpoint:
|
|
"""Test generic webhook endpoint."""
|
|
|
|
def test_generic_webhook_success_with_real_data(self, client):
|
|
"""Test successful generic webhook submission with real form data."""
|
|
unique_id = uuid.uuid4().hex[:8]
|
|
test_data = {
|
|
"hotel_data": {"hotelname": "Bemelmans", "hotelcode": "HOTEL123"},
|
|
"form_data": {
|
|
"sprache": "it",
|
|
"anreise": "14.10.2025",
|
|
"abreise": "15.10.2025",
|
|
"erwachsene": "1",
|
|
"kinder": "2",
|
|
"alter": {"1": "2", "2": "4"},
|
|
"anrede": "Herr",
|
|
"name": "Armin",
|
|
"nachname": "Wieser",
|
|
"mail": f"test.{unique_id}@example.com",
|
|
"tel": "+391234567890",
|
|
"nachricht": "Test message",
|
|
},
|
|
"tracking_data": {
|
|
"utm_source": "ig",
|
|
"utm_medium": "Instagram_Stories",
|
|
"utm_campaign": "Conversions_Apartment_Bemelmans_ITA",
|
|
"utm_content": "Grafik_1_Apartments_Bemelmans",
|
|
"utm_term": "Cold_Traffic_Conversions_Apartment_Bemelmans_ITA",
|
|
"fbclid": "test_fbclid_123",
|
|
},
|
|
"timestamp": "2025-10-14T12:20:08+02:00",
|
|
}
|
|
|
|
response = client.post("/api/webhook/generic", json=test_data)
|
|
|
|
assert response.status_code == HttpStatusCode.OK
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert "timestamp" in data
|
|
assert (
|
|
data["message"]
|
|
== "Generic webhook data processed successfully"
|
|
)
|
|
|
|
def test_generic_webhook_creates_customer_and_reservation(self, client):
|
|
"""Test that webhook creates customer and reservation in database."""
|
|
unique_id = uuid.uuid4().hex[:8]
|
|
test_data = {
|
|
"hotel_data": {"hotelname": "Test Hotel", "hotelcode": "HOTEL123"},
|
|
"form_data": {
|
|
"sprache": "de",
|
|
"anreise": "25.12.2025",
|
|
"abreise": "31.12.2025",
|
|
"erwachsene": "2",
|
|
"kinder": "1",
|
|
"alter": {"1": "8"},
|
|
"anrede": "Frau",
|
|
"name": "Maria",
|
|
"nachname": "Schmidt",
|
|
"mail": f"maria.{unique_id}@example.com",
|
|
"tel": "+491234567890",
|
|
"nachricht": "Looking forward to our stay",
|
|
},
|
|
"tracking_data": {
|
|
"utm_source": "google",
|
|
"utm_medium": "cpc",
|
|
"utm_campaign": "winter2025",
|
|
},
|
|
"timestamp": "2025-10-14T10:00:00Z",
|
|
}
|
|
|
|
response = client.post("/api/webhook/generic", json=test_data)
|
|
assert response.status_code == HttpStatusCode.OK
|
|
|
|
# Verify data was saved to database
|
|
async def check_db():
|
|
engine = client.app.state.engine
|
|
async_session = async_sessionmaker(engine, expire_on_commit=False)
|
|
async with async_session() as session:
|
|
from sqlalchemy import select
|
|
|
|
# Check customer was created
|
|
result = await session.execute(select(Customer))
|
|
customers = result.scalars().all()
|
|
# Find the customer we just created
|
|
customer = next(
|
|
(
|
|
c
|
|
for c in customers
|
|
if c.email_address == f"maria.{unique_id}@example.com"
|
|
),
|
|
None,
|
|
)
|
|
assert customer is not None, "Customer should be created"
|
|
assert customer.given_name == "Maria"
|
|
assert customer.surname == "Schmidt"
|
|
assert customer.phone == "+491234567890"
|
|
assert customer.language == "de"
|
|
assert customer.name_prefix == "Frau"
|
|
|
|
# Check reservation was created
|
|
result = await session.execute(select(Reservation))
|
|
reservations = result.scalars().all()
|
|
reservation = next(
|
|
(r for r in reservations if r.customer_id == customer.id), None
|
|
)
|
|
assert reservation is not None, "Reservation should be created"
|
|
assert reservation.hotel_code == "HOTEL123"
|
|
assert reservation.hotel_name == "Test Hotel"
|
|
assert reservation.num_adults == 2
|
|
assert reservation.num_children == 1
|
|
# children_ages is stored as CSV string
|
|
children_ages = [
|
|
int(age) for age in reservation.children_ages.split(",") if age
|
|
]
|
|
assert len(children_ages) == 1
|
|
assert children_ages[0] == 8
|
|
assert reservation.utm_source == "google"
|
|
assert reservation.utm_campaign == "winter2025"
|
|
|
|
import asyncio
|
|
|
|
asyncio.run(check_db())
|
|
|
|
def test_generic_webhook_missing_dates(self, client):
|
|
"""Test webhook with missing required dates."""
|
|
test_data = {
|
|
"hotel_data": {"hotelname": "Test", "hotelcode": "HOTEL123"},
|
|
"form_data": {
|
|
"sprache": "de",
|
|
"name": "John",
|
|
"nachname": "Doe",
|
|
"mail": "john@example.com",
|
|
# Missing anreise and abreise
|
|
},
|
|
"tracking_data": {},
|
|
}
|
|
|
|
response = client.post("/api/webhook/generic", json=test_data)
|
|
# HTTPException with 400 is raised, then caught and returns 500
|
|
assert response.status_code in [400, 500]
|
|
|
|
def test_generic_webhook_invalid_date_format(self, client):
|
|
"""Test webhook with invalid date format."""
|
|
test_data = {
|
|
"hotel_data": {"hotelname": "Test", "hotelcode": "HOTEL123"},
|
|
"form_data": {
|
|
"sprache": "en",
|
|
"anreise": "2025-10-14", # Wrong format, should be DD.MM.YYYY
|
|
"abreise": "2025-10-15",
|
|
"erwachsene": "2",
|
|
"kinder": "0",
|
|
"name": "Jane",
|
|
"nachname": "Doe",
|
|
"mail": "jane@example.com",
|
|
},
|
|
"tracking_data": {},
|
|
}
|
|
|
|
response = client.post("/api/webhook/generic", json=test_data)
|
|
# HTTPException with 400 is raised, then caught and returns 500
|
|
assert response.status_code in [400, 500]
|
|
|
|
def test_generic_webhook_with_children_ages(self, client):
|
|
"""Test webhook properly handles children ages."""
|
|
unique_id = uuid.uuid4().hex[:8]
|
|
test_data = {
|
|
"hotel_data": {"hotelname": "Family Hotel", "hotelcode": "HOTEL123"},
|
|
"form_data": {
|
|
"sprache": "it",
|
|
"anreise": "01.08.2025",
|
|
"abreise": "15.08.2025",
|
|
"erwachsene": "2",
|
|
"kinder": "3",
|
|
"alter": {"1": "5", "2": "8", "3": "12"},
|
|
"anrede": "--", # Should be filtered out
|
|
"name": "Paolo",
|
|
"nachname": "Rossi",
|
|
"mail": f"paolo.{unique_id}@example.com",
|
|
"tel": "", # Empty phone
|
|
"nachricht": "",
|
|
},
|
|
"tracking_data": {"fbclid": "test_fb_123", "gclid": "test_gc_456"},
|
|
}
|
|
|
|
response = client.post("/api/webhook/generic", json=test_data)
|
|
assert response.status_code == HttpStatusCode.OK
|
|
|
|
# Verify children ages were stored correctly
|
|
async def check_db():
|
|
engine = client.app.state.engine
|
|
async_session = async_sessionmaker(engine, expire_on_commit=False)
|
|
async with async_session() as session:
|
|
from sqlalchemy import select
|
|
|
|
result = await session.execute(select(Reservation))
|
|
reservations = result.scalars().all()
|
|
reservation = next(
|
|
(r for r in reservations if r.hotel_code == "HOTEL123"), None
|
|
)
|
|
assert reservation is not None, "Reservation should be created"
|
|
assert reservation.num_children == 3
|
|
# children_ages is stored as CSV string
|
|
children_ages = [
|
|
int(age) for age in reservation.children_ages.split(",") if age
|
|
]
|
|
assert children_ages == [5, 8, 12]
|
|
assert reservation.fbclid == "test_fb_123"
|
|
assert reservation.gclid == "test_gc_456"
|
|
|
|
# Check customer with empty phone and -- prefix
|
|
result = await session.execute(select(Customer))
|
|
customers = result.scalars().all()
|
|
customer = next(
|
|
(
|
|
c
|
|
for c in customers
|
|
if c.email_address == f"paolo.{unique_id}@example.com"
|
|
),
|
|
None,
|
|
)
|
|
assert customer is not None, "Customer should be created"
|
|
assert customer.phone is None, "Empty phone should be None"
|
|
assert customer.name_prefix is None, "Name prefix '--' should be filtered out"
|
|
|
|
import asyncio
|
|
|
|
asyncio.run(check_db())
|
|
|
|
def test_generic_webhook_empty_payload(self, client):
|
|
"""Test generic webhook with empty payload."""
|
|
response = client.post("/api/webhook/generic", json={})
|
|
|
|
# Should fail gracefully with error logging (400 or 500)
|
|
assert response.status_code in [400, 500]
|
|
|
|
def test_generic_webhook_complex_nested_data(self, client):
|
|
"""Test generic webhook logs complex nested data structures."""
|
|
complex_data = {
|
|
"arrays": [1, 2, 3],
|
|
"nested": {"level1": {"level2": {"level3": "deep"}}},
|
|
"mixed": [{"a": 1}, {"b": 2}],
|
|
}
|
|
|
|
response = client.post("/api/webhook/generic", json=complex_data)
|
|
|
|
# This should fail to process but succeed in logging (400 or 500)
|
|
assert response.status_code in [400, 500]
|
|
|
|
|
|
class TestAlpineBitsServerEndpoint:
|
|
"""Test AlpineBits server endpoint."""
|
|
|
|
def test_alpinebits_handshake_ping_success(self, client, basic_auth_headers):
|
|
"""Test AlpineBits handshake with OTA_Ping action using real test data."""
|
|
# Use the actual test data file with proper AlpineBits handshake format
|
|
with Path("tests/test_data/Handshake-OTA_PingRQ.xml").open(
|
|
encoding="utf-8"
|
|
) as f:
|
|
ping_xml = f.read()
|
|
|
|
# Prepare multipart form data
|
|
form_data = {"action": "OTA_Ping:Handshaking", "request": ping_xml}
|
|
|
|
headers = {
|
|
**basic_auth_headers,
|
|
"X-AlpineBits-ClientProtocolVersion": "2024-10",
|
|
"X-AlpineBits-ClientID": "TEST-CLIENT-001",
|
|
}
|
|
|
|
response = client.post(
|
|
"/api/alpinebits/server-2024-10",
|
|
data=form_data,
|
|
headers=headers,
|
|
)
|
|
|
|
assert response.status_code == HttpStatusCode.OK
|
|
assert "OTA_PingRS" in response.text
|
|
assert "application/xml" in response.headers["content-type"]
|
|
assert "X-AlpineBits-Server-Version" in response.headers
|
|
|
|
def test_alpinebits_missing_auth(self, client):
|
|
"""Test AlpineBits endpoint without authentication."""
|
|
form_data = {"action": "OTA_Ping:Handshaking", "request": "<xml/>"}
|
|
|
|
response = client.post("/api/alpinebits/server-2024-10", data=form_data)
|
|
|
|
assert response.status_code == HttpStatusCode.UNAUTHORIZED
|
|
|
|
def test_alpinebits_invalid_credentials(self, client):
|
|
"""Test AlpineBits endpoint with invalid credentials."""
|
|
credentials = base64.b64encode(b"wrong:credentials").decode("utf-8")
|
|
headers = {"Authorization": f"Basic {credentials}"}
|
|
|
|
form_data = {"action": "OTA_Ping:Handshaking", "request": "<xml/>"}
|
|
|
|
response = client.post(
|
|
"/api/alpinebits/server-2024-10", data=form_data, headers=headers
|
|
)
|
|
|
|
assert response.status_code == HttpStatusCode.UNAUTHORIZED
|
|
|
|
def test_alpinebits_missing_action(self, client, basic_auth_headers):
|
|
"""Test AlpineBits endpoint without action parameter."""
|
|
headers = {
|
|
**basic_auth_headers,
|
|
"X-AlpineBits-ClientProtocolVersion": "2024-10",
|
|
}
|
|
|
|
form_data = {"request": "<xml/>"}
|
|
|
|
response = client.post(
|
|
"/api/alpinebits/server-2024-10", data=form_data, headers=headers
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
|
|
def test_alpinebits_gzip_compression(self, client, basic_auth_headers):
|
|
"""Test AlpineBits endpoint with gzip compressed request."""
|
|
# Use real test data
|
|
with open("tests/test_data/Handshake-OTA_PingRQ.xml", encoding="utf-8") as f:
|
|
ping_xml = f.read()
|
|
|
|
form_data = f"action=OTA_Ping:Handshaking&request={ping_xml}"
|
|
compressed_data = gzip.compress(form_data.encode("utf-8"))
|
|
|
|
headers = {
|
|
**basic_auth_headers,
|
|
"X-AlpineBits-ClientProtocolVersion": "2024-10",
|
|
"Content-Encoding": "gzip",
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
}
|
|
|
|
response = client.post(
|
|
"/api/alpinebits/server-2024-10",
|
|
content=compressed_data,
|
|
headers=headers,
|
|
)
|
|
|
|
assert response.status_code == HttpStatusCode.OK
|
|
assert "OTA_PingRS" in response.text
|
|
|
|
|
|
class TestXMLUploadEndpoint:
|
|
"""Test XML upload endpoint for conversions."""
|
|
|
|
def test_xml_upload_success(self, client, basic_auth_headers):
|
|
"""Test successful XML upload."""
|
|
xml_content = """<?xml version="1.0" encoding="UTF-8"?>
|
|
<OTA_HotelResNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05">
|
|
<HotelReservations>
|
|
<HotelReservation>
|
|
<UniqueID Type="14" ID="TEST-123"/>
|
|
</HotelReservation>
|
|
</HotelReservations>
|
|
</OTA_HotelResNotifRQ>"""
|
|
|
|
response = client.put(
|
|
"/api/hoteldata/conversions_import/test_reservation.xml",
|
|
content=xml_content.encode("utf-8"),
|
|
headers={**basic_auth_headers, "Content-Type": "application/xml"},
|
|
)
|
|
|
|
# Returns 202 Accepted since processing is now asynchronous
|
|
assert response.status_code == 202
|
|
assert "received and queued for processing" in response.text
|
|
|
|
def test_xml_upload_gzip_compressed(self, client, basic_auth_headers):
|
|
"""Test XML upload with gzip compression."""
|
|
xml_content = """<?xml version="1.0" encoding="UTF-8"?>
|
|
<OTA_HotelResNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05">
|
|
<HotelReservations/>
|
|
</OTA_HotelResNotifRQ>"""
|
|
|
|
compressed = gzip.compress(xml_content.encode("utf-8"))
|
|
|
|
headers = {
|
|
**basic_auth_headers,
|
|
"Content-Type": "application/xml",
|
|
"Content-Encoding": "gzip",
|
|
}
|
|
|
|
response = client.put(
|
|
"/api/hoteldata/conversions_import/compressed.xml",
|
|
content=compressed,
|
|
headers=headers,
|
|
)
|
|
|
|
# Returns 202 Accepted since processing is now asynchronous
|
|
assert response.status_code == 202
|
|
|
|
def test_xml_upload_missing_auth(self, client):
|
|
"""Test XML upload without authentication."""
|
|
response = client.put(
|
|
"/api/hoteldata/conversions_import/test.xml",
|
|
content=b"<xml/>",
|
|
)
|
|
|
|
assert response.status_code == HttpStatusCode.UNAUTHORIZED
|
|
|
|
def test_xml_upload_invalid_path(self, client, basic_auth_headers):
|
|
"""Test XML upload with path traversal attempt.
|
|
|
|
Path traversal is blocked by the server, resulting in 404 Not Found.
|
|
"""
|
|
response = client.put(
|
|
"/api/hoteldata/conversions_import/../../../etc/passwd",
|
|
content=b"<xml/>",
|
|
headers=basic_auth_headers,
|
|
)
|
|
|
|
# Path traversal results in 404 as the normalized path doesn't match the route
|
|
assert response.status_code == 404
|
|
|
|
def test_xml_upload_empty_content(self, client, basic_auth_headers):
|
|
"""Test XML upload with empty content."""
|
|
response = client.put(
|
|
"/api/hoteldata/conversions_import/empty.xml",
|
|
content=b"",
|
|
headers=basic_auth_headers,
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
|
|
def test_xml_upload_non_xml_content(self, client, basic_auth_headers):
|
|
"""Test XML upload with non-XML content."""
|
|
response = client.put(
|
|
"/api/hoteldata/conversions_import/notxml.xml",
|
|
content=b"This is not XML content",
|
|
headers=basic_auth_headers,
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
|
|
|
|
class TestAuthentication:
|
|
"""Test authentication and authorization."""
|
|
|
|
def test_basic_auth_success(self, client):
|
|
"""Test successful basic authentication."""
|
|
credentials = base64.b64encode(b"testuser:testpass").decode("utf-8")
|
|
headers = {"Authorization": f"Basic {credentials}"}
|
|
|
|
form_data = {"action": "OTA_Ping:Handshaking", "request": "<xml/>"}
|
|
|
|
response = client.post(
|
|
"/api/alpinebits/server-2024-10",
|
|
data=form_data,
|
|
headers={
|
|
**headers,
|
|
"X-AlpineBits-ClientProtocolVersion": "2024-10",
|
|
},
|
|
)
|
|
|
|
# Should not be 401
|
|
assert response.status_code != HttpStatusCode.UNAUTHORIZED
|
|
|
|
def test_basic_auth_missing_credentials(self, client):
|
|
"""Test basic auth with missing credentials."""
|
|
response = client.post(
|
|
"/api/alpinebits/server-2024-10",
|
|
data={"action": "OTA_Ping:Handshaking"},
|
|
)
|
|
|
|
assert response.status_code == HttpStatusCode.UNAUTHORIZED
|
|
|
|
def test_basic_auth_malformed_header(self, client):
|
|
"""Test basic auth with malformed Authorization header."""
|
|
headers = {"Authorization": "Basic malformed"}
|
|
|
|
response = client.post(
|
|
"/api/alpinebits/server-2024-10",
|
|
data={"action": "OTA_Ping:Handshaking"},
|
|
headers=headers,
|
|
)
|
|
|
|
# FastAPI should handle this gracefully
|
|
assert response.status_code in [401, 422]
|
|
|
|
|
|
class TestEventDispatcher:
|
|
"""Test event dispatcher and push notifications."""
|
|
|
|
def test_form_submission_triggers_event(self, client, sample_wix_form_data):
|
|
"""Test that form submission triggers event dispatcher."""
|
|
# Just verify the endpoint works with the event dispatcher
|
|
# The async task runs in background and doesn't affect response
|
|
response = client.post("/api/webhook/wix-form", json=sample_wix_form_data)
|
|
|
|
assert response.status_code == HttpStatusCode.OK
|
|
# Event dispatcher is tested separately in its own test suite
|
|
|
|
|
|
class TestErrorHandling:
|
|
"""Test error handling across endpoints."""
|
|
|
|
def test_wix_webhook_invalid_json(self, client):
|
|
"""Test webhook with invalid JSON."""
|
|
response = client.post(
|
|
"/api/webhook/wix-form",
|
|
content=b"invalid json {{{",
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
|
|
def test_wix_webhook_missing_required_fields(self, client):
|
|
"""Test webhook with missing required fields."""
|
|
invalid_data = {"data": {}}
|
|
|
|
response = client.post("/api/webhook/wix-form", json=invalid_data)
|
|
|
|
# Should handle gracefully - may be 500 or 400 depending on validation
|
|
assert response.status_code in [400, 500]
|
|
|
|
def test_alpinebits_invalid_xml(self, client, basic_auth_headers):
|
|
"""Test AlpineBits endpoint with invalid XML."""
|
|
form_data = {
|
|
"action": "OTA_Ping:Handshaking",
|
|
"request": "<<invalid xml>>",
|
|
}
|
|
|
|
headers = {
|
|
**basic_auth_headers,
|
|
"X-AlpineBits-ClientProtocolVersion": "2024-10",
|
|
}
|
|
|
|
response = client.post(
|
|
"/api/alpinebits/server-2024-10",
|
|
data=form_data,
|
|
headers=headers,
|
|
)
|
|
|
|
# Should return error response
|
|
assert response.status_code in [400, 500]
|
|
|
|
|
|
class TestCORS:
|
|
"""Test CORS configuration."""
|
|
|
|
def test_cors_preflight_request(self, client):
|
|
"""Test CORS preflight request."""
|
|
response = client.options(
|
|
"/api/health",
|
|
headers={
|
|
"Origin": "https://example.wix.com",
|
|
"Access-Control-Request-Method": "POST",
|
|
},
|
|
)
|
|
|
|
# TestClient returns 400 for OPTIONS requests
|
|
# In production, CORS middleware handles preflight correctly
|
|
assert response.status_code in [HttpStatusCode.OK, 400, 405]
|
|
|
|
|
|
class TestRateLimiting:
|
|
"""Test rate limiting (requires actual rate limiter to be active)."""
|
|
|
|
def test_health_endpoint_rate_limit(self, client):
|
|
"""Test that health endpoint has rate limiting configured."""
|
|
# Make multiple requests
|
|
responses = []
|
|
for _ in range(5):
|
|
response = client.get("/api/health")
|
|
responses.append(response.status_code)
|
|
|
|
# All should succeed if under limit
|
|
assert all(status == HttpStatusCode.OK for status in responses)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|