13 Commits

Author SHA1 Message Date
Jonas Linter
5cec464ac2 Added tests for double reservation by one customer 2025-10-10 16:50:43 +02:00
Jonas Linter
1248772f60 Updateinsert customers 2025-10-10 16:47:19 +02:00
Jonas Linter
165914d686 Added generic endpoint 2025-10-10 16:21:25 +02:00
Jonas Linter
dbbdb3694b More tests 2025-10-10 16:17:01 +02:00
Jonas Linter
6ab5212a0f MORE Tests 2025-10-10 11:28:25 +02:00
Jonas Linter
4ac5a148b6 Cleanup 2025-10-10 10:45:47 +02:00
Jonas Linter
5b91608577 Linting fixies 2025-10-10 10:21:13 +02:00
Jonas Linter
2c54303189 Replace with post for xml with put 2025-10-09 16:47:54 +02:00
Jonas Linter
123bd19e3c More cleanup. Fixing linting issues and stuff like that 2025-10-09 15:41:43 +02:00
Jonas Linter
f0beb294ee Cleanup. Removed some unneccessary files 2025-10-09 15:16:28 +02:00
Jonas Linter
a325a443f7 Better logger 2025-10-09 14:29:44 +02:00
Jonas Linter
f05cc9215e Updated config 2025-10-09 14:16:11 +02:00
Jonas Linter
162ef39013 added logging config. Not active yet 2025-10-09 11:06:22 +02:00
32 changed files with 15577 additions and 672 deletions

BIN
.coverage Normal file

Binary file not shown.

View File

@@ -26,6 +26,10 @@ Data flows: Wix form → Database → AlpineBits XML → Hotel systems (pull or
- Default config location: `config/config.yaml` + `config/secrets.yaml`
- Override via `ALPINEBITS_CONFIG_DIR` environment variable
- Multi-hotel support: Each hotel in `alpine_bits_auth` array gets own credentials and optional `push_endpoint`
- **Logging**: Centralized logging configured via `logger` section (see `logging_config.py` and `LOGGING.md`)
- Use `from logging_config import get_logger; _LOGGER = get_logger(__name__)` in any module
- Logs to console always; optionally to file if `logger.file` is set
- Format includes timestamp: `%(asctime)s - %(name)s - %(levelname)s - %(message)s`
### Database Layer

4
.gitignore vendored
View File

@@ -27,3 +27,7 @@ secrets.yaml
# ignore db
alpinebits.db
# test output files
test_output.txt
output.xml

1
CLAUDE.md Normal file
View File

@@ -0,0 +1 @@
This python project is managed by uv. Use uv run to execute app and tests.

118
LOGGING.md Normal file
View File

@@ -0,0 +1,118 @@
# Logging Configuration
The AlpineBits Python server uses a centralized logging system that can be configured via the `config.yaml` file.
## Configuration
Add the following section to your `config/config.yaml`:
```yaml
logger:
level: "INFO" # Options: DEBUG, INFO, WARNING, ERROR, CRITICAL
file: "logs/alpinebits.log" # Optional: path to log file (omit or set to null for console-only)
```
### Log Levels
- **DEBUG**: Detailed diagnostic information (very verbose)
- **INFO**: General informational messages about application progress
- **WARNING**: Warning messages about potential issues
- **ERROR**: Error messages when something goes wrong
- **CRITICAL**: Critical errors that may cause application failure
### Log Output
- **Console**: Logs are always written to console (stdout)
- **File**: Optionally write logs to a file by specifying the `file` parameter
- File logs include the same timestamp and formatting as console logs
- Log directory will be created automatically if it doesn't exist
## Usage in Code
To use logging in any module:
```python
from alpine_bits_python.logging_config import get_logger
_LOGGER = get_logger(__name__)
# Then use the logger
_LOGGER.info("Application started")
_LOGGER.debug("Detailed debug information: %s", some_variable)
_LOGGER.warning("Something unusual happened")
_LOGGER.error("An error occurred: %s", error_message)
_LOGGER.exception("Critical error with stack trace")
```
## Log Format
All log entries include:
- Timestamp (YYYY-MM-DD HH:MM:SS)
- Module name (logger name)
- Log level
- Message
Example:
```
2025-10-09 14:23:45 - alpine_bits_python.api - INFO - Application startup initiated
2025-10-09 14:23:45 - alpine_bits_python.api - INFO - Logging configured at INFO level
2025-10-09 14:23:46 - alpine_bits_python.api - INFO - Database tables checked/created at startup.
```
## Best Practices
1. **Use structured logging**: Pass variables as arguments, not f-strings
```python
# Good
_LOGGER.info("Processing reservation %s for hotel %s", reservation_id, hotel_code)
# Avoid (performance overhead, linting warnings)
_LOGGER.info(f"Processing reservation {reservation_id} for hotel {hotel_code}")
```
2. **Use appropriate log levels**:
- `DEBUG`: Detailed tracing for development
- `INFO`: Normal application flow events
- `WARNING`: Unexpected but handled situations
- `ERROR`: Errors that need attention
- `CRITICAL`: Severe errors requiring immediate action
3. **Use `exception()` for error handling**:
```python
try:
risky_operation()
except Exception:
_LOGGER.exception("Operation failed") # Automatically includes stack trace
```
4. **Don't log sensitive data**: Avoid logging passwords, tokens, or personal data
## Examples
### Console-only logging (development)
```yaml
logger:
level: "DEBUG"
```
### File logging (production)
```yaml
logger:
level: "INFO"
file: "/var/log/alpinebits/app.log"
```
### Minimal logging
```yaml
logger:
level: "WARNING"
file: "logs/warnings.log"
```

14061
alpinebits.log Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -8,6 +8,18 @@ database:
# AlpineBits Python config
# Use annotatedyaml for secrets and environment-specific overrides
server:
codecontext: "ADVERTISING"
code: 70597314
companyname: "99tales Gmbh"
res_id_source_context: "99tales"
logger:
level: "INFO" # Set to DEBUG for more verbose output
file: "alpinebits.log" # Log file path, or null for console only
alpine_bits_auth:
- hotel_id: "39054_001"
hotel_name: "Bemelmans Post"

1
coverage.json Normal file

File diff suppressed because one or more lines are too long

View File

@@ -147,3 +147,8 @@ select = [
"UP032", # Use f-string instead of `format` call
"W", # pycodestyle
]
[dependency-groups]
dev = [
"pytest-cov>=7.0.0",
]

View File

@@ -0,0 +1 @@
"""AlpineBits Python Server package."""

View File

@@ -1,7 +0,0 @@
"""Entry point for alpine_bits_python package."""
from .main import main
if __name__ == "__main__":
print("running test main")
main()

View File

@@ -1,4 +1,3 @@
import logging
import traceback
from dataclasses import dataclass
from datetime import UTC
@@ -6,6 +5,7 @@ from enum import Enum
from typing import Any
from alpine_bits_python.db import Customer, Reservation
from alpine_bits_python.logging_config import get_logger
from alpine_bits_python.schemas import (
CommentData,
CommentListItemData,
@@ -25,8 +25,7 @@ from .generated.alpinebits import (
UniqueIdType2,
)
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)
_LOGGER = get_logger(__name__)
# Define type aliases for the two Customer types
NotifCustomer = OtaHotelResNotifRq.HotelReservations.HotelReservation.ResGuests.ResGuest.Profiles.ProfileInfo.Profile.Customer # noqa: E501
@@ -74,6 +73,8 @@ RetrieveRoomStays = OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays
NotifHotelReservation = OtaHotelResNotifRq.HotelReservations.HotelReservation
RetrieveHotelReservation = OtaResRetrieveRs.ReservationsList.HotelReservation
from .const import RESERVATION_ID_TYPE
# Enum to specify which OTA message type to use
class OtaMessageType(Enum):
@@ -604,19 +605,24 @@ class AlpineBitsFactory:
def create_res_retrieve_response(
list: list[tuple[Reservation, Customer]],
list: list[tuple[Reservation, Customer]], config: dict[str, Any]
) -> OtaResRetrieveRs:
"""Create RetrievedReservation XML from database entries."""
return _create_xml_from_db(list, OtaMessageType.RETRIEVE)
return _create_xml_from_db(list, OtaMessageType.RETRIEVE, config)
def create_res_notif_push_message(list: tuple[Reservation, Customer]):
def create_res_notif_push_message(
list: tuple[Reservation, Customer], config: dict[str, Any]
):
"""Create Reservation Notification XML from database entries."""
return _create_xml_from_db(list, OtaMessageType.NOTIF)
return _create_xml_from_db(list, OtaMessageType.NOTIF, config)
def _process_single_reservation(
reservation: Reservation, customer: Customer, message_type: OtaMessageType
reservation: Reservation,
customer: Customer,
message_type: OtaMessageType,
config: dict[str, Any],
):
phone_numbers = (
[(customer.phone, PhoneTechType.MOBILE)] if customer.phone is not None else []
@@ -698,11 +704,14 @@ def _process_single_reservation(
# - Trim whitespace
# - Truncate to 64 characters if needed
# - Convert empty strings to None
res_id_source_context = config["server"]["res_id_source_context"]
hotel_res_id_data = HotelReservationIdData(
res_id_type="13",
res_id_type=RESERVATION_ID_TYPE,
res_id_value=klick_id,
res_id_source=res_id_source,
res_id_source_context="99tales",
res_id_source_context=res_id_source_context,
)
hotel_res_id = alpine_bits_factory.create(hotel_res_id_data, message_type)
@@ -713,10 +722,7 @@ def _process_single_reservation(
if reservation.hotel_code is None:
raise ValueError("Reservation hotel_code is None")
hotel_code = str(reservation.hotel_code)
if reservation.hotel_name is None:
hotel_name = None
else:
hotel_name = str(reservation.hotel_name)
hotel_name = None if reservation.hotel_name is None else str(reservation.hotel_name)
basic_property_info = HotelReservation.ResGlobalInfo.BasicPropertyInfo(
hotel_code=hotel_code,
@@ -768,8 +774,12 @@ def _process_single_reservation(
comments_data = CommentsData(comments=comments)
comments_xml = alpine_bits_factory.create(comments_data, message_type)
company_name_value = config["server"]["companyname"]
company_code = config["server"]["code"]
codecontext = config["server"]["codecontext"]
company_name = Profile.CompanyInfo.CompanyName(
value="99tales GmbH", code="who knows?", code_context="who knows?"
value=company_name_value, code=company_code, code_context=codecontext
)
company_info = Profile.CompanyInfo(company_name=company_name)
@@ -805,6 +815,7 @@ def _process_single_reservation(
def _create_xml_from_db(
entries: list[tuple[Reservation, Customer]] | tuple[Reservation, Customer],
type: OtaMessageType,
config: dict[str, Any],
):
"""Create RetrievedReservation XML from database entries.
@@ -825,7 +836,9 @@ def _create_xml_from_db(
)
try:
hotel_reservation = _process_single_reservation(reservation, customer, type)
hotel_reservation = _process_single_reservation(
reservation, customer, type, config
)
reservations_list.append(hotel_reservation)
@@ -865,7 +878,7 @@ def _create_xml_from_db(
try:
ota_res_retrieve_rs.model_validate(ota_res_retrieve_rs.model_dump())
except Exception as e:
_LOGGER.error(f"Validation error: {e}")
_LOGGER.exception(f"Validation error: {e}")
raise
return ota_res_retrieve_rs

View File

@@ -7,7 +7,6 @@ handshaking functionality with configurable supported actions and capabilities.
import inspect
import json
import logging
import re
from abc import ABC
from dataclasses import dataclass
@@ -24,6 +23,7 @@ from alpine_bits_python.alpine_bits_helpers import (
create_res_notif_push_message,
create_res_retrieve_response,
)
from alpine_bits_python.logging_config import get_logger
from .db import AckedRequest, Customer, Reservation
from .generated.alpinebits import (
@@ -36,8 +36,7 @@ from .generated.alpinebits import (
)
# Configure logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
_LOGGER = get_logger(__name__)
class HttpStatusCode(IntEnum):
@@ -129,7 +128,7 @@ class Version(str, Enum):
class AlpineBitsClientInfo:
"""Wrapper for username, password, client_id"""
"""Wrapper for username, password, client_id."""
def __init__(self, username: str, password: str, client_id: str | None = None):
self.username = username
@@ -213,7 +212,7 @@ class ServerCapabilities:
"""Discover all AlpineBitsAction implementations in the current module."""
current_module = inspect.getmodule(self)
for name, obj in inspect.getmembers(current_module):
for _name, obj in inspect.getmembers(current_module):
if (
inspect.isclass(obj)
and issubclass(obj, AlpineBitsAction)
@@ -231,9 +230,7 @@ class ServerCapabilities:
This is a simple check - in practice, you might want more sophisticated detection.
"""
# Check if the class has overridden the handle method
if "handle" in action_class.__dict__:
return True
return False
return "handle" in action_class.__dict__
def create_capabilities_dict(self) -> None:
"""Generate the capabilities dictionary based on discovered actions."""
@@ -552,7 +549,9 @@ class ReadAction(AlpineBitsAction):
customer.surname,
)
res_retrive_rs = create_res_retrieve_response(reservation_customer_pairs)
res_retrive_rs = create_res_retrieve_response(
reservation_customer_pairs, config=self.config
)
config = SerializerConfig(
pretty_print=True, xml_declaration=True, encoding="UTF-8"
@@ -635,7 +634,7 @@ class NotifReportReadAction(AlpineBitsAction):
class PushAction(AlpineBitsAction):
"""Creates the necessary xml for OTA_HotelResNotif:GuestRequests"""
"""Creates the necessary xml for OTA_HotelResNotif:GuestRequests."""
def __init__(self, config: dict = {}):
self.name = AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS
@@ -652,7 +651,9 @@ class PushAction(AlpineBitsAction):
server_capabilities=None,
) -> AlpineBitsResponse:
"""Create push request XML."""
xml_push_request = create_res_notif_push_message(request_xml)
xml_push_request = create_res_notif_push_message(
request_xml, config=self.config
)
config = SerializerConfig(
pretty_print=True, xml_declaration=True, encoding="UTF-8"
@@ -673,7 +674,7 @@ class AlpineBitsServer:
their capabilities, and can respond to handshake requests with its capabilities.
"""
def __init__(self, config: dict = None):
def __init__(self, config: dict | None = None):
self.capabilities = ServerCapabilities()
self._action_instances = {}
self.config = config
@@ -779,7 +780,6 @@ class AlpineBitsServer:
client_info=client_info,
)
except Exception as e:
print(f"Error handling request {request_action_name}: {e!s}")
# print stack trace for debugging
import traceback
@@ -792,7 +792,7 @@ class AlpineBitsServer:
def get_supported_request_names(self) -> list[str]:
"""Get all supported request names (not capability names)."""
request_names = []
for capability_name in self._action_instances.keys():
for capability_name in self._action_instances:
action_enum = AlpineBitsActionName.get_by_capability_name(capability_name)
if action_enum:
request_names.append(action_enum.request_name)

View File

@@ -1,7 +1,6 @@
import asyncio
import gzip
import json
import logging
import os
import urllib.parse
from collections import defaultdict
@@ -16,6 +15,7 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from slowapi.errors import RateLimitExceeded
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from alpine_bits_python.schemas import ReservationData
@@ -26,11 +26,12 @@ from .alpinebits_server import (
AlpineBitsServer,
Version,
)
from .auth import generate_api_key, generate_unique_id, validate_api_key
from .auth import generate_unique_id, validate_api_key
from .config_loader import load_config
from .db import Base, get_database_url
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .logging_config import get_logger, setup_logging
from .rate_limit import (
BURST_RATE_LIMIT,
DEFAULT_RATE_LIMIT,
@@ -40,9 +41,8 @@ from .rate_limit import (
webhook_limiter,
)
# Configure logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
# Configure logging - will be reconfigured during lifespan with actual config
_LOGGER = get_logger(__name__)
# HTTP Basic auth for AlpineBits
security_basic = HTTPBasic()
@@ -58,7 +58,7 @@ class EventDispatcher:
self.listeners[event_name].append(func)
def register_hotel_listener(self, event_name, hotel_code, func):
"""Register a listener for a specific hotel"""
"""Register a listener for a specific hotel."""
self.hotel_listeners[f"{event_name}:{hotel_code}"].append(func)
async def dispatch(self, event_name, *args, **kwargs):
@@ -66,7 +66,7 @@ class EventDispatcher:
await func(*args, **kwargs)
async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs):
"""Dispatch event only to listeners registered for specific hotel"""
"""Dispatch event only to listeners registered for specific hotel."""
key = f"{event_name}:{hotel_code}"
for func in self.hotel_listeners[key]:
await func(*args, **kwargs)
@@ -79,12 +79,13 @@ event_dispatcher = EventDispatcher()
async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel):
"""Push listener that sends reservation data to hotel's push endpoint.
Only called for reservations that match this hotel's hotel_id.
"""
push_endpoint = hotel.get("push_endpoint")
if not push_endpoint:
_LOGGER.warning(
f"No push endpoint configured for hotel {hotel.get('hotel_id')}"
"No push endpoint configured for hotel %s", hotel.get("hotel_id")
)
return
@@ -95,12 +96,16 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
# Double-check hotel matching (should be guaranteed by dispatcher)
if hotel_id != reservation_hotel_id:
_LOGGER.warning(
f"Hotel ID mismatch: listener for {hotel_id}, reservation for {reservation_hotel_id}"
"Hotel ID mismatch: listener for %s, reservation for %s",
hotel_id,
reservation_hotel_id,
)
return
_LOGGER.info(
f"Processing push notification for hotel {hotel_id}, reservation {reservation.unique_id}"
"Processing push notification for hotel %s, reservation %s",
hotel_id,
reservation.unique_id,
)
# Prepare payload for push notification
@@ -114,15 +119,18 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
if request.status_code != 200:
_LOGGER.error(
f"Failed to generate push request for hotel {hotel_id}, reservation {reservation.unique_id}: {request.xml_content}"
"Failed to generate push request for hotel %s, reservation %s: %s",
hotel_id,
reservation.unique_id,
request.xml_content,
)
return
# save push request to file
logs_dir = "logs/push_requests"
if not os.path.exists(logs_dir):
os.makedirs(logs_dir, mode=0o755, exist_ok=True)
if not Path.exists(logs_dir):
Path.mkdir(logs_dir, mode=0o755, exist_ok=True)
stat_info = os.stat(logs_dir)
_LOGGER.info(
f"Created directory owner: uid:{stat_info.st_uid}, gid:{stat_info.st_gid}"
@@ -155,7 +163,7 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
)
except Exception as e:
_LOGGER.error(f"Push event failed for hotel {hotel['hotel_id']}: {e}")
_LOGGER.exception(f"Push event failed for hotel {hotel['hotel_id']}: {e}")
# Optionally implement retry logic here@asynccontextmanager
@@ -164,12 +172,16 @@ async def lifespan(app: FastAPI):
try:
config = load_config()
except Exception as e:
_LOGGER.error(f"Failed to load config: {e!s}")
except Exception:
_LOGGER.exception("Failed to load config: ")
config = {}
# Setup logging from config
setup_logging(config)
_LOGGER.info("Application startup initiated")
DATABASE_URL = get_database_url(config)
engine = create_async_engine(DATABASE_URL, echo=True)
engine = create_async_engine(DATABASE_URL, echo=False)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
app.state.engine = engine
@@ -247,7 +259,7 @@ app.add_middleware(
@api_router.get("/")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def root(request: Request):
"""Health check endpoint"""
"""Health check endpoint."""
return {
"message": "Wix Form Handler API is running",
"timestamp": datetime.now().isoformat(),
@@ -264,7 +276,7 @@ async def root(request: Request):
@api_router.get("/health")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def health_check(request: Request):
"""Detailed health check"""
"""Detailed health check."""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
@@ -334,7 +346,7 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
locale = contact_info.get("locale", "de-de")
contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
@@ -366,7 +378,7 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
for k in data.keys():
for k in data:
if k.startswith("field:alter_kind_"):
try:
age = int(data[k])
@@ -382,28 +394,57 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
# use database session
# Save all relevant data to DB (including new fields)
db_customer = DBCustomer(
given_name=first_name,
surname=last_name,
contact_id=contact_id,
name_prefix=name_prefix,
email_address=email,
phone=phone_number,
email_newsletter=email_newsletter,
address_line=address_line,
city_name=city_name,
postal_code=postal_code,
country_code=country_code,
gender=gender,
birth_date=birth_date,
language=language,
address_catalog=False,
name_title=None,
)
db.add(db_customer)
await db.flush() # This assigns db_customer.id without committing
# await db.refresh(db_customer)
# Check if customer with this contact_id already exists
existing_customer = None
if contact_id:
result = await db.execute(
select(DBCustomer).where(DBCustomer.contact_id == contact_id)
)
existing_customer = result.scalar_one_or_none()
if existing_customer:
# Update existing customer with new information
_LOGGER.info("Updating existing customer with contact_id: %s", contact_id)
existing_customer.given_name = first_name
existing_customer.surname = last_name
existing_customer.name_prefix = name_prefix
existing_customer.email_address = email
existing_customer.phone = phone_number
existing_customer.email_newsletter = email_newsletter
existing_customer.address_line = address_line
existing_customer.city_name = city_name
existing_customer.postal_code = postal_code
existing_customer.country_code = country_code
existing_customer.gender = gender
existing_customer.birth_date = birth_date
existing_customer.language = language
existing_customer.address_catalog = False
existing_customer.name_title = None
db_customer = existing_customer
await db.flush()
else:
# Create new customer
_LOGGER.info("Creating new customer with contact_id: %s", contact_id)
db_customer = DBCustomer(
given_name=first_name,
surname=last_name,
contact_id=contact_id,
name_prefix=name_prefix,
email_address=email,
phone=phone_number,
email_newsletter=email_newsletter,
address_line=address_line,
city_name=city_name,
postal_code=postal_code,
country_code=country_code,
gender=gender,
birth_date=birth_date,
language=language,
address_catalog=False,
name_title=None,
)
db.add(db_customer)
await db.flush() # This assigns db_customer.id without committing
# Determine hotel_code and hotel_name
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
@@ -537,7 +578,7 @@ async def handle_wix_form(
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error("Error in handle_wix_form: %s", e)
_LOGGER.exception("Error in handle_wix_form: %s", e)
# log stacktrace
import traceback
@@ -552,25 +593,90 @@ async def handle_wix_form_test(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Test endpoint to verify the API is working with raw JSON data.
No authentication required for testing purposes.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form_test: {e!s}")
_LOGGER.exception(f"Error in handle_wix_form_test: {e!s}")
raise HTTPException(status_code=500, detail="Error processing test data")
@api_router.post("/hoteldata/conversions_import")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_xml_upload(
request: Request, credentials_tupel: tuple = Depends(validate_basic_auth)
):
"""Endpoint for receiving XML files for conversion processing.
Requires basic authentication and saves XML files to log directory.
Supports gzip compression via Content-Encoding header.
@api_router.post("/webhook/generic")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_generic_webhook(request: Request, data: dict[str, Any]):
"""Handle generic webhook endpoint for receiving JSON payloads.
Logs the data to file for later analysis. Does not process the data
or save to database since the structure is not yet known.
No authentication required for this endpoint.
"""
try:
timestamp = datetime.now().isoformat()
_LOGGER.info("Received generic webhook data at %s", timestamp)
# Create log entry with metadata
log_entry = {
"timestamp": timestamp,
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"origin_header": request.headers.get("origin"),
}
# Create logs directory if it doesn't exist
logs_dir = Path("logs/generic_webhooks")
if not logs_dir.exists():
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
_LOGGER.info("Created directory: %s", logs_dir)
# Generate log filename with timestamp
log_filename = (
logs_dir / f"webhook_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
# Write log file
with log_filename.open("w", encoding="utf-8") as f:
json.dump(log_entry, f, indent=2, default=str, ensure_ascii=False)
_LOGGER.info("Generic webhook data logged to: %s", log_filename)
except Exception as e:
_LOGGER.exception("Error in handle_generic_webhook")
raise HTTPException(
status_code=500, detail="Error processing generic webhook data"
) from e
else:
return {
"status": "success",
"message": "Generic webhook data received successfully",
"data_logged_to": str(log_filename),
"timestamp": timestamp,
"note": "Data logged for later analysis",
}
@api_router.put("/hoteldata/conversions_import/{filename:path}")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_xml_upload(
request: Request,
filename: str,
credentials_tupel: tuple = Depends(validate_basic_auth),
):
"""Endpoint for receiving XML files for conversion processing via PUT.
Requires basic authentication and saves XML files to log directory.
Supports gzip compression via Content-Encoding header.
Example: PUT /api/hoteldata/conversions_import/Reservierungen.xml
"""
try:
# Validate filename to prevent path traversal
if ".." in filename or filename.startswith("/"):
raise HTTPException(status_code=400, detail="ERROR: Invalid filename")
# Get the raw body content
body = await request.body()
@@ -615,12 +721,20 @@ async def handle_xml_upload(
# Generate filename with timestamp and authenticated user
username, _ = credentials_tupel
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_filename = logs_dir / f"xml_import_{username}_{timestamp}.xml"
# Use the filename from the path, but add timestamp and username for uniqueness
base_filename = Path(filename).stem
extension = Path(filename).suffix or ".xml"
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
# Save XML content to file
log_filename.write_text(xml_content, encoding="utf-8")
_LOGGER.info("XML file saved to %s by user %s", log_filename, username)
_LOGGER.info(
"XML file saved to %s by user %s (original: %s)",
log_filename,
username,
filename,
)
response_headers = {
"Content-Type": "application/xml; charset=utf-8",
@@ -638,30 +752,6 @@ async def handle_xml_upload(
raise HTTPException(status_code=500, detail="Error processing XML upload")
# UNUSED
@api_router.post("/admin/generate-api-key")
@limiter.limit("5/hour") # Very restrictive for admin operations
async def generate_new_api_key(
request: Request, admin_key: str = Depends(validate_api_key)
):
"""Admin endpoint to generate new API keys.
Requires admin API key and is heavily rate limited.
"""
if admin_key != "admin-key":
raise HTTPException(status_code=403, detail="Admin access required")
new_key = generate_api_key()
_LOGGER.info(f"Generated new API key (requested by: {admin_key})")
return {
"status": "success",
"message": "New API key generated",
"api_key": new_key,
"timestamp": datetime.now().isoformat(),
"note": "Store this key securely - it won't be shown again",
}
# TODO Bit sketchy. May need requests-toolkit in the future
def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
"""Parse multipart/form-data from raw request body.
@@ -833,7 +923,7 @@ async def alpinebits_server_handshake(
# Re-raise HTTP exceptions (auth errors, etc.)
raise
except Exception as e:
_LOGGER.error(f"Error in AlpineBits handshake: {e!s}")
_LOGGER.exception(f"Error in AlpineBits handshake: {e!s}")
raise HTTPException(status_code=500, detail="Internal server error")

View File

@@ -1,6 +1,5 @@
import hashlib
import hmac
import logging
import os
import secrets
@@ -10,8 +9,9 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
# Load environment variables from .env file
load_dotenv()
from .logging_config import get_logger
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
# Security scheme
security = HTTPBearer()
@@ -31,12 +31,12 @@ if os.getenv("ADMIN_API_KEY"):
def generate_unique_id() -> str:
"""Generate a unique ID with max length 35 characters"""
return secrets.token_urlsafe(26)[:35] # 26 bytes -> 35 chars in base64url
"""Generate a unique ID with max length 32 characters."""
return secrets.token_urlsafe(26)[:32] # 26 bytes -> 32 chars in base64url
def generate_api_key() -> str:
"""Generate a secure API key"""
"""Generate a secure API key."""
return f"sk_live_{secrets.token_urlsafe(32)}"
@@ -44,6 +44,7 @@ def validate_api_key(
credentials: HTTPAuthorizationCredentials = Security(security),
) -> str:
"""Validate API key from Authorization header.
Expected format: Authorization: Bearer your_api_key_here
"""
token = credentials.credentials
@@ -64,6 +65,7 @@ def validate_api_key(
def validate_wix_signature(payload: bytes, signature: str, secret: str) -> bool:
"""Validate Wix webhook signature for additional security.
Wix signs their webhooks with HMAC-SHA256.
"""
if not signature or not secret:
@@ -81,29 +83,29 @@ def validate_wix_signature(payload: bytes, signature: str, secret: str) -> bool:
# Compare signatures securely
return secrets.compare_digest(signature, expected_signature)
except Exception as e:
logger.error(f"Error validating signature: {e}")
logger.exception(f"Error validating signature: {e}")
return False
class APIKeyAuth:
"""Simple API key authentication class"""
"""Simple API key authentication class."""
def __init__(self, api_keys: dict):
self.api_keys = api_keys
def authenticate(self, api_key: str) -> str | None:
"""Authenticate an API key and return the key name if valid"""
"""Authenticate an API key and return the key name if valid."""
for key_name, valid_key in self.api_keys.items():
if secrets.compare_digest(api_key, valid_key):
return key_name
return None
def add_key(self, name: str, key: str):
"""Add a new API key"""
"""Add a new API key."""
self.api_keys[name] = key
def remove_key(self, name: str):
"""Remove an API key"""
"""Remove an API key."""
if name in self.api_keys:
del self.api_keys[name]

View File

@@ -1,12 +1,8 @@
import os
from pathlib import Path
from annotatedyaml.loader import (
Secrets,
)
from annotatedyaml.loader import (
load_yaml as load_annotated_yaml,
)
from annotatedyaml.loader import Secrets
from annotatedyaml.loader import load_yaml as load_annotated_yaml
from voluptuous import (
PREVENT_EXTRA,
All,
@@ -17,20 +13,68 @@ from voluptuous import (
Schema,
)
from alpine_bits_python.const import (
CONF_ALPINE_BITS_AUTH,
CONF_DATABASE,
CONF_HOTEL_ID,
CONF_HOTEL_NAME,
CONF_LOGGING,
CONF_LOGGING_FILE,
CONF_LOGGING_LEVEL,
CONF_PASSWORD,
CONF_PUSH_ENDPOINT,
CONF_PUSH_TOKEN,
CONF_PUSH_URL,
CONF_PUSH_USERNAME,
CONF_SERVER,
CONF_SERVER_CODE,
CONF_SERVER_CODECONTEXT,
CONF_SERVER_COMPANYNAME,
CONF_SERVER_RES_ID_SOURCE_CONTEXT,
CONF_USERNAME,
ENV_ALPINE_BITS_CONFIG_PATH,
)
# --- Voluptuous schemas ---
database_schema = Schema({Required("url"): str}, extra=PREVENT_EXTRA)
logger_schema = Schema(
{
Required(CONF_LOGGING_LEVEL, default="INFO"): str,
Optional(CONF_LOGGING_FILE): str, # If not provided, log to console
},
extra=PREVENT_EXTRA,
)
def ensure_string(value):
"""Ensure the value is a string."""
if isinstance(value, str):
return value
return str(value)
server_info = Schema(
{
Required(CONF_SERVER_CODECONTEXT, default="ADVERTISING"): ensure_string,
Required(CONF_SERVER_CODE, default="70597314"): ensure_string,
Required(CONF_SERVER_COMPANYNAME, default="99tales Gmbh"): ensure_string,
Required(CONF_SERVER_RES_ID_SOURCE_CONTEXT, default="99tales"): ensure_string,
}
)
hotel_auth_schema = Schema(
{
Required("hotel_id"): str,
Required("hotel_name"): str,
Required("username"): str,
Required("password"): str,
Optional("push_endpoint"): {
Required("url"): str,
Required("token"): str,
Optional("username"): str,
Required(CONF_HOTEL_ID): ensure_string,
Required(CONF_HOTEL_NAME): str,
Required(CONF_USERNAME): str,
Required(CONF_PASSWORD): str,
Optional(CONF_PUSH_ENDPOINT): {
Required(CONF_PUSH_URL): str,
Required(CONF_PUSH_TOKEN): str,
Optional(CONF_PUSH_USERNAME): str,
},
},
extra=PREVENT_EXTRA,
@@ -40,8 +84,10 @@ basic_auth_schema = Schema(All([hotel_auth_schema], Length(min=1)))
config_schema = Schema(
{
Required("database"): database_schema,
Required("alpine_bits_auth"): basic_auth_schema,
Required(CONF_DATABASE): database_schema,
Required(CONF_ALPINE_BITS_AUTH): basic_auth_schema,
Required(CONF_SERVER): server_info,
Required(CONF_LOGGING): logger_schema,
},
extra=PREVENT_EXTRA,
)
@@ -52,20 +98,18 @@ DEFAULT_CONFIG_FILE = "config.yaml"
class Config:
def __init__(
self,
config_folder: str | Path = None,
config_folder: str | Path | None = None,
config_name: str = DEFAULT_CONFIG_FILE,
testing_mode: bool = False,
):
if config_folder is None:
config_folder = os.environ.get("ALPINEBITS_CONFIG_DIR")
config_folder = os.environ.get(ENV_ALPINE_BITS_CONFIG_PATH)
if not config_folder:
config_folder = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../config")
)
config_folder = Path(__file__).parent.joinpath("../../config").resolve()
if isinstance(config_folder, str):
config_folder = Path(config_folder)
self.config_folder = config_folder
self.config_path = os.path.join(config_folder, config_name)
self.config_path = config_folder / config_name
self.secrets = Secrets(config_folder)
self.testing_mode = testing_mode
self._load_config()

View File

@@ -0,0 +1,34 @@
from typing import Final
RESERVATION_ID_TYPE: str = (
"13" # Default reservation ID type for Reservation. 14 would be cancellation
)
CONF_LOGGING: Final[str] = "logger"
CONF_LOGGING_LEVEL: Final[str] = "level"
CONF_LOGGING_FILE: Final[str] = "file"
CONF_DATABASE: Final[str] = "database"
CONF_SERVER: Final[str] = "server"
CONF_SERVER_CODECONTEXT: Final[str] = "codecontext"
CONF_SERVER_CODE: Final[str] = "code"
CONF_SERVER_COMPANYNAME: Final[str] = "companyname"
CONF_SERVER_RES_ID_SOURCE_CONTEXT: Final[str] = "res_id_source_context"
CONF_ALPINE_BITS_AUTH: Final[str] = "alpine_bits_auth"
CONF_HOTEL_ID: Final[str] = "hotel_id"
CONF_HOTEL_NAME: Final[str] = "hotel_name"
CONF_USERNAME: Final[str] = "username"
CONF_PASSWORD: Final[str] = "password"
CONF_PUSH_ENDPOINT: Final[str] = "push_endpoint"
CONF_PUSH_URL: Final[str] = "url"
CONF_PUSH_TOKEN: Final[str] = "token"
CONF_PUSH_USERNAME: Final[str] = "username"
ENV_ALPINE_BITS_CONFIG_PATH: Final[str] = "ALPINE_BITS_CONFIG_DIR"

View File

@@ -585,8 +585,7 @@ class TextTextFormat2(Enum):
class TimeUnitType(Enum):
"""Defines the unit in which the time is expressed (e.g. year, day, hour).
"""
"""Defines the unit in which the time is expressed (e.g. year, day, hour)."""
YEAR = "Year"
MONTH = "Month"

View File

@@ -0,0 +1,87 @@
"""Centralized logging configuration for AlpineBits application.
This module sets up logging based on config and provides a function to get
loggers from anywhere in the application.
"""
import logging
import sys
from pathlib import Path
def setup_logging(config: dict | None = None):
"""Configure logging based on application config.
Args:
config: Application configuration dict with optional 'logger' section
Logger config format:
logger:
level: "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL
file: "alpinebits.log" # Optional, logs to console if not provided
"""
if config is None:
config = {}
logger_config = config.get("logger", {})
level = logger_config.get("level", "INFO").upper()
log_file = logger_config.get("file")
# Convert string level to logging constant
numeric_level = getattr(logging, level, logging.INFO)
# Create formatter with timestamp
formatter = logging.Formatter(
fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Get root logger
root_logger = logging.getLogger()
root_logger.setLevel(numeric_level)
# Remove existing handlers to avoid duplicates
root_logger.handlers.clear()
# Console handler (always add this)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(numeric_level)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# File handler (optional)
if log_file:
log_path = Path(log_file)
# Create logs directory if it doesn't exist
if log_path.parent != Path():
log_path.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(numeric_level)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
root_logger.info("Logging to file: %s", log_file)
root_logger.info("Logging configured at %s level", level)
def get_logger(name: str) -> logging.Logger:
"""Get a logger instance for the given module name.
Usage:
from alpine_bits_python.logging_config import get_logger
_LOGGER = get_logger(__name__)
_LOGGER.info("Something happened")
Args:
name: Usually __name__ from the calling module
Returns:
Configured logger instance
"""
return logging.getLogger(name)

View File

@@ -1,366 +0,0 @@
import asyncio
import json
import logging
import os
from datetime import UTC, date, datetime
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from .alpine_bits_helpers import (
AlpineBitsFactory,
CommentData,
CommentListItemData,
CommentsData,
CustomerData,
GuestCountsFactory,
HotelReservationIdData,
OtaMessageType,
PhoneTechType,
)
from .config_loader import load_config
# DB and config
from .db import (
Base,
get_database_url,
)
from .db import (
Customer as DBCustomer,
)
from .db import (
Reservation as DBReservation,
)
from .generated import alpinebits as ab
# Configure logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
async def setup_db(config):
DATABASE_URL = get_database_url(config)
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
_LOGGER.info("Database tables checked/created at startup.")
return engine, AsyncSessionLocal
async def main():
print("🚀 Starting AlpineBits XML generation script...")
# Load config (yaml, annotatedyaml)
config = load_config()
# print config for debugging
print("Loaded configuration:")
print(json.dumps(config, indent=2))
# Ensure SQLite DB file exists if using SQLite
db_url = config.get("database", {}).get("url", "")
if db_url.startswith("sqlite+aiosqlite:///"):
db_path = db_url.replace("sqlite+aiosqlite:///", "")
db_path = os.path.abspath(db_path)
db_dir = os.path.dirname(db_path)
if not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
# for now we delete the existing DB for clean testing
if os.path.exists(db_path):
os.remove(db_path)
print(f"Deleted existing SQLite DB at {db_path} for clean testing.")
# # Ensure DB schema is created (async)
engine, AsyncSessionLocal = await setup_db(config)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with AsyncSessionLocal() as db:
# Load data from JSON file
json_path = os.path.join(
os.path.dirname(__file__),
"../../test_data/wix_test_data_20250928_132611.json",
)
with open(json_path, encoding="utf-8") as f:
wix_data = json.load(f)
data = wix_data["data"]["data"]
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
locale = contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
email_newsletter = data.get("field:form_field_5a7b", "") != "Non selezionato"
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
# Dates
start_date = (
data.get("field:date_picker_a7c8")
or data.get("Anreisedatum")
or data.get("submissions", [{}])[1].get("value")
)
end_date = (
data.get("field:date_picker_7e65")
or data.get("Abreisedatum")
or data.get("submissions", [{}])[2].get("value")
)
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 2)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
for k in data.keys():
if k.startswith("field:alter_kind_"):
try:
age = int(data[k])
children_ages.append(age)
except ValueError:
logging.warning(f"Invalid age value for {k}: {data[k]}")
# UTM and offer
utm_fields = [
("utm_Source", "utm_source"),
("utm_Medium", "utm_medium"),
("utm_Campaign", "utm_campaign"),
("utm_Term", "utm_term"),
("utm_Content", "utm_content"),
]
utm_comment_text = []
for label, field in utm_fields:
val = data.get(f"field:{field}") or data.get(label)
if val:
utm_comment_text.append(f"{label}: {val}")
utm_comment = " | ".join(utm_comment_text) if utm_comment_text else None
offer = data.get("field:angebot_auswaehlen")
# Save all relevant data to DB (including new fields)
db_customer = DBCustomer(
given_name=first_name,
surname=last_name,
contact_id=contact_id,
name_prefix=name_prefix,
email_address=email,
phone=phone_number,
email_newsletter=email_newsletter,
address_line=address_line,
city_name=city_name,
postal_code=postal_code,
country_code=country_code,
gender=gender,
birth_date=birth_date,
language=language,
address_catalog=False,
name_title=None,
)
db.add(db_customer)
await db.commit()
await db.refresh(db_customer)
db_reservation = DBReservation(
customer_id=db_customer.id,
form_id=data.get("submissionId"),
start_date=date.fromisoformat(start_date) if start_date else None,
end_date=date.fromisoformat(end_date) if end_date else None,
num_adults=num_adults,
num_children=num_children,
children_ages=",".join(str(a) for a in children_ages),
offer=offer,
utm_comment=utm_comment,
created_at=datetime.now(UTC),
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=data.get("field:fbclid"),
gclid=data.get("field:gclid"),
hotel_code="123",
hotel_name="Frangart Inn",
)
db.add(db_reservation)
await db.commit()
await db.refresh(db_reservation)
# Now read back from DB
customer = await db.get(DBCustomer, db_reservation.customer_id)
reservation = await db.get(DBReservation, db_reservation.id)
# Generate XML from DB data
create_xml_from_db(customer, reservation)
await db.close()
def create_xml_from_db(customer: DBCustomer, reservation: DBReservation):
# Prepare data for XML
phone_numbers = [(customer.phone, PhoneTechType.MOBILE)] if customer.phone else []
customer_data = CustomerData(
given_name=customer.given_name,
surname=customer.surname,
name_prefix=customer.name_prefix,
name_title=customer.name_title,
phone_numbers=phone_numbers,
email_address=customer.email_address,
email_newsletter=customer.email_newsletter,
address_line=customer.address_line,
city_name=customer.city_name,
postal_code=customer.postal_code,
country_code=customer.country_code,
address_catalog=customer.address_catalog,
gender=customer.gender,
birth_date=customer.birth_date,
language=customer.language,
)
alpine_bits_factory = AlpineBitsFactory()
res_guests = alpine_bits_factory.create_res_guests(
customer_data, OtaMessageType.RETRIEVE
)
# Guest counts
children_ages = [int(a) for a in reservation.children_ages.split(",") if a]
guest_counts = GuestCountsFactory.create_retrieve_guest_counts(
reservation.num_adults, children_ages
)
# UniqueID
unique_id = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.UniqueId(
type_value=ab.UniqueIdType2.VALUE_14, id=reservation.unique_id
)
# TimeSpan
time_span = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays.RoomStay.TimeSpan(
start=reservation.start_date.isoformat() if reservation.start_date else None,
end=reservation.end_date.isoformat() if reservation.end_date else None,
)
room_stay = (
ab.OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays.RoomStay(
time_span=time_span,
guest_counts=guest_counts,
)
)
room_stays = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays(
room_stay=[room_stay],
)
# HotelReservationId
hotel_res_id_data = HotelReservationIdData(
res_id_type="13",
res_id_value=reservation.fbclid or reservation.gclid,
res_id_source=None,
res_id_source_context="99tales",
)
hotel_res_id = alpine_bits_factory.create(
hotel_res_id_data, OtaMessageType.RETRIEVE
)
hotel_res_ids = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo.HotelReservationIds(
hotel_reservation_id=[hotel_res_id]
)
basic_property_info = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo.BasicPropertyInfo(
hotel_code=reservation.hotel_code,
hotel_name=reservation.hotel_name,
)
# Comments
offer_comment = CommentData(
name=ab.CommentName2.ADDITIONAL_INFO,
text="Angebot/Offerta",
list_items=[
CommentListItemData(
value=reservation.offer,
language=customer.language,
list_item="1",
)
],
)
comment = None
if reservation.user_comment:
comment = CommentData(
name=ab.CommentName2.CUSTOMER_COMMENT,
text=reservation.user_comment,
list_items=[
CommentListItemData(
value="Landing page comment",
language=customer.language,
list_item="1",
)
],
)
comments = [offer_comment, comment] if comment else [offer_comment]
comments_data = CommentsData(comments=comments)
comments_xml = alpine_bits_factory.create(comments_data, OtaMessageType.RETRIEVE)
res_global_info = (
ab.OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo(
hotel_reservation_ids=hotel_res_ids,
basic_property_info=basic_property_info,
comments=comments_xml,
)
)
hotel_reservation = ab.OtaResRetrieveRs.ReservationsList.HotelReservation(
create_date_time=datetime.now(UTC).isoformat(),
res_status=ab.HotelReservationResStatus.REQUESTED,
room_stay_reservation="true",
unique_id=unique_id,
room_stays=room_stays,
res_guests=res_guests,
res_global_info=res_global_info,
)
reservations_list = ab.OtaResRetrieveRs.ReservationsList(
hotel_reservation=[hotel_reservation]
)
ota_res_retrieve_rs = ab.OtaResRetrieveRs(
version="7.000", success=None, reservations_list=reservations_list
)
# Serialize to XML
try:
ota_res_retrieve_rs.model_validate(ota_res_retrieve_rs.model_dump())
print("✅ Pydantic validation successful!")
from xsdata.formats.dataclass.serializers.config import SerializerConfig
from xsdata_pydantic.bindings import XmlSerializer
config = SerializerConfig(
pretty_print=True, xml_declaration=True, encoding="UTF-8"
)
serializer = XmlSerializer(config=config)
ns_map = {None: "http://www.opentravel.org/OTA/2003/05"}
xml_string = serializer.render(ota_res_retrieve_rs, ns_map=ns_map)
with open("output.xml", "w", encoding="utf-8") as outfile:
outfile.write(xml_string)
print("✅ XML serialization successful!")
print("Generated XML written to output.xml")
print("\n📄 Generated XML:")
print(xml_string)
from xsdata_pydantic.bindings import XmlParser
parser = XmlParser()
with open("output.xml", encoding="utf-8") as infile:
xml_content = infile.read()
parsed_result = parser.from_string(xml_content, ab.OtaResRetrieveRs)
print("✅ Round-trip validation successful!")
print(
f"Parsed reservation status: {parsed_result.reservations_list.hotel_reservation[0].res_status}"
)
except Exception as e:
print(f"❌ Validation/Serialization failed: {e}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,73 +0,0 @@
from typing import Any
from pydantic import BaseModel, Field
class AlpineBitsHandshakeRequest(BaseModel):
"""Model for AlpineBits handshake request data"""
action: str = Field(
..., description="Action parameter, typically 'OTA_Ping:Handshaking'"
)
request_xml: str | None = Field(None, description="XML request document")
class ContactName(BaseModel):
"""Contact name structure"""
first: str | None = None
last: str | None = None
class ContactAddress(BaseModel):
"""Contact address structure"""
street: str | None = None
city: str | None = None
state: str | None = None
country: str | None = None
postalCode: str | None = None
class Contact(BaseModel):
"""Contact information from Wix form"""
name: ContactName | None = None
email: str | None = None
locale: str | None = None
company: str | None = None
birthdate: str | None = None
labelKeys: dict[str, Any] | None = None
contactId: str | None = None
address: ContactAddress | None = None
jobTitle: str | None = None
imageUrl: str | None = None
updatedDate: str | None = None
phone: str | None = None
createdDate: str | None = None
class SubmissionPdf(BaseModel):
"""PDF submission structure"""
url: str | None = None
filename: str | None = None
class WixFormSubmission(BaseModel):
"""Model for Wix form submission data"""
formName: str
submissions: list[dict[str, Any]] = Field(default_factory=list)
submissionTime: str
formFieldMask: list[str] = Field(default_factory=list)
submissionId: str
contactId: str
submissionsLink: str
submissionPdf: SubmissionPdf | None = None
formId: str
contact: Contact | None = None
# Dynamic form fields - these will capture all field:* entries
class Config:
extra = "allow" # Allow additional fields not defined in the model

View File

@@ -19,8 +19,7 @@ REDIS_URL = os.getenv("REDIS_URL", None)
def get_remote_address_with_forwarded(request: Request):
"""Get client IP address, considering forwarded headers from proxies/load balancers
"""
"""Get client IP address, considering forwarded headers from proxies/load balancers."""
# Check for forwarded headers (common in production behind proxies)
forwarded_for = request.headers.get("X-Forwarded-For")
if forwarded_for:
@@ -59,7 +58,7 @@ else:
def get_api_key_identifier(request: Request) -> str:
"""Get identifier for rate limiting based on API key if available, otherwise IP
This allows different rate limits per API key
This allows different rate limits per API key.
"""
# Try to get API key from Authorization header
auth_header = request.headers.get("Authorization")
@@ -85,7 +84,7 @@ webhook_limiter = Limiter(
# Custom rate limit exceeded handler
def custom_rate_limit_handler(request: Request, exc: RateLimitExceeded):
"""Custom handler for rate limit exceeded"""
"""Custom handler for rate limit exceeded."""
logger.warning(
f"Rate limit exceeded for {get_remote_address_with_forwarded(request)}: "
f"{exc.detail}"

View File

@@ -1,2 +0,0 @@
def parse_form(form: dict):
pass

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""Startup script for the Wix Form Handler API
"""
"""Startup script for the Wix Form Handler API."""
import os
@@ -10,7 +9,6 @@ if __name__ == "__main__":
db_path = "alpinebits.db" # Adjust path if needed
if os.path.exists(db_path):
os.remove(db_path)
print(f"Deleted database file: {db_path}")
uvicorn.run(
"alpine_bits_python.api:app",

View File

@@ -1,44 +0,0 @@
from xsdata_pydantic.bindings import XmlParser
from ..generated.alpinebits import OtaPingRs
def main():
# test parsing a ping request sample
path = (
"AlpineBits-HotelData-2024-10/files/samples/Handshake/Handshake-OTA_PingRS.xml"
)
with open(path, encoding="utf-8") as f:
xml = f.read()
# Parse the XML into the request object
# Test parsing back
parser = XmlParser()
parsed_result = parser.from_string(xml, OtaPingRs)
print(parsed_result.echo_data)
warning = parsed_result.warnings.warning[0]
print(warning.type_value)
print(type(warning.content))
print(warning.content[0])
# save json in echo_data to file with indents
output_path = "echo_data_response.json"
with open(output_path, "w", encoding="utf-8") as out_f:
import json
json.dump(json.loads(parsed_result.echo_data), out_f, indent=4)
print(f"Saved echo_data json to {output_path}")
if __name__ == "__main__":
main()

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""Convenience launcher for the Wix Form Handler API
"""
"""Convenience launcher for the Wix Form Handler API."""
import os
import subprocess

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""Test the handshake functionality with the real AlpineBits sample file.
"""
"""Test the handshake functionality with the real AlpineBits sample file."""
import asyncio
@@ -8,8 +7,6 @@ from alpine_bits_python.alpinebits_server import AlpineBitsServer
async def main():
print("🔄 Testing AlpineBits Handshake with Sample File")
print("=" * 60)
# Create server instance
server = AlpineBitsServer()
@@ -20,15 +17,12 @@ async def main():
) as f:
ping_request_xml = f.read()
print("📤 Sending handshake request...")
# Handle the ping request
response = await server.handle_request(
await server.handle_request(
"OTA_Ping:Handshaking", ping_request_xml, "2024-10"
)
print(f"\n📥 Response Status: {response.status_code}")
print(f"📄 Response XML:\n{response.xml_content}")
if __name__ == "__main__":

View File

@@ -1,4 +1,3 @@
import pytest
from alpine_bits_python.alpine_bits_helpers import (

View File

@@ -191,6 +191,12 @@ def read_request_xml_no_date_filter():
def test_config():
"""Test configuration with hotel credentials."""
return {
"server": {
"codecontext": "ADVERTISING",
"code": "70597314",
"companyname": "99tales Gmbh",
"res_id_source_context": "99tales",
},
"alpine_bits_auth": [
{
"hotel_id": "HOTEL123",
@@ -198,7 +204,7 @@ def test_config():
"username": "testuser",
"password": "testpass",
}
]
],
}
@@ -215,9 +221,9 @@ def client_info():
class TestCreateResRetrieveResponse:
"""Test the create_res_retrieve_response function."""
def test_empty_list(self):
def test_empty_list(self, test_config):
"""Test creating response with empty reservation list."""
response = create_res_retrieve_response([])
response = create_res_retrieve_response([], config=test_config)
assert response is not None, "Response should not be None"
@@ -232,10 +238,10 @@ class TestCreateResRetrieveResponse:
"Response should have reservations_list attribute"
)
def test_single_reservation(self, sample_reservation, sample_customer):
def test_single_reservation(self, sample_reservation, sample_customer, test_config):
"""Test creating response with single reservation."""
reservation_pairs = [(sample_reservation, sample_customer)]
response = create_res_retrieve_response(reservation_pairs)
response = create_res_retrieve_response(reservation_pairs, config=test_config)
assert response is not None
assert hasattr(response, "reservations_list"), (
@@ -273,13 +279,14 @@ class TestCreateResRetrieveResponse:
sample_customer,
minimal_reservation,
minimal_customer,
test_config,
):
"""Test creating response with multiple reservations."""
reservation_pairs = [
(sample_reservation, sample_customer),
(minimal_reservation, minimal_customer),
]
response = create_res_retrieve_response(reservation_pairs)
response = create_res_retrieve_response(reservation_pairs, config=test_config)
assert response is not None
@@ -297,13 +304,15 @@ class TestCreateResRetrieveResponse:
assert "John" in xml_output
assert "Jane" in xml_output
def test_reservation_with_children(self, sample_reservation, sample_customer):
def test_reservation_with_children(
self, sample_reservation, sample_customer, test_config
):
"""Test reservation with children ages."""
sample_reservation.num_children = 2
sample_reservation.children_ages = "8,5"
reservation_pairs = [(sample_reservation, sample_customer)]
response = create_res_retrieve_response(reservation_pairs)
response = create_res_retrieve_response(reservation_pairs, config=test_config)
config = SerializerConfig(pretty_print=True)
serializer = XmlSerializer(config=config)
@@ -348,10 +357,11 @@ class TestXMLParsing:
self,
sample_reservation,
sample_customer,
test_config,
):
"""Test serialization of retrieve response to XML."""
reservation_pairs = [(sample_reservation, sample_customer)]
response = create_res_retrieve_response(reservation_pairs)
response = create_res_retrieve_response(reservation_pairs, config=test_config)
config = SerializerConfig(
pretty_print=True, xml_declaration=True, encoding="UTF-8"
@@ -378,7 +388,7 @@ class TestXMLParsing:
class TestEdgeCases:
"""Test edge cases and error conditions."""
def test_customer_with_special_characters(self):
def test_customer_with_special_characters(self, test_config):
"""Test customer with special characters in name."""
customer = Customer(
id=99,
@@ -400,7 +410,7 @@ class TestEdgeCases:
)
reservation_pairs = [(reservation, customer)]
response = create_res_retrieve_response(reservation_pairs)
response = create_res_retrieve_response(reservation_pairs, config=test_config)
config = SerializerConfig(pretty_print=True, encoding="UTF-8")
serializer = XmlSerializer(config=config)
@@ -411,7 +421,7 @@ class TestEdgeCases:
assert response is not None
assert xml_output is not None
def test_reservation_with_all_utm_parameters(self):
def test_reservation_with_all_utm_parameters(self, test_config):
"""Test reservation with all UTM tracking parameters."""
customer = Customer(
id=97,
@@ -444,11 +454,11 @@ class TestEdgeCases:
)
reservation_pairs = [(reservation_db, customer)]
response = create_res_retrieve_response(reservation_pairs)
response = create_res_retrieve_response(reservation_pairs, config=test_config)
config = SerializerConfig(pretty_print=True)
serializer = XmlSerializer(config=config)
xml_output = serializer.render(
serializer.render(
response, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
)
@@ -820,6 +830,123 @@ class TestAcknowledgments:
"Acknowledgment should not affect count when date filter is applied"
)
@pytest.mark.asyncio
async def test_same_customer_multiple_reservations(
self,
alpinebits_server,
test_db_session,
client_info,
sample_customer,
):
"""Test same customer with multiple reservations returns all."""
# Add the customer to the database
test_db_session.add(sample_customer)
await test_db_session.commit()
# Create two reservations for the same customer
first_reservation = ReservationData(
unique_id="RES-2024-MULTI-001",
start_date=date(2024, 12, 25),
end_date=date(2024, 12, 31),
num_adults=2,
num_children=0,
children_ages=[],
hotel_code="HOTEL123",
hotel_name="Alpine Paradise Resort",
created_at=datetime(2024, 11, 1, 12, 0, 0, tzinfo=UTC),
)
second_reservation = ReservationData(
unique_id="RES-2024-MULTI-002",
start_date=date(2025, 3, 15),
end_date=date(2025, 3, 20),
num_adults=2,
num_children=1,
children_ages=[10],
hotel_code="HOTEL123",
hotel_name="Alpine Paradise Resort",
created_at=datetime(2024, 11, 15, 10, 0, 0, tzinfo=UTC),
)
# Convert to DB reservations
first_data = first_reservation.model_dump(exclude_none=True)
children_list = first_data.pop("children_ages", [])
children_csv = (
",".join(str(int(a)) for a in children_list) if children_list else ""
)
first_data["children_ages"] = children_csv
db_first_reservation = Reservation(
id=100,
customer_id=sample_customer.id,
**first_data,
)
second_data = second_reservation.model_dump(exclude_none=True)
children_list = second_data.pop("children_ages", [])
children_csv = (
",".join(str(int(a)) for a in children_list) if children_list else ""
)
second_data["children_ages"] = children_csv
db_second_reservation = Reservation(
id=101,
customer_id=sample_customer.id,
**second_data,
)
# Add both reservations to the database
test_db_session.add(db_first_reservation)
test_db_session.add(db_second_reservation)
await test_db_session.commit()
# Send read request
read_xml = """<?xml version="1.0" encoding="UTF-8"?>
<OTA_ReadRQ xmlns="http://www.opentravel.org/OTA/2003/05"
EchoToken="12345"
TimeStamp="2024-10-07T10:00:00"
Version="8.000">
<ReadRequests>
<HotelReadRequest HotelCode="HOTEL123" HotelName="Alpine Paradise Resort"/>
</ReadRequests>
</OTA_ReadRQ>"""
response = await alpinebits_server.handle_request(
request_action_name="OTA_Read:GuestRequests",
request_xml=read_xml,
client_info=client_info,
version="2024-10",
dbsession=test_db_session,
)
assert response is not None
assert response.status_code == HTTP_OK
# Parse response to verify both reservations are returned
parser = XmlParser()
parsed_response = parser.from_string(response.xml_content, OtaResRetrieveRs)
assert parsed_response.reservations_list is not None
assert parsed_response.reservations_list.hotel_reservation is not None
reservation_count = len(parsed_response.reservations_list.hotel_reservation)
expected_reservations = 2
assert reservation_count == expected_reservations, (
"Should return 2 reservations for the same customer"
)
# Verify both reservations are present in the response
xml_content = response.xml_content
assert "John" in xml_content # Customer first name
assert "Doe" in xml_content # Customer last name
# Both reservations should be linked to the same customer
# Verify this by checking that customer appears in both reservation contexts
min_customer_name_occurrences = 2
assert xml_content.count("John") >= min_customer_name_occurrences, (
"Customer name should appear for each reservation"
)
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -11,8 +11,7 @@ def extract_relevant_sections(xml_string):
# Remove version attribute value, keep only presence
# Use the same XmlParser as AlpineBitsServer
parser = XmlParser()
obj = parser.from_string(xml_string, OtaPingRs)
return obj
return parser.from_string(xml_string, OtaPingRs)
@pytest.mark.asyncio

723
tests/test_api.py Normal file
View File

@@ -0,0 +1,723 @@
"""Tests for API endpoints using FastAPI TestClient.
This module tests all FastAPI endpoints including:
- Health check endpoints
- Wix webhook endpoints
- AlpineBits server endpoint
- XML upload endpoint
- Authentication
- Rate limiting
"""
import base64
import gzip
import uuid
from pathlib import Path
from unittest.mock import patch
import pytest
import pytest_asyncio
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from alpine_bits_python.api import app
from alpine_bits_python.db import Base, Customer, Reservation
@pytest_asyncio.fixture
async def test_db_engine():
"""Create an in-memory SQLite database for testing."""
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
# Cleanup
await engine.dispose()
@pytest_asyncio.fixture
async def test_db_session(test_db_engine):
"""Create a test database session."""
async_session = async_sessionmaker(
test_db_engine,
class_=AsyncSession,
expire_on_commit=False,
)
async with async_session() as session:
yield session
@pytest.fixture
def test_config():
"""Test configuration."""
return {
"server": {
"codecontext": "ADVERTISING",
"code": "70597314",
"companyname": "99tales Gmbh",
"res_id_source_context": "99tales",
},
"alpine_bits_auth": [
{
"hotel_id": "HOTEL123",
"hotel_name": "Test Hotel",
"username": "testuser",
"password": "testpass",
}
],
"default_hotel_code": "HOTEL123",
"default_hotel_name": "Test Hotel",
"database": {"url": "sqlite+aiosqlite:///:memory:"},
}
@pytest.fixture
def client(test_config):
"""Create a test client with mocked dependencies.
Each test gets a fresh TestClient instance to avoid database conflicts.
Mocks load_config to return test_config instead of production config.
"""
# Import locally to avoid circular imports
from alpine_bits_python.alpinebits_server import AlpineBitsServer # noqa: PLC0415
# Mock load_config to return test_config instead of production config
with patch("alpine_bits_python.api.load_config", return_value=test_config):
# Create a new in-memory database for each test
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
)
# Setup app state (will be overridden by lifespan but we set it anyway)
app.state.engine = engine
app.state.async_sessionmaker = async_sessionmaker(
engine, expire_on_commit=False
)
app.state.config = test_config
app.state.alpine_bits_server = AlpineBitsServer(test_config)
# TestClient will trigger lifespan events which create the tables
# The mocked load_config will ensure test_config is used
with TestClient(app) as test_client:
yield test_client
@pytest.fixture
def sample_wix_form_data():
"""Sample Wix form submission data.
Each call generates unique IDs to avoid database conflicts.
"""
unique_id = uuid.uuid4().hex[:8]
return {
"data": {
"submissionId": f"test-submission-{unique_id}",
"submissionTime": "2025-10-07T05:48:41.855Z",
"contact": {
"name": {"first": "John", "last": "Doe"},
"email": f"john.doe.{unique_id}@example.com",
"phones": [{"e164Phone": "+1234567890"}],
"locale": "en-US",
"contactId": f"contact-{unique_id}",
},
"field:anrede": "Mr.",
"field:form_field_5a7b": "Checked",
"field:date_picker_a7c8": "2024-12-25",
"field:date_picker_7e65": "2024-12-31",
"field:number_7cf5": "2",
"field:anzahl_kinder": "1",
"field:alter_kind_1": "8",
"field:angebot_auswaehlen": "Christmas Special",
"field:utm_source": "google",
"field:utm_medium": "cpc",
"field:utm_campaign": "winter2024",
"field:fbclid": "test_fbclid_123",
"field:long_answer_3524": "Late check-in please",
}
}
@pytest.fixture
def basic_auth_headers():
"""Create Basic Auth headers for testing."""
credentials = base64.b64encode(b"testuser:testpass").decode("utf-8")
return {"Authorization": f"Basic {credentials}"}
class TestHealthEndpoints:
"""Test health check and root endpoints."""
def test_root_endpoint(self, client):
"""Test GET / returns health status."""
response = client.get("/api/")
assert response.status_code == 200
data = response.json()
assert data["message"] == "Wix Form Handler API is running"
assert "timestamp" in data
assert data["status"] == "healthy"
assert "rate_limits" in data
def test_health_check_endpoint(self, client):
"""Test GET /api/health returns healthy status."""
response = client.get("/api/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
assert data["service"] == "wix-form-handler"
assert data["version"] == "1.0.0"
assert "timestamp" in data
def test_landing_page(self, client):
"""Test GET / (landing page) returns HTML."""
response = client.get("/")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
assert "99tales" in response.text or "Construction" in response.text
class TestWixWebhookEndpoint:
"""Test Wix form webhook endpoint."""
def test_wix_webhook_success(self, client, sample_wix_form_data):
"""Test successful Wix form submission."""
response = client.post("/api/webhook/wix-form", json=sample_wix_form_data)
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert "timestamp" in data
assert "data_logged_to" in data
def test_wix_webhook_creates_customer_and_reservation(
self, client, sample_wix_form_data
):
"""Test that webhook creates customer and reservation in database."""
response = client.post("/api/webhook/wix-form", json=sample_wix_form_data)
assert response.status_code == 200
# Verify data was saved to database
# Use the client's app state engine, not a separate test_db_engine
async def check_db():
engine = client.app.state.engine
async_session = async_sessionmaker(engine, expire_on_commit=False)
async with async_session() as session:
from sqlalchemy import select
# Check customer was created
result = await session.execute(select(Customer))
customers = result.scalars().all()
assert len(customers) == 1
customer = customers[0]
assert customer.given_name == "John"
assert customer.surname == "Doe"
# Email address in sample_wix_form_data has unique ID appended
assert customer.email_address.startswith("john.doe.")
assert "@example.com" in customer.email_address
# Check reservation was created
result = await session.execute(select(Reservation))
reservations = result.scalars().all()
assert len(reservations) == 1
reservation = reservations[0]
assert reservation.customer_id == customer.id
assert reservation.num_adults == 2
assert reservation.num_children == 1
import asyncio
asyncio.run(check_db())
def test_wix_webhook_minimal_data(self, client):
"""Test webhook with minimal required data."""
minimal_data = {
"data": {
"submissionId": "minimal-123",
"submissionTime": "2025-01-10T12:00:00.000Z",
"contact": {
"name": {"first": "Jane", "last": "Smith"},
"email": "jane@example.com",
},
"field:date_picker_a7c8": "2025-01-15",
"field:date_picker_7e65": "2025-01-20",
}
}
response = client.post("/api/webhook/wix-form", json=minimal_data)
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
def test_wix_webhook_test_endpoint(self, client, sample_wix_form_data):
"""Test the test endpoint works identically."""
response = client.post("/api/webhook/wix-form/test", json=sample_wix_form_data)
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
def test_wix_webhook_updates_existing_customer(self, client):
"""Test that same contact_id updates customer instead of duplicate."""
# First submission
first_submission = {
"data": {
"submissionId": "test-submission-001",
"submissionTime": "2025-10-07T05:48:41.855Z",
"contact": {
"name": {"first": "John", "last": "Doe"},
"email": "john.doe@example.com",
"phones": [{"e164Phone": "+1234567890"}],
"locale": "en-US",
"contactId": "fixed-contact-id-123",
},
"field:anrede": "Mr.",
"field:date_picker_a7c8": "2024-12-25",
"field:date_picker_7e65": "2024-12-31",
"field:number_7cf5": "2",
"field:anzahl_kinder": "0",
}
}
response = client.post("/api/webhook/wix-form", json=first_submission)
assert response.status_code == 200
# Second submission with same contact_id but different data
second_submission = {
"data": {
"submissionId": "test-submission-002",
"submissionTime": "2025-10-08T10:30:00.000Z",
"contact": {
"name": {"first": "John", "last": "Smith"}, # Changed last name
"email": "john.smith@example.com", # Changed email
"phones": [{"e164Phone": "+9876543210"}], # Changed phone
"locale": "de-DE", # Changed locale
"contactId": "fixed-contact-id-123", # Same contact_id
},
"field:anrede": "Dr.", # Changed prefix
"field:date_picker_a7c8": "2025-01-10",
"field:date_picker_7e65": "2025-01-15",
"field:number_7cf5": "4",
"field:anzahl_kinder": "2",
"field:alter_kind_1": "5",
"field:alter_kind_2": "10",
}
}
response = client.post("/api/webhook/wix-form", json=second_submission)
assert response.status_code == 200
# Verify only one customer exists with updated information
async def check_db():
from sqlalchemy import select # noqa: PLC0415
engine = client.app.state.engine
async_session = async_sessionmaker(engine, expire_on_commit=False)
async with async_session() as session:
# Check only one customer exists
result = await session.execute(select(Customer))
customers = result.scalars().all()
assert len(customers) == 1, "Should have exactly one customer"
customer = customers[0]
# Verify customer was updated with new information
assert customer.given_name == "John"
assert customer.surname == "Smith", "Last name updated"
assert (
customer.email_address == "john.smith@example.com"
), "Email updated"
assert customer.phone == "+9876543210", "Phone updated"
assert customer.name_prefix == "Dr.", "Prefix updated"
assert customer.language == "de", "Language updated"
assert customer.contact_id == "fixed-contact-id-123"
# Check both reservations were created
result = await session.execute(select(Reservation))
reservations = result.scalars().all()
expected_reservations = 2
assert len(reservations) == expected_reservations
# Both reservations should be linked to the same customer
assert all(r.customer_id == customer.id for r in reservations)
import asyncio # noqa: PLC0415
asyncio.run(check_db())
class TestGenericWebhookEndpoint:
"""Test generic webhook endpoint."""
def test_generic_webhook_success(self, client):
"""Test successful generic webhook submission."""
test_data = {
"event_type": "test_event",
"data": {
"key1": "value1",
"key2": "value2",
"nested": {"foo": "bar"},
},
"metadata": {"source": "test_system"},
}
response = client.post("/api/webhook/generic", json=test_data)
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert "timestamp" in data
assert "data_logged_to" in data
assert "generic_webhooks" in data["data_logged_to"]
assert data["note"] == "Data logged for later analysis"
def test_generic_webhook_empty_payload(self, client):
"""Test generic webhook with empty payload."""
response = client.post("/api/webhook/generic", json={})
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
def test_generic_webhook_complex_nested_data(self, client):
"""Test generic webhook with complex nested data structures."""
complex_data = {
"arrays": [1, 2, 3],
"nested": {"level1": {"level2": {"level3": "deep"}}},
"mixed": [{"a": 1}, {"b": 2}],
}
response = client.post("/api/webhook/generic", json=complex_data)
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
class TestAlpineBitsServerEndpoint:
"""Test AlpineBits server endpoint."""
def test_alpinebits_handshake_ping_success(self, client, basic_auth_headers):
"""Test AlpineBits handshake with OTA_Ping action using real test data."""
# Use the actual test data file with proper AlpineBits handshake format
with Path("tests/test_data/Handshake-OTA_PingRQ.xml").open(
encoding="utf-8"
) as f:
ping_xml = f.read()
# Prepare multipart form data
form_data = {"action": "OTA_Ping:Handshaking", "request": ping_xml}
headers = {
**basic_auth_headers,
"X-AlpineBits-ClientProtocolVersion": "2024-10",
"X-AlpineBits-ClientID": "TEST-CLIENT-001",
}
response = client.post(
"/api/alpinebits/server-2024-10",
data=form_data,
headers=headers,
)
assert response.status_code == 200
assert "OTA_PingRS" in response.text
assert "application/xml" in response.headers["content-type"]
assert "X-AlpineBits-Server-Version" in response.headers
def test_alpinebits_missing_auth(self, client):
"""Test AlpineBits endpoint without authentication."""
form_data = {"action": "OTA_Ping:Handshaking", "request": "<xml/>"}
response = client.post("/api/alpinebits/server-2024-10", data=form_data)
assert response.status_code == 401
def test_alpinebits_invalid_credentials(self, client):
"""Test AlpineBits endpoint with invalid credentials."""
credentials = base64.b64encode(b"wrong:credentials").decode("utf-8")
headers = {"Authorization": f"Basic {credentials}"}
form_data = {"action": "OTA_Ping:Handshaking", "request": "<xml/>"}
response = client.post(
"/api/alpinebits/server-2024-10", data=form_data, headers=headers
)
assert response.status_code == 401
def test_alpinebits_missing_action(self, client, basic_auth_headers):
"""Test AlpineBits endpoint without action parameter."""
headers = {
**basic_auth_headers,
"X-AlpineBits-ClientProtocolVersion": "2024-10",
}
form_data = {"request": "<xml/>"}
response = client.post(
"/api/alpinebits/server-2024-10", data=form_data, headers=headers
)
assert response.status_code == 400
def test_alpinebits_gzip_compression(self, client, basic_auth_headers):
"""Test AlpineBits endpoint with gzip compressed request."""
# Use real test data
with open("tests/test_data/Handshake-OTA_PingRQ.xml", encoding="utf-8") as f:
ping_xml = f.read()
form_data = f"action=OTA_Ping:Handshaking&request={ping_xml}"
compressed_data = gzip.compress(form_data.encode("utf-8"))
headers = {
**basic_auth_headers,
"X-AlpineBits-ClientProtocolVersion": "2024-10",
"Content-Encoding": "gzip",
"Content-Type": "application/x-www-form-urlencoded",
}
response = client.post(
"/api/alpinebits/server-2024-10",
content=compressed_data,
headers=headers,
)
assert response.status_code == 200
assert "OTA_PingRS" in response.text
class TestXMLUploadEndpoint:
"""Test XML upload endpoint for conversions."""
def test_xml_upload_success(self, client, basic_auth_headers):
"""Test successful XML upload."""
xml_content = """<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelResNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05">
<HotelReservations>
<HotelReservation>
<UniqueID Type="14" ID="TEST-123"/>
</HotelReservation>
</HotelReservations>
</OTA_HotelResNotifRQ>"""
response = client.put(
"/api/hoteldata/conversions_import/test_reservation.xml",
content=xml_content.encode("utf-8"),
headers={**basic_auth_headers, "Content-Type": "application/xml"},
)
assert response.status_code == 200
assert "Xml received" in response.text
def test_xml_upload_gzip_compressed(self, client, basic_auth_headers):
"""Test XML upload with gzip compression."""
xml_content = """<?xml version="1.0" encoding="UTF-8"?>
<OTA_HotelResNotifRQ xmlns="http://www.opentravel.org/OTA/2003/05">
<HotelReservations/>
</OTA_HotelResNotifRQ>"""
compressed = gzip.compress(xml_content.encode("utf-8"))
headers = {
**basic_auth_headers,
"Content-Type": "application/xml",
"Content-Encoding": "gzip",
}
response = client.put(
"/api/hoteldata/conversions_import/compressed.xml",
content=compressed,
headers=headers,
)
assert response.status_code == 200
def test_xml_upload_missing_auth(self, client):
"""Test XML upload without authentication."""
response = client.put(
"/api/hoteldata/conversions_import/test.xml",
content=b"<xml/>",
)
assert response.status_code == 401
def test_xml_upload_invalid_path(self, client, basic_auth_headers):
"""Test XML upload with path traversal attempt.
Path traversal is blocked by the server, resulting in 404 Not Found.
"""
response = client.put(
"/api/hoteldata/conversions_import/../../../etc/passwd",
content=b"<xml/>",
headers=basic_auth_headers,
)
# Path traversal results in 404 as the normalized path doesn't match the route
assert response.status_code == 404
def test_xml_upload_empty_content(self, client, basic_auth_headers):
"""Test XML upload with empty content."""
response = client.put(
"/api/hoteldata/conversions_import/empty.xml",
content=b"",
headers=basic_auth_headers,
)
assert response.status_code == 400
def test_xml_upload_non_xml_content(self, client, basic_auth_headers):
"""Test XML upload with non-XML content."""
response = client.put(
"/api/hoteldata/conversions_import/notxml.xml",
content=b"This is not XML content",
headers=basic_auth_headers,
)
assert response.status_code == 400
class TestAuthentication:
"""Test authentication and authorization."""
def test_basic_auth_success(self, client):
"""Test successful basic authentication."""
credentials = base64.b64encode(b"testuser:testpass").decode("utf-8")
headers = {"Authorization": f"Basic {credentials}"}
form_data = {"action": "OTA_Ping:Handshaking", "request": "<xml/>"}
response = client.post(
"/api/alpinebits/server-2024-10",
data=form_data,
headers={
**headers,
"X-AlpineBits-ClientProtocolVersion": "2024-10",
},
)
# Should not be 401
assert response.status_code != 401
def test_basic_auth_missing_credentials(self, client):
"""Test basic auth with missing credentials."""
response = client.post(
"/api/alpinebits/server-2024-10",
data={"action": "OTA_Ping:Handshaking"},
)
assert response.status_code == 401
def test_basic_auth_malformed_header(self, client):
"""Test basic auth with malformed Authorization header."""
headers = {"Authorization": "Basic malformed"}
response = client.post(
"/api/alpinebits/server-2024-10",
data={"action": "OTA_Ping:Handshaking"},
headers=headers,
)
# FastAPI should handle this gracefully
assert response.status_code in [401, 422]
class TestEventDispatcher:
"""Test event dispatcher and push notifications."""
def test_form_submission_triggers_event(
self, client, sample_wix_form_data
):
"""Test that form submission triggers event dispatcher."""
# Just verify the endpoint works with the event dispatcher
# The async task runs in background and doesn't affect response
response = client.post("/api/webhook/wix-form", json=sample_wix_form_data)
assert response.status_code == 200
# Event dispatcher is tested separately in its own test suite
class TestErrorHandling:
"""Test error handling across endpoints."""
def test_wix_webhook_invalid_json(self, client):
"""Test webhook with invalid JSON."""
response = client.post(
"/api/webhook/wix-form",
content=b"invalid json {{{",
headers={"Content-Type": "application/json"},
)
assert response.status_code == 422
def test_wix_webhook_missing_required_fields(self, client):
"""Test webhook with missing required fields."""
invalid_data = {"data": {}}
response = client.post("/api/webhook/wix-form", json=invalid_data)
# Should handle gracefully - may be 500 or 400 depending on validation
assert response.status_code in [400, 500]
def test_alpinebits_invalid_xml(self, client, basic_auth_headers):
"""Test AlpineBits endpoint with invalid XML."""
form_data = {
"action": "OTA_Ping:Handshaking",
"request": "<<invalid xml>>",
}
headers = {
**basic_auth_headers,
"X-AlpineBits-ClientProtocolVersion": "2024-10",
}
response = client.post(
"/api/alpinebits/server-2024-10",
data=form_data,
headers=headers,
)
# Should return error response
assert response.status_code in [400, 500]
class TestCORS:
"""Test CORS configuration."""
def test_cors_preflight_request(self, client):
"""Test CORS preflight request."""
response = client.options(
"/api/health",
headers={
"Origin": "https://example.wix.com",
"Access-Control-Request-Method": "POST",
},
)
# TestClient returns 400 for OPTIONS requests
# In production, CORS middleware handles preflight correctly
assert response.status_code in [200, 400, 405]
class TestRateLimiting:
"""Test rate limiting (requires actual rate limiter to be active)."""
def test_health_endpoint_rate_limit(self, client):
"""Test that health endpoint has rate limiting configured."""
# Make multiple requests
responses = []
for _ in range(5):
response = client.get("/api/health")
responses.append(response.status_code)
# All should succeed if under limit
assert all(status == 200 for status in responses)
if __name__ == "__main__":
pytest.main([__file__, "-v"])

85
uv.lock generated
View File

@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.13"
[[package]]
@@ -39,6 +39,11 @@ dependencies = [
{ name = "xsdata-pydantic", extra = ["cli", "lxml", "soap"] },
]
[package.dev-dependencies]
dev = [
{ name = "pytest-cov" },
]
[package.metadata]
requires-dist = [
{ name = "aiosqlite", specifier = ">=0.21.0" },
@@ -61,6 +66,9 @@ requires-dist = [
{ name = "xsdata-pydantic", extras = ["cli", "lxml", "soap"], specifier = ">=24.5" },
]
[package.metadata.requires-dev]
dev = [{ name = "pytest-cov", specifier = ">=7.0.0" }]
[[package]]
name = "annotated-types"
version = "0.7.0"
@@ -196,6 +204,67 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "coverage"
version = "7.10.7"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/51/26/d22c300112504f5f9a9fd2297ce33c35f3d353e4aeb987c8419453b2a7c2/coverage-7.10.7.tar.gz", hash = "sha256:f4ab143ab113be368a3e9b795f9cd7906c5ef407d6173fe9675a902e1fffc239", size = 827704, upload-time = "2025-09-21T20:03:56.815Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9a/94/b765c1abcb613d103b64fcf10395f54d69b0ef8be6a0dd9c524384892cc7/coverage-7.10.7-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:981a651f543f2854abd3b5fcb3263aac581b18209be49863ba575de6edf4c14d", size = 218320, upload-time = "2025-09-21T20:01:56.629Z" },
{ url = "https://files.pythonhosted.org/packages/72/4f/732fff31c119bb73b35236dd333030f32c4bfe909f445b423e6c7594f9a2/coverage-7.10.7-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:73ab1601f84dc804f7812dc297e93cd99381162da39c47040a827d4e8dafe63b", size = 218575, upload-time = "2025-09-21T20:01:58.203Z" },
{ url = "https://files.pythonhosted.org/packages/87/02/ae7e0af4b674be47566707777db1aa375474f02a1d64b9323e5813a6cdd5/coverage-7.10.7-cp313-cp313-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:a8b6f03672aa6734e700bbcd65ff050fd19cddfec4b031cc8cf1c6967de5a68e", size = 249568, upload-time = "2025-09-21T20:01:59.748Z" },
{ url = "https://files.pythonhosted.org/packages/a2/77/8c6d22bf61921a59bce5471c2f1f7ac30cd4ac50aadde72b8c48d5727902/coverage-7.10.7-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:10b6ba00ab1132a0ce4428ff68cf50a25efd6840a42cdf4239c9b99aad83be8b", size = 252174, upload-time = "2025-09-21T20:02:01.192Z" },
{ url = "https://files.pythonhosted.org/packages/b1/20/b6ea4f69bbb52dac0aebd62157ba6a9dddbfe664f5af8122dac296c3ee15/coverage-7.10.7-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c79124f70465a150e89340de5963f936ee97097d2ef76c869708c4248c63ca49", size = 253447, upload-time = "2025-09-21T20:02:02.701Z" },
{ url = "https://files.pythonhosted.org/packages/f9/28/4831523ba483a7f90f7b259d2018fef02cb4d5b90bc7c1505d6e5a84883c/coverage-7.10.7-cp313-cp313-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:69212fbccdbd5b0e39eac4067e20a4a5256609e209547d86f740d68ad4f04911", size = 249779, upload-time = "2025-09-21T20:02:04.185Z" },
{ url = "https://files.pythonhosted.org/packages/a7/9f/4331142bc98c10ca6436d2d620c3e165f31e6c58d43479985afce6f3191c/coverage-7.10.7-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:7ea7c6c9d0d286d04ed3541747e6597cbe4971f22648b68248f7ddcd329207f0", size = 251604, upload-time = "2025-09-21T20:02:06.034Z" },
{ url = "https://files.pythonhosted.org/packages/ce/60/bda83b96602036b77ecf34e6393a3836365481b69f7ed7079ab85048202b/coverage-7.10.7-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:b9be91986841a75042b3e3243d0b3cb0b2434252b977baaf0cd56e960fe1e46f", size = 249497, upload-time = "2025-09-21T20:02:07.619Z" },
{ url = "https://files.pythonhosted.org/packages/5f/af/152633ff35b2af63977edd835d8e6430f0caef27d171edf2fc76c270ef31/coverage-7.10.7-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:b281d5eca50189325cfe1f365fafade89b14b4a78d9b40b05ddd1fc7d2a10a9c", size = 249350, upload-time = "2025-09-21T20:02:10.34Z" },
{ url = "https://files.pythonhosted.org/packages/9d/71/d92105d122bd21cebba877228990e1646d862e34a98bb3374d3fece5a794/coverage-7.10.7-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:99e4aa63097ab1118e75a848a28e40d68b08a5e19ce587891ab7fd04475e780f", size = 251111, upload-time = "2025-09-21T20:02:12.122Z" },
{ url = "https://files.pythonhosted.org/packages/a2/9e/9fdb08f4bf476c912f0c3ca292e019aab6712c93c9344a1653986c3fd305/coverage-7.10.7-cp313-cp313-win32.whl", hash = "sha256:dc7c389dce432500273eaf48f410b37886be9208b2dd5710aaf7c57fd442c698", size = 220746, upload-time = "2025-09-21T20:02:13.919Z" },
{ url = "https://files.pythonhosted.org/packages/b1/b1/a75fd25df44eab52d1931e89980d1ada46824c7a3210be0d3c88a44aaa99/coverage-7.10.7-cp313-cp313-win_amd64.whl", hash = "sha256:cac0fdca17b036af3881a9d2729a850b76553f3f716ccb0360ad4dbc06b3b843", size = 221541, upload-time = "2025-09-21T20:02:15.57Z" },
{ url = "https://files.pythonhosted.org/packages/14/3a/d720d7c989562a6e9a14b2c9f5f2876bdb38e9367126d118495b89c99c37/coverage-7.10.7-cp313-cp313-win_arm64.whl", hash = "sha256:4b6f236edf6e2f9ae8fcd1332da4e791c1b6ba0dc16a2dc94590ceccb482e546", size = 220170, upload-time = "2025-09-21T20:02:17.395Z" },
{ url = "https://files.pythonhosted.org/packages/bb/22/e04514bf2a735d8b0add31d2b4ab636fc02370730787c576bb995390d2d5/coverage-7.10.7-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:a0ec07fd264d0745ee396b666d47cef20875f4ff2375d7c4f58235886cc1ef0c", size = 219029, upload-time = "2025-09-21T20:02:18.936Z" },
{ url = "https://files.pythonhosted.org/packages/11/0b/91128e099035ece15da3445d9015e4b4153a6059403452d324cbb0a575fa/coverage-7.10.7-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:dd5e856ebb7bfb7672b0086846db5afb4567a7b9714b8a0ebafd211ec7ce6a15", size = 219259, upload-time = "2025-09-21T20:02:20.44Z" },
{ url = "https://files.pythonhosted.org/packages/8b/51/66420081e72801536a091a0c8f8c1f88a5c4bf7b9b1bdc6222c7afe6dc9b/coverage-7.10.7-cp313-cp313t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:f57b2a3c8353d3e04acf75b3fed57ba41f5c0646bbf1d10c7c282291c97936b4", size = 260592, upload-time = "2025-09-21T20:02:22.313Z" },
{ url = "https://files.pythonhosted.org/packages/5d/22/9b8d458c2881b22df3db5bb3e7369e63d527d986decb6c11a591ba2364f7/coverage-7.10.7-cp313-cp313t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:1ef2319dd15a0b009667301a3f84452a4dc6fddfd06b0c5c53ea472d3989fbf0", size = 262768, upload-time = "2025-09-21T20:02:24.287Z" },
{ url = "https://files.pythonhosted.org/packages/f7/08/16bee2c433e60913c610ea200b276e8eeef084b0d200bdcff69920bd5828/coverage-7.10.7-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:83082a57783239717ceb0ad584de3c69cf581b2a95ed6bf81ea66034f00401c0", size = 264995, upload-time = "2025-09-21T20:02:26.133Z" },
{ url = "https://files.pythonhosted.org/packages/20/9d/e53eb9771d154859b084b90201e5221bca7674ba449a17c101a5031d4054/coverage-7.10.7-cp313-cp313t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:50aa94fb1fb9a397eaa19c0d5ec15a5edd03a47bf1a3a6111a16b36e190cff65", size = 259546, upload-time = "2025-09-21T20:02:27.716Z" },
{ url = "https://files.pythonhosted.org/packages/ad/b0/69bc7050f8d4e56a89fb550a1577d5d0d1db2278106f6f626464067b3817/coverage-7.10.7-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:2120043f147bebb41c85b97ac45dd173595ff14f2a584f2963891cbcc3091541", size = 262544, upload-time = "2025-09-21T20:02:29.216Z" },
{ url = "https://files.pythonhosted.org/packages/ef/4b/2514b060dbd1bc0aaf23b852c14bb5818f244c664cb16517feff6bb3a5ab/coverage-7.10.7-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:2fafd773231dd0378fdba66d339f84904a8e57a262f583530f4f156ab83863e6", size = 260308, upload-time = "2025-09-21T20:02:31.226Z" },
{ url = "https://files.pythonhosted.org/packages/54/78/7ba2175007c246d75e496f64c06e94122bdb914790a1285d627a918bd271/coverage-7.10.7-cp313-cp313t-musllinux_1_2_riscv64.whl", hash = "sha256:0b944ee8459f515f28b851728ad224fa2d068f1513ef6b7ff1efafeb2185f999", size = 258920, upload-time = "2025-09-21T20:02:32.823Z" },
{ url = "https://files.pythonhosted.org/packages/c0/b3/fac9f7abbc841409b9a410309d73bfa6cfb2e51c3fada738cb607ce174f8/coverage-7.10.7-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:4b583b97ab2e3efe1b3e75248a9b333bd3f8b0b1b8e5b45578e05e5850dfb2c2", size = 261434, upload-time = "2025-09-21T20:02:34.86Z" },
{ url = "https://files.pythonhosted.org/packages/ee/51/a03bec00d37faaa891b3ff7387192cef20f01604e5283a5fabc95346befa/coverage-7.10.7-cp313-cp313t-win32.whl", hash = "sha256:2a78cd46550081a7909b3329e2266204d584866e8d97b898cd7fb5ac8d888b1a", size = 221403, upload-time = "2025-09-21T20:02:37.034Z" },
{ url = "https://files.pythonhosted.org/packages/53/22/3cf25d614e64bf6d8e59c7c669b20d6d940bb337bdee5900b9ca41c820bb/coverage-7.10.7-cp313-cp313t-win_amd64.whl", hash = "sha256:33a5e6396ab684cb43dc7befa386258acb2d7fae7f67330ebb85ba4ea27938eb", size = 222469, upload-time = "2025-09-21T20:02:39.011Z" },
{ url = "https://files.pythonhosted.org/packages/49/a1/00164f6d30d8a01c3c9c48418a7a5be394de5349b421b9ee019f380df2a0/coverage-7.10.7-cp313-cp313t-win_arm64.whl", hash = "sha256:86b0e7308289ddde73d863b7683f596d8d21c7d8664ce1dee061d0bcf3fbb4bb", size = 220731, upload-time = "2025-09-21T20:02:40.939Z" },
{ url = "https://files.pythonhosted.org/packages/23/9c/5844ab4ca6a4dd97a1850e030a15ec7d292b5c5cb93082979225126e35dd/coverage-7.10.7-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:b06f260b16ead11643a5a9f955bd4b5fd76c1a4c6796aeade8520095b75de520", size = 218302, upload-time = "2025-09-21T20:02:42.527Z" },
{ url = "https://files.pythonhosted.org/packages/f0/89/673f6514b0961d1f0e20ddc242e9342f6da21eaba3489901b565c0689f34/coverage-7.10.7-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:212f8f2e0612778f09c55dd4872cb1f64a1f2b074393d139278ce902064d5b32", size = 218578, upload-time = "2025-09-21T20:02:44.468Z" },
{ url = "https://files.pythonhosted.org/packages/05/e8/261cae479e85232828fb17ad536765c88dd818c8470aca690b0ac6feeaa3/coverage-7.10.7-cp314-cp314-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:3445258bcded7d4aa630ab8296dea4d3f15a255588dd535f980c193ab6b95f3f", size = 249629, upload-time = "2025-09-21T20:02:46.503Z" },
{ url = "https://files.pythonhosted.org/packages/82/62/14ed6546d0207e6eda876434e3e8475a3e9adbe32110ce896c9e0c06bb9a/coverage-7.10.7-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:bb45474711ba385c46a0bfe696c695a929ae69ac636cda8f532be9e8c93d720a", size = 252162, upload-time = "2025-09-21T20:02:48.689Z" },
{ url = "https://files.pythonhosted.org/packages/ff/49/07f00db9ac6478e4358165a08fb41b469a1b053212e8a00cb02f0d27a05f/coverage-7.10.7-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:813922f35bd800dca9994c5971883cbc0d291128a5de6b167c7aa697fcf59360", size = 253517, upload-time = "2025-09-21T20:02:50.31Z" },
{ url = "https://files.pythonhosted.org/packages/a2/59/c5201c62dbf165dfbc91460f6dbbaa85a8b82cfa6131ac45d6c1bfb52deb/coverage-7.10.7-cp314-cp314-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:93c1b03552081b2a4423091d6fb3787265b8f86af404cff98d1b5342713bdd69", size = 249632, upload-time = "2025-09-21T20:02:51.971Z" },
{ url = "https://files.pythonhosted.org/packages/07/ae/5920097195291a51fb00b3a70b9bbd2edbfe3c84876a1762bd1ef1565ebc/coverage-7.10.7-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:cc87dd1b6eaf0b848eebb1c86469b9f72a1891cb42ac7adcfbce75eadb13dd14", size = 251520, upload-time = "2025-09-21T20:02:53.858Z" },
{ url = "https://files.pythonhosted.org/packages/b9/3c/a815dde77a2981f5743a60b63df31cb322c944843e57dbd579326625a413/coverage-7.10.7-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:39508ffda4f343c35f3236fe8d1a6634a51f4581226a1262769d7f970e73bffe", size = 249455, upload-time = "2025-09-21T20:02:55.807Z" },
{ url = "https://files.pythonhosted.org/packages/aa/99/f5cdd8421ea656abefb6c0ce92556709db2265c41e8f9fc6c8ae0f7824c9/coverage-7.10.7-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:925a1edf3d810537c5a3abe78ec5530160c5f9a26b1f4270b40e62cc79304a1e", size = 249287, upload-time = "2025-09-21T20:02:57.784Z" },
{ url = "https://files.pythonhosted.org/packages/c3/7a/e9a2da6a1fc5d007dd51fca083a663ab930a8c4d149c087732a5dbaa0029/coverage-7.10.7-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:2c8b9a0636f94c43cd3576811e05b89aa9bc2d0a85137affc544ae5cb0e4bfbd", size = 250946, upload-time = "2025-09-21T20:02:59.431Z" },
{ url = "https://files.pythonhosted.org/packages/ef/5b/0b5799aa30380a949005a353715095d6d1da81927d6dbed5def2200a4e25/coverage-7.10.7-cp314-cp314-win32.whl", hash = "sha256:b7b8288eb7cdd268b0304632da8cb0bb93fadcfec2fe5712f7b9cc8f4d487be2", size = 221009, upload-time = "2025-09-21T20:03:01.324Z" },
{ url = "https://files.pythonhosted.org/packages/da/b0/e802fbb6eb746de006490abc9bb554b708918b6774b722bb3a0e6aa1b7de/coverage-7.10.7-cp314-cp314-win_amd64.whl", hash = "sha256:1ca6db7c8807fb9e755d0379ccc39017ce0a84dcd26d14b5a03b78563776f681", size = 221804, upload-time = "2025-09-21T20:03:03.4Z" },
{ url = "https://files.pythonhosted.org/packages/9e/e8/71d0c8e374e31f39e3389bb0bd19e527d46f00ea8571ec7ec8fd261d8b44/coverage-7.10.7-cp314-cp314-win_arm64.whl", hash = "sha256:097c1591f5af4496226d5783d036bf6fd6cd0cbc132e071b33861de756efb880", size = 220384, upload-time = "2025-09-21T20:03:05.111Z" },
{ url = "https://files.pythonhosted.org/packages/62/09/9a5608d319fa3eba7a2019addeacb8c746fb50872b57a724c9f79f146969/coverage-7.10.7-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:a62c6ef0d50e6de320c270ff91d9dd0a05e7250cac2a800b7784bae474506e63", size = 219047, upload-time = "2025-09-21T20:03:06.795Z" },
{ url = "https://files.pythonhosted.org/packages/f5/6f/f58d46f33db9f2e3647b2d0764704548c184e6f5e014bef528b7f979ef84/coverage-7.10.7-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:9fa6e4dd51fe15d8738708a973470f67a855ca50002294852e9571cdbd9433f2", size = 219266, upload-time = "2025-09-21T20:03:08.495Z" },
{ url = "https://files.pythonhosted.org/packages/74/5c/183ffc817ba68e0b443b8c934c8795553eb0c14573813415bd59941ee165/coverage-7.10.7-cp314-cp314t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:8fb190658865565c549b6b4706856d6a7b09302c797eb2cf8e7fe9dabb043f0d", size = 260767, upload-time = "2025-09-21T20:03:10.172Z" },
{ url = "https://files.pythonhosted.org/packages/0f/48/71a8abe9c1ad7e97548835e3cc1adbf361e743e9d60310c5f75c9e7bf847/coverage-7.10.7-cp314-cp314t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:affef7c76a9ef259187ef31599a9260330e0335a3011732c4b9effa01e1cd6e0", size = 262931, upload-time = "2025-09-21T20:03:11.861Z" },
{ url = "https://files.pythonhosted.org/packages/84/fd/193a8fb132acfc0a901f72020e54be5e48021e1575bb327d8ee1097a28fd/coverage-7.10.7-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6e16e07d85ca0cf8bafe5f5d23a0b850064e8e945d5677492b06bbe6f09cc699", size = 265186, upload-time = "2025-09-21T20:03:13.539Z" },
{ url = "https://files.pythonhosted.org/packages/b1/8f/74ecc30607dd95ad50e3034221113ccb1c6d4e8085cc761134782995daae/coverage-7.10.7-cp314-cp314t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:03ffc58aacdf65d2a82bbeb1ffe4d01ead4017a21bfd0454983b88ca73af94b9", size = 259470, upload-time = "2025-09-21T20:03:15.584Z" },
{ url = "https://files.pythonhosted.org/packages/0f/55/79ff53a769f20d71b07023ea115c9167c0bb56f281320520cf64c5298a96/coverage-7.10.7-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:1b4fd784344d4e52647fd7857b2af5b3fbe6c239b0b5fa63e94eb67320770e0f", size = 262626, upload-time = "2025-09-21T20:03:17.673Z" },
{ url = "https://files.pythonhosted.org/packages/88/e2/dac66c140009b61ac3fc13af673a574b00c16efdf04f9b5c740703e953c0/coverage-7.10.7-cp314-cp314t-musllinux_1_2_i686.whl", hash = "sha256:0ebbaddb2c19b71912c6f2518e791aa8b9f054985a0769bdb3a53ebbc765c6a1", size = 260386, upload-time = "2025-09-21T20:03:19.36Z" },
{ url = "https://files.pythonhosted.org/packages/a2/f1/f48f645e3f33bb9ca8a496bc4a9671b52f2f353146233ebd7c1df6160440/coverage-7.10.7-cp314-cp314t-musllinux_1_2_riscv64.whl", hash = "sha256:a2d9a3b260cc1d1dbdb1c582e63ddcf5363426a1a68faa0f5da28d8ee3c722a0", size = 258852, upload-time = "2025-09-21T20:03:21.007Z" },
{ url = "https://files.pythonhosted.org/packages/bb/3b/8442618972c51a7affeead957995cfa8323c0c9bcf8fa5a027421f720ff4/coverage-7.10.7-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:a3cc8638b2480865eaa3926d192e64ce6c51e3d29c849e09d5b4ad95efae5399", size = 261534, upload-time = "2025-09-21T20:03:23.12Z" },
{ url = "https://files.pythonhosted.org/packages/b2/dc/101f3fa3a45146db0cb03f5b4376e24c0aac818309da23e2de0c75295a91/coverage-7.10.7-cp314-cp314t-win32.whl", hash = "sha256:67f8c5cbcd3deb7a60b3345dffc89a961a484ed0af1f6f73de91705cc6e31235", size = 221784, upload-time = "2025-09-21T20:03:24.769Z" },
{ url = "https://files.pythonhosted.org/packages/4c/a1/74c51803fc70a8a40d7346660379e144be772bab4ac7bb6e6b905152345c/coverage-7.10.7-cp314-cp314t-win_amd64.whl", hash = "sha256:e1ed71194ef6dea7ed2d5cb5f7243d4bcd334bfb63e59878519be558078f848d", size = 222905, upload-time = "2025-09-21T20:03:26.93Z" },
{ url = "https://files.pythonhosted.org/packages/12/65/f116a6d2127df30bcafbceef0302d8a64ba87488bf6f73a6d8eebf060873/coverage-7.10.7-cp314-cp314t-win_arm64.whl", hash = "sha256:7fe650342addd8524ca63d77b2362b02345e5f1a093266787d210c70a50b471a", size = 220922, upload-time = "2025-09-21T20:03:28.672Z" },
{ url = "https://files.pythonhosted.org/packages/ec/16/114df1c291c22cac3b0c127a73e0af5c12ed7bbb6558d310429a0ae24023/coverage-7.10.7-py3-none-any.whl", hash = "sha256:f7941f6f2fe6dd6807a1208737b8a0cbcf1cc6d7b07d24998ad2d63590868260", size = 209952, upload-time = "2025-09-21T20:03:53.918Z" },
]
[[package]]
name = "deprecated"
version = "1.2.18"
@@ -602,6 +671,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/04/93/2fa34714b7a4ae72f2f8dad66ba17dd9a2c793220719e736dda28b7aec27/pytest_asyncio-1.2.0-py3-none-any.whl", hash = "sha256:8e17ae5e46d8e7efe51ab6494dd2010f4ca8dae51652aa3c8d55acf50bfb2e99", size = 15095, upload-time = "2025-09-12T07:33:52.639Z" },
]
[[package]]
name = "pytest-cov"
version = "7.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "coverage" },
{ name = "pluggy" },
{ name = "pytest" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5e/f7/c933acc76f5208b3b00089573cf6a2bc26dc80a8aece8f52bb7d6b1855ca/pytest_cov-7.0.0.tar.gz", hash = "sha256:33c97eda2e049a0c5298e91f519302a1334c26ac65c1a483d6206fd458361af1", size = 54328, upload-time = "2025-09-09T10:57:02.113Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" },
]
[[package]]
name = "python-dotenv"
version = "1.1.1"