8 Commits

Author SHA1 Message Date
Jonas Linter
52f95bd677 Updated config 2025-10-08 15:28:36 +02:00
Jonas Linter
6701dcd6bf Probably added gzip 2025-10-08 14:36:21 +02:00
Jonas Linter
9f0a77ca39 Removed unneccessary scripts 2025-10-08 14:26:11 +02:00
Jonas Linter
259243d44b updated db 2025-10-08 13:53:44 +02:00
Jonas Linter
84a57f3d98 Created endpoint for export 2025-10-08 13:28:38 +02:00
Jonas Linter
ff25142f62 All tests pass again. Handeling the children is difficult 2025-10-08 11:23:18 +02:00
Jonas Linter
ebbea84a4c Fixed acknowledgments 2025-10-08 10:47:18 +02:00
Jonas Linter
584def323c Starting unique_id migration 2025-10-08 10:45:00 +02:00
11 changed files with 296 additions and 587 deletions

View File

@@ -19,9 +19,6 @@
"notebook.output.textLineLimit": 200, "notebook.output.textLineLimit": 200,
"jupyter.debugJustMyCode": false, "jupyter.debugJustMyCode": false,
"python.testing.pytestEnabled": true, "python.testing.pytestEnabled": true,
"python.testing.pytestArgs": [
"tests"
],
"files.exclude": { "files.exclude": {
"**/*.egg-info": true, "**/*.egg-info": true,
"**/htmlcov": true, "**/htmlcov": true,

View File

@@ -2,19 +2,28 @@
# Use annotatedyaml for secrets and environment-specific overrides # Use annotatedyaml for secrets and environment-specific overrides
database: database:
url: "sqlite+aiosqlite:///alpinebits.db" # For local dev, use SQLite. For prod, override with PostgreSQL URL. url: "sqlite+aiosqlite:///alpinebits.db" # For local dev, use SQLite. For prod, override with PostgreSQL URL.
# url: "postgresql://user:password@host:port/dbname" # Example for Postgres # url: "postgresql://user:password@host:port/dbname" # Example for Postgres
# AlpineBits Python config
# Use annotatedyaml for secrets and environment-specific overrides
alpine_bits_auth: alpine_bits_auth:
- hotel_id: "12345" - hotel_id: "39054_001"
hotel_name: "Bemelmans Post" hotel_name: "Bemelmans Post"
username: "alice" username: "bemelman"
password: !secret ALICE_PASSWORD password: !secret BEMELMANS_PASSWORD
push_endpoint:
url: "https://example.com/push"
token: !secret PUSH_TOKEN_ALICE
username: "alice"
- hotel_id: "135" - hotel_id: "135"
hotel_name: "Bemelmans" hotel_name: "Testhotel"
username: "sebastian" username: "sebastian"
password: !secret BOB_PASSWORD password: !secret BOB_PASSWORD
- hotel_id: "39052_001"
hotel_name: "Jagthof Kaltern"
username: "jagthof"
password: !secret JAGTHOF_PASSWORD
- hotel_id: "39040_001"
hotel_name: "Residence Erika"
username: "erika"
password: !secret ERIKA_PASSWORD

View File

@@ -36,7 +36,7 @@ alpine-bits-server = "alpine_bits_python.main:main"
packages = ["src/alpine_bits_python"] packages = ["src/alpine_bits_python"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = ["test"] testpaths = ["tests"]
pythonpath = ["src"] pythonpath = ["src"]
[tool.ruff] [tool.ruff]

View File

@@ -658,12 +658,7 @@ def _process_single_reservation(
else: else:
raise ValueError("Unsupported message type: %s", message_type.value) raise ValueError("Unsupported message type: %s", message_type.value)
unique_id_str = reservation.unique_id unique_id_str = reservation.md5_unique_id
# TODO MAGIC shortening
if len(unique_id_str) > 32:
# strip to first 35 chars
unique_id_str = unique_id_str[:32]
# UniqueID # UniqueID
unique_id = UniqueId(type_value=UniqueIdType2.VALUE_14, id=unique_id_str) unique_id = UniqueId(type_value=UniqueIdType2.VALUE_14, id=unique_id_str)

View File

@@ -21,12 +21,19 @@ from xsdata.formats.dataclass.serializers.config import SerializerConfig
from xsdata_pydantic.bindings import XmlParser, XmlSerializer from xsdata_pydantic.bindings import XmlParser, XmlSerializer
from alpine_bits_python.alpine_bits_helpers import ( from alpine_bits_python.alpine_bits_helpers import (
create_res_notif_push_message, create_res_retrieve_response) create_res_notif_push_message,
create_res_retrieve_response,
)
from .db import AckedRequest, Customer, Reservation from .db import AckedRequest, Customer, Reservation
from .generated.alpinebits import (OtaNotifReportRq, OtaNotifReportRs, from .generated.alpinebits import (
OtaPingRq, OtaPingRs, OtaReadRq, OtaNotifReportRq,
WarningStatus) OtaNotifReportRs,
OtaPingRq,
OtaPingRs,
OtaReadRq,
WarningStatus,
)
# Configure logging # Configure logging
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@@ -518,7 +525,7 @@ class ReadAction(AlpineBitsAction):
select(Reservation.id) select(Reservation.id)
.join( .join(
AckedRequest, AckedRequest,
AckedRequest.unique_id == Reservation.unique_id, Reservation.md5_unique_id == AckedRequest.unique_id,
) )
.filter(AckedRequest.client_id == client_info.client_id) .filter(AckedRequest.client_id == client_info.client_id)
) )

View File

@@ -4,8 +4,10 @@ import json
import logging import logging
import os import os
import urllib.parse import urllib.parse
from collections import defaultdict
from datetime import UTC, date, datetime from datetime import UTC, date, datetime
from functools import partial from functools import partial
from pathlib import Path
from typing import Any from typing import Any
import httpx import httpx
@@ -16,6 +18,8 @@ from fastapi.security import HTTPBasic, HTTPBasicCredentials
from slowapi.errors import RateLimitExceeded from slowapi.errors import RateLimitExceeded
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from alpine_bits_python.schemas import ReservationData
from .alpinebits_server import ( from .alpinebits_server import (
AlpineBitsActionName, AlpineBitsActionName,
AlpineBitsClientInfo, AlpineBitsClientInfo,
@@ -43,8 +47,6 @@ _LOGGER = logging.getLogger(__name__)
# HTTP Basic auth for AlpineBits # HTTP Basic auth for AlpineBits
security_basic = HTTPBasic() security_basic = HTTPBasic()
from collections import defaultdict
# --- Enhanced event dispatcher with hotel-specific routing --- # --- Enhanced event dispatcher with hotel-specific routing ---
class EventDispatcher: class EventDispatcher:
@@ -240,42 +242,6 @@ app.add_middleware(
) )
async def process_form_submission(submission_data: dict[str, Any]) -> None:
"""Background task to process the form submission.
Add your business logic here.
"""
try:
_LOGGER.info(
f"Processing form submission: {submission_data.get('submissionId')}"
)
# Example processing - you can replace this with your actual logic
form_name = submission_data.get("formName")
contact_email = (
submission_data.get("contact", {}).get("email")
if submission_data.get("contact")
else None
)
# Extract form fields
form_fields = {
k: v for k, v in submission_data.items() if k.startswith("field:")
}
_LOGGER.info(
f"Form: {form_name}, Contact: {contact_email}, Fields: {len(form_fields)}"
)
# Here you could:
# - Save to database
# - Send emails
# - Call external APIs
# - Process the data further
except Exception as e:
_LOGGER.error(f"Error processing form submission: {e!s}")
@api_router.get("/") @api_router.get("/")
@limiter.limit(DEFAULT_RATE_LIMIT) @limiter.limit(DEFAULT_RATE_LIMIT)
async def root(request: Request): async def root(request: Request):
@@ -307,6 +273,22 @@ async def health_check(request: Request):
} }
def create_db_reservation_from_data(
reservation_model: ReservationData, db_customer_id: int
) -> DBReservation:
"""Convert ReservationData to DBReservation, handling children_ages conversion."""
data = reservation_model.model_dump(exclude_none=True)
children_list = data.pop("children_ages", [])
children_csv = ",".join(str(int(a)) for a in children_list) if children_list else ""
data["children_ages"] = children_csv
# Inject FK
data["customer_id"] = db_customer_id
return DBReservation(**data)
# Extracted business logic for handling Wix form submissions # Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: dict[str, Any], db): async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production).""" """Shared business logic for handling Wix form submissions (test and production)."""
@@ -392,15 +374,6 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
offer = data.get("field:angebot_auswaehlen") offer = data.get("field:angebot_auswaehlen")
# UTM and offer
utm_fields = [
("utm_Source", "utm_source"),
("utm_Medium", "utm_medium"),
("utm_Campaign", "utm_campaign"),
("utm_Term", "utm_term"),
("utm_Content", "utm_content"),
]
# get submissionId and ensure max length 35. Generate one if not present # get submissionId and ensure max length 35. Generate one if not present
unique_id = data.get("submissionId", generate_unique_id()) unique_id = data.get("submissionId", generate_unique_id())
@@ -446,14 +419,15 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
or "Frangart Inn" # fallback or "Frangart Inn" # fallback
) )
db_reservation = DBReservation( reservation = ReservationData(
customer_id=db_customer.id,
unique_id=unique_id, unique_id=unique_id,
start_date=date.fromisoformat(start_date) if start_date else None, start_date=date.fromisoformat(start_date),
end_date=date.fromisoformat(end_date) if end_date else None, end_date=date.fromisoformat(end_date),
num_adults=num_adults, num_adults=num_adults,
num_children=num_children, num_children=num_children,
children_ages=",".join(str(a) for a in children_ages), children_ages=children_ages,
hotel_code=hotel_code,
hotel_name=hotel_name,
offer=offer, offer=offer,
created_at=datetime.now(UTC), created_at=datetime.now(UTC),
utm_source=data.get("field:utm_source"), utm_source=data.get("field:utm_source"),
@@ -464,9 +438,12 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
user_comment=data.get("field:long_answer_3524", ""), user_comment=data.get("field:long_answer_3524", ""),
fbclid=data.get("field:fbclid"), fbclid=data.get("field:fbclid"),
gclid=data.get("field:gclid"), gclid=data.get("field:gclid"),
hotel_code=hotel_code,
hotel_name=hotel_name,
) )
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
db_reservation = create_db_reservation_from_data(reservation, db_customer.id)
db.add(db_reservation) db.add(db_reservation)
await db.commit() await db.commit()
await db.refresh(db_reservation) await db.refresh(db_reservation)
@@ -499,65 +476,6 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
} }
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Unified endpoint to handle Wix form submissions (test and production).
No authentication required for this endpoint.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form: {e!s}")
# log stacktrace
import traceback
traceback_str = traceback.format_exc()
_LOGGER.error(f"Stack trace for handle_wix_form: {traceback_str}")
raise HTTPException(status_code=500, detail="Error processing Wix form data")
@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_wix_form_test(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Test endpoint to verify the API is working with raw JSON data.
No authentication required for testing purposes.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form_test: {e!s}")
raise HTTPException(status_code=500, detail="Error processing test data")
# UNUSED
@api_router.post("/admin/generate-api-key")
@limiter.limit("5/hour") # Very restrictive for admin operations
async def generate_new_api_key(
request: Request, admin_key: str = Depends(validate_api_key)
):
"""Admin endpoint to generate new API keys.
Requires admin API key and is heavily rate limited.
"""
if admin_key != "admin-key":
raise HTTPException(status_code=403, detail="Admin access required")
new_key = generate_api_key()
_LOGGER.info(f"Generated new API key (requested by: {admin_key})")
return {
"status": "success",
"message": "New API key generated",
"api_key": new_key,
"timestamp": datetime.now().isoformat(),
"note": "Store this key securely - it won't be shown again",
}
async def validate_basic_auth( async def validate_basic_auth(
credentials: HTTPBasicCredentials = Depends(security_basic), credentials: HTTPBasicCredentials = Depends(security_basic),
) -> str: ) -> str:
@@ -595,6 +513,142 @@ async def validate_basic_auth(
return credentials.username, credentials.password return credentials.username, credentials.password
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def handle_wix_form(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Unified endpoint to handle Wix form submissions (test and production).
No authentication required for this endpoint.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form: {e!s}")
# log stacktrace
import traceback
traceback_str = traceback.format_exc()
_LOGGER.error(f"Stack trace for handle_wix_form: {traceback_str}")
raise HTTPException(status_code=500, detail="Error processing Wix form data")
@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_wix_form_test(
request: Request, data: dict[str, Any], db_session=Depends(get_async_session)
):
"""Test endpoint to verify the API is working with raw JSON data.
No authentication required for testing purposes.
"""
try:
return await process_wix_form_submission(request, data, db_session)
except Exception as e:
_LOGGER.error(f"Error in handle_wix_form_test: {e!s}")
raise HTTPException(status_code=500, detail="Error processing test data")
@api_router.post("/hoteldata/conversions_import")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_xml_upload(
request: Request, credentials_tupel: tuple = Depends(validate_basic_auth)
):
"""Endpoint for receiving XML files for conversion processing.
Requires basic authentication and saves XML files to log directory.
Supports gzip compression via Content-Encoding header.
"""
try:
# Get the raw body content
body = await request.body()
if not body:
raise HTTPException(
status_code=400, detail="ERROR: No XML content provided"
)
# Check if content is gzip compressed
content_encoding = request.headers.get("content-encoding", "").lower()
is_gzipped = content_encoding == "gzip"
# Decompress if gzipped
if is_gzipped:
try:
body = gzip.decompress(body)
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to decompress gzip content: {e}",
) from e
# Try to decode as UTF-8
try:
xml_content = body.decode("utf-8")
except UnicodeDecodeError:
# If UTF-8 fails, try with latin-1 as fallback
xml_content = body.decode("latin-1")
# Basic validation that it's XML-like
if not xml_content.strip().startswith("<"):
raise HTTPException(
status_code=400, detail="ERROR: Content does not appear to be XML"
)
# Create logs directory for XML conversions
logs_dir = Path("logs/conversions_import")
if not logs_dir.exists():
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
_LOGGER.info("Created directory: %s", logs_dir)
# Generate filename with timestamp and authenticated user
username, _ = credentials_tupel
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_filename = logs_dir / f"xml_import_{username}_{timestamp}.xml"
# Save XML content to file
log_filename.write_text(xml_content, encoding="utf-8")
_LOGGER.info("XML file saved to %s by user %s", log_filename, username)
response_headers = {
"Content-Type": "application/xml; charset=utf-8",
"X-AlpineBits-Server-Accept-Encoding": "gzip",
}
return Response(
content="Xml received", headers=response_headers, status_code=200
)
except HTTPException:
raise
except Exception:
_LOGGER.exception("Error in handle_xml_upload")
raise HTTPException(status_code=500, detail="Error processing XML upload")
# UNUSED
@api_router.post("/admin/generate-api-key")
@limiter.limit("5/hour") # Very restrictive for admin operations
async def generate_new_api_key(
request: Request, admin_key: str = Depends(validate_api_key)
):
"""Admin endpoint to generate new API keys.
Requires admin API key and is heavily rate limited.
"""
if admin_key != "admin-key":
raise HTTPException(status_code=403, detail="Admin access required")
new_key = generate_api_key()
_LOGGER.info(f"Generated new API key (requested by: {admin_key})")
return {
"status": "success",
"message": "New API key generated",
"api_key": new_key,
"timestamp": datetime.now().isoformat(),
"note": "Store this key securely - it won't be shown again",
}
# TODO Bit sketchy. May need requests-toolkit in the future # TODO Bit sketchy. May need requests-toolkit in the future
def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]: def parse_multipart_data(content_type: str, body: bytes) -> dict[str, Any]:
"""Parse multipart/form-data from raw request body. """Parse multipart/form-data from raw request body.

View File

@@ -44,7 +44,8 @@ class Reservation(Base):
__tablename__ = "reservations" __tablename__ = "reservations"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
customer_id = Column(Integer, ForeignKey("customers.id")) customer_id = Column(Integer, ForeignKey("customers.id"))
unique_id = Column(String(35), unique=True) # max length 35 unique_id = Column(String, unique=True)
md5_unique_id = Column(String(32), unique=True) # max length 32 guaranteed
start_date = Column(Date) start_date = Column(Date)
end_date = Column(Date) end_date = Column(Date)
num_adults = Column(Integer) num_adults = Column(Integer)

View File

@@ -9,6 +9,7 @@ Separating validation (Pydantic) from persistence (SQLAlchemy) and
from XML generation (xsdata) follows clean architecture principles. from XML generation (xsdata) follows clean architecture principles.
""" """
import hashlib
from datetime import date from datetime import date
from enum import Enum from enum import Enum
@@ -35,6 +36,55 @@ class PhoneNumber(BaseModel):
return " ".join(v.split()) return " ".join(v.split())
class ReservationData(BaseModel):
"""Validated reservation data."""
unique_id: str = Field(..., min_length=1, max_length=200)
md5_unique_id: str | None = Field(None, min_length=1, max_length=32)
start_date: date
end_date: date
num_adults: int = Field(..., ge=1)
num_children: int = Field(0, ge=0, le=10)
children_ages: list[int] = Field(default_factory=list)
hotel_code: str = Field(..., min_length=1, max_length=50)
hotel_name: str | None = Field(None, max_length=200)
offer: str | None = Field(None, max_length=500)
user_comment: str | None = Field(None, max_length=2000)
fbclid: str | None = Field(None, max_length=100)
gclid: str | None = Field(None, max_length=100)
utm_source: str | None = Field(None, max_length=100)
utm_medium: str | None = Field(None, max_length=100)
utm_campaign: str | None = Field(None, max_length=100)
utm_term: str | None = Field(None, max_length=100)
utm_content: str | None = Field(None, max_length=100)
@model_validator(mode="after")
def ensure_md5(self) -> "ReservationData":
"""Ensure md5_unique_id is set after model validation.
Using a model_validator in 'after' mode lets us access all fields via
the instance and set md5_unique_id in-place when it wasn't provided.
"""
if not getattr(self, "md5_unique_id", None) and getattr(
self, "unique_id", None
):
self.md5_unique_id = hashlib.md5(self.unique_id.encode("utf-8")).hexdigest()
return self
@model_validator(mode="after")
def validate_children_ages(self) -> "ReservationData":
"""Ensure children_ages matches num_children."""
if len(self.children_ages) != self.num_children:
raise ValueError(
f"Number of children ages ({len(self.children_ages)}) "
f"must match num_children ({self.num_children})"
)
for age in self.children_ages:
if age < 0 or age > 17:
raise ValueError(f"Child age {age} must be between 0 and 17")
return self
class CustomerData(BaseModel): class CustomerData(BaseModel):
"""Validated customer data for creating reservations and guests.""" """Validated customer data for creating reservations and guests."""
@@ -168,58 +218,6 @@ class CommentsData(BaseModel):
model_config = {"from_attributes": True} model_config = {"from_attributes": True}
class ReservationData(BaseModel):
"""Validated reservation data."""
unique_id: str = Field(..., min_length=1, max_length=35)
start_date: date
end_date: date
num_adults: int = Field(..., ge=1, le=20)
num_children: int = Field(0, ge=0, le=10)
children_ages: list[int] = Field(default_factory=list)
hotel_code: str = Field(..., min_length=1, max_length=50)
hotel_name: str | None = Field(None, max_length=200)
offer: str | None = Field(None, max_length=500)
user_comment: str | None = Field(None, max_length=2000)
fbclid: str | None = Field(None, max_length=100)
gclid: str | None = Field(None, max_length=100)
utm_source: str | None = Field(None, max_length=100)
utm_medium: str | None = Field(None, max_length=100)
utm_campaign: str | None = Field(None, max_length=100)
utm_term: str | None = Field(None, max_length=100)
utm_content: str | None = Field(None, max_length=100)
@model_validator(mode="after")
def validate_dates(self) -> "ReservationData":
"""Ensure end_date is after start_date."""
if self.end_date <= self.start_date:
raise ValueError("end_date must be after start_date")
return self
@model_validator(mode="after")
def validate_children_ages(self) -> "ReservationData":
"""Ensure children_ages matches num_children."""
if len(self.children_ages) != self.num_children:
raise ValueError(
f"Number of children ages ({len(self.children_ages)}) "
f"must match num_children ({self.num_children})"
)
for age in self.children_ages:
if age < 0 or age > 17:
raise ValueError(f"Child age {age} must be between 0 and 17")
return self
@field_validator("unique_id")
@classmethod
def validate_unique_id_length(cls, v: str) -> str:
"""Ensure unique_id doesn't exceed max length."""
if len(v) > 35:
raise ValueError(f"unique_id length {len(v)} exceeds maximum of 35")
return v
model_config = {"from_attributes": True}
# Example usage in a service layer # Example usage in a service layer
class ReservationService: class ReservationService:
"""Example service showing how to use Pydantic models with SQLAlchemy.""" """Example service showing how to use Pydantic models with SQLAlchemy."""

View File

@@ -1,131 +0,0 @@
#!/usr/bin/env python3
"""Configuration and setup script for the Wix Form Handler API
"""
import os
import secrets
import sys
# Add parent directory to path to import from src
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from alpine_bits_python.auth import generate_api_key
def generate_secure_keys():
"""Generate secure API keys for the application"""
print("🔐 Generating Secure API Keys")
print("=" * 50)
# Generate API keys
wix_api_key = generate_api_key()
admin_api_key = generate_api_key()
webhook_secret = secrets.token_urlsafe(32)
print(f"🔑 Wix Webhook API Key: {wix_api_key}")
print(f"🔐 Admin API Key: {admin_api_key}")
print(f"🔒 Webhook Secret: {webhook_secret}")
print("\n📋 Environment Variables")
print("-" * 30)
print(f"export WIX_API_KEY='{wix_api_key}'")
print(f"export ADMIN_API_KEY='{admin_api_key}'")
print(f"export WIX_WEBHOOK_SECRET='{webhook_secret}'")
print("export REDIS_URL='redis://localhost:6379' # Optional for production")
print("\n🔧 .env File Content")
print("-" * 20)
print(f"WIX_API_KEY={wix_api_key}")
print(f"ADMIN_API_KEY={admin_api_key}")
print(f"WIX_WEBHOOK_SECRET={webhook_secret}")
print("REDIS_URL=redis://localhost:6379")
# Optionally write to .env file
create_env = input("\n❓ Create .env file? (y/n): ").lower().strip()
if create_env == "y":
# Create .env in the project root (two levels up from scripts)
env_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), ".env"
)
with open(env_path, "w") as f:
f.write(f"WIX_API_KEY={wix_api_key}\n")
f.write(f"ADMIN_API_KEY={admin_api_key}\n")
f.write(f"WIX_WEBHOOK_SECRET={webhook_secret}\n")
f.write("REDIS_URL=redis://localhost:6379\n")
print(f"✅ .env file created at {env_path}!")
print("⚠️ Add .env to your .gitignore file!")
print("\n🌐 Wix Configuration")
print("-" * 20)
print("1. In your Wix site, go to Settings > Webhooks")
print("2. Add webhook URL: https://yourdomain.com/webhook/wix-form")
print("3. Add custom header: Authorization: Bearer " + wix_api_key)
print("4. Optionally configure webhook signature with the secret above")
return {
"wix_api_key": wix_api_key,
"admin_api_key": admin_api_key,
"webhook_secret": webhook_secret,
}
def check_security_setup():
"""Check current security configuration"""
print("🔍 Security Configuration Check")
print("=" * 40)
# Check environment variables
wix_key = os.getenv("WIX_API_KEY")
admin_key = os.getenv("ADMIN_API_KEY")
webhook_secret = os.getenv("WIX_WEBHOOK_SECRET")
redis_url = os.getenv("REDIS_URL")
print("Environment Variables:")
print(f" WIX_API_KEY: {'✅ Set' if wix_key else '❌ Not set'}")
print(f" ADMIN_API_KEY: {'✅ Set' if admin_key else '❌ Not set'}")
print(f" WIX_WEBHOOK_SECRET: {'✅ Set' if webhook_secret else '❌ Not set'}")
print(f" REDIS_URL: {'✅ Set' if redis_url else '⚠️ Optional (using in-memory)'}")
# Security recommendations
print("\n🛡️ Security Recommendations:")
if not wix_key:
print(" ❌ Set WIX_API_KEY environment variable")
elif len(wix_key) < 32:
print(" ⚠️ WIX_API_KEY should be longer for better security")
else:
print(" ✅ WIX_API_KEY looks secure")
if not admin_key:
print(" ❌ Set ADMIN_API_KEY environment variable")
elif wix_key and admin_key == wix_key:
print(" ❌ Admin and Wix keys should be different")
else:
print(" ✅ ADMIN_API_KEY configured")
if not webhook_secret:
print(" ⚠️ Consider setting WIX_WEBHOOK_SECRET for signature validation")
else:
print(" ✅ Webhook signature validation enabled")
print("\n🚀 Production Checklist:")
print(" - Use HTTPS in production")
print(" - Set up Redis for distributed rate limiting")
print(" - Configure proper CORS origins")
print(" - Set up monitoring and logging")
print(" - Regular key rotation")
if __name__ == "__main__":
print("🔐 Wix Form Handler API - Security Setup")
print("=" * 50)
choice = input(
"Choose an option:\n1. Generate new API keys\n2. Check current setup\n\nEnter choice (1 or 2): "
).strip()
if choice == "1":
generate_secure_keys()
elif choice == "2":
check_security_setup()
else:
print("Invalid choice. Please run again and choose 1 or 2.")

View File

@@ -1,219 +0,0 @@
#!/usr/bin/env python3
"""Test script for the Secure Wix Form Handler API
"""
import asyncio
import os
import sys
from datetime import datetime
import aiohttp
# Add parent directory to path to import from src
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# API Configuration
BASE_URL = "http://localhost:8000"
# API Keys for testing - replace with your actual keys
TEST_API_KEY = os.getenv("WIX_API_KEY", "sk_live_your_secure_api_key_here")
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY", "sk_admin_your_admin_key_here")
# Sample Wix form data based on your example
SAMPLE_WIX_DATA = {
"formName": "Contact Form",
"submissions": [],
"submissionTime": "2024-03-20T10:30:00+00:00",
"formFieldMask": ["email", "name", "phone"],
"submissionId": "test-submission-123",
"contactId": "test-contact-456",
"submissionsLink": "https://www.wix.app/forms/test-form/submissions",
"submissionPdf": {
"url": "https://example.com/submission.pdf",
"filename": "submission.pdf",
},
"formId": "test-form-789",
"field:email_5139": "test@example.com",
"field:first_name_abae": "John",
"field:last_name_d97c": "Doe",
"field:phone_4c77": "+1234567890",
"field:anrede": "Herr",
"field:anzahl_kinder": "2",
"field:alter_kind_3": "8",
"field:alter_kind_4": "12",
"field:long_answer_3524": "This is a long answer field with more details about the inquiry.",
"contact": {
"name": {"first": "John", "last": "Doe"},
"email": "test@example.com",
"locale": "de",
"company": "Test Company",
"birthdate": "1985-05-15",
"labelKeys": {},
"contactId": "test-contact-456",
"address": {
"street": "Test Street 123",
"city": "Test City",
"country": "Germany",
"postalCode": "12345",
},
"jobTitle": "Manager",
"phone": "+1234567890",
"createdDate": "2024-03-20T10:00:00.000Z",
"updatedDate": "2024-03-20T10:30:00.000Z",
},
}
async def test_api():
"""Test the API endpoints with authentication"""
headers_with_auth = {
"Content-Type": "application/json",
"Authorization": f"Bearer {TEST_API_KEY}",
}
admin_headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {ADMIN_API_KEY}",
}
async with aiohttp.ClientSession() as session:
# Test health endpoint (no auth required)
print("1. Testing health endpoint (no auth)...")
try:
async with session.get(f"{BASE_URL}/api/health") as response:
result = await response.json()
print(f" ✅ Health check: {response.status} - {result.get('status')}")
except Exception as e:
print(f" ❌ Health check failed: {e}")
# Test root endpoint (no auth required)
print("\n2. Testing root endpoint (no auth)...")
try:
async with session.get(f"{BASE_URL}/api/") as response:
result = await response.json()
print(f" ✅ Root: {response.status} - {result.get('message')}")
except Exception as e:
print(f" ❌ Root endpoint failed: {e}")
# Test webhook endpoint without auth (should fail)
print("\n3. Testing webhook endpoint WITHOUT auth (should fail)...")
try:
async with session.post(
f"{BASE_URL}/api/webhook/wix-form",
json=SAMPLE_WIX_DATA,
headers={"Content-Type": "application/json"},
) as response:
result = await response.json()
if response.status == 401:
print(
f" ✅ Correctly rejected: {response.status} - {result.get('detail')}"
)
else:
print(f" ❌ Unexpected response: {response.status} - {result}")
except Exception as e:
print(f" ❌ Test failed: {e}")
# Test webhook endpoint with valid auth
print("\n4. Testing webhook endpoint WITH valid auth...")
try:
async with session.post(
f"{BASE_URL}/api/webhook/wix-form",
json=SAMPLE_WIX_DATA,
headers=headers_with_auth,
) as response:
result = await response.json()
if response.status == 200:
print(
f" ✅ Webhook success: {response.status} - {result.get('status')}"
)
else:
print(f" ❌ Webhook failed: {response.status} - {result}")
except Exception as e:
print(f" ❌ Webhook test failed: {e}")
# Test test endpoint with auth
print("\n5. Testing simple test endpoint WITH auth...")
try:
async with session.post(
f"{BASE_URL}/api/webhook/wix-form/test",
json={"test": "data", "timestamp": datetime.now().isoformat()},
headers=headers_with_auth,
) as response:
result = await response.json()
if response.status == 200:
print(
f" ✅ Test endpoint: {response.status} - {result.get('status')}"
)
else:
print(f" ❌ Test endpoint failed: {response.status} - {result}")
except Exception as e:
print(f" ❌ Test endpoint failed: {e}")
# Test rate limiting by making multiple rapid requests
print("\n6. Testing rate limiting (making 5 rapid requests)...")
rate_limit_test_count = 0
for i in range(5):
try:
async with session.get(f"{BASE_URL}/api/health") as response:
if response.status == 200:
rate_limit_test_count += 1
elif response.status == 429:
print(f" ✅ Rate limit triggered on request {i + 1}")
break
except Exception as e:
print(f" ❌ Rate limit test failed: {e}")
break
if rate_limit_test_count == 5:
print(" No rate limit reached (normal for low request volume)")
# Test admin endpoint (if admin key is configured)
print("\n7. Testing admin stats endpoint...")
try:
async with session.get(
f"{BASE_URL}/api/admin/stats", headers=admin_headers
) as response:
result = await response.json()
if response.status == 200:
print(
f" ✅ Admin stats: {response.status} - {result.get('status')}"
)
elif response.status == 401:
print(
f" ⚠️ Admin access denied (API key not configured): {result.get('detail')}"
)
else:
print(f" ❌ Admin endpoint failed: {response.status} - {result}")
except Exception as e:
print(f" ❌ Admin test failed: {e}")
if __name__ == "__main__":
print("🔒 Testing Secure Wix Form Handler API...")
print("=" * 60)
print("📍 API URL:", BASE_URL)
print(
"🔑 Using API Key:",
TEST_API_KEY[:20] + "..." if len(TEST_API_KEY) > 20 else TEST_API_KEY,
)
print(
"🔐 Using Admin Key:",
ADMIN_API_KEY[:20] + "..." if len(ADMIN_API_KEY) > 20 else ADMIN_API_KEY,
)
print("=" * 60)
print("Make sure the API is running with: python3 run_api.py")
print("-" * 60)
try:
asyncio.run(test_api())
print("\n" + "=" * 60)
print("✅ Testing completed!")
print("\n📋 Quick Setup Reminder:")
print("1. Set environment variables:")
print(" export WIX_API_KEY='your_secure_api_key'")
print(" export ADMIN_API_KEY='your_admin_key'")
print("2. Configure Wix webhook URL: https://yourdomain.com/webhook/wix-form")
print("3. Add Authorization header: Bearer your_api_key")
except Exception as e:
print(f"\n❌ Error testing API: {e}")
print("Make sure the API server is running!")

View File

@@ -16,6 +16,7 @@ from alpine_bits_python.alpinebits_server import AlpineBitsClientInfo
from alpine_bits_python.db import Base, Customer, Reservation from alpine_bits_python.db import Base, Customer, Reservation
from alpine_bits_python.generated import OtaReadRq from alpine_bits_python.generated import OtaReadRq
from alpine_bits_python.generated.alpinebits import OtaResRetrieveRs from alpine_bits_python.generated.alpinebits import OtaResRetrieveRs
from alpine_bits_python.schemas import ReservationData
@pytest.fixture @pytest.fixture
@@ -76,15 +77,13 @@ def sample_customer():
@pytest.fixture @pytest.fixture
def sample_reservation(sample_customer): def sample_reservation(sample_customer):
"""Create a sample reservation for testing.""" """Create a sample reservation for testing."""
return Reservation( reservation = ReservationData(
id=1,
customer_id=1,
unique_id="RES-2024-001", unique_id="RES-2024-001",
start_date=date(2024, 12, 25), start_date=date(2024, 12, 25),
end_date=date(2024, 12, 31), end_date=date(2024, 12, 31),
num_adults=2, num_adults=2,
num_children=1, num_children=1,
children_ages="8", children_ages=[8],
offer="Christmas Special", offer="Christmas Special",
created_at=datetime.now(UTC), created_at=datetime.now(UTC),
utm_source="google", utm_source="google",
@@ -97,6 +96,19 @@ def sample_reservation(sample_customer):
gclid="abc123xyz", gclid="abc123xyz",
hotel_code="HOTEL123", hotel_code="HOTEL123",
hotel_name="Alpine Paradise Resort", hotel_name="Alpine Paradise Resort",
)
data = reservation.model_dump(exclude_none=True)
children_list = data.pop("children_ages", [])
children_csv = ",".join(str(int(a)) for a in children_list) if children_list else ""
data["children_ages"] = children_csv
print(data)
return Reservation(
id=1,
customer_id=1,
**data,
customer=sample_customer, customer=sample_customer,
) )
@@ -115,18 +127,28 @@ def minimal_customer():
@pytest.fixture @pytest.fixture
def minimal_reservation(minimal_customer): def minimal_reservation(minimal_customer):
"""Create a minimal reservation with only required fields.""" """Create a minimal reservation with only required fields."""
return Reservation( reservation = ReservationData(
id=2,
customer_id=2,
unique_id="RES-2024-002", unique_id="RES-2024-002",
start_date=date(2025, 1, 15), start_date=date(2025, 1, 15),
end_date=date(2025, 1, 20), end_date=date(2025, 1, 20),
num_adults=1, num_adults=1,
num_children=0, num_children=0,
children_ages="", children_ages=[],
hotel_code="HOTEL123", hotel_code="HOTEL123",
hotel_name="Alpine Paradise Resort",
created_at=datetime.now(UTC), created_at=datetime.now(UTC),
hotel_name="Alpine Paradise Resort",
)
data = reservation.model_dump(exclude_none=True)
children_list = data.pop("children_ages", [])
children_csv = ",".join(str(int(a)) for a in children_list) if children_list else ""
data["children_ages"] = children_csv
return Reservation(
id=2,
customer_id=2,
**data,
customer=minimal_customer, customer=minimal_customer,
) )
@@ -235,7 +257,7 @@ class TestCreateResRetrieveResponse:
) )
assert xml_output is not None assert xml_output is not None
assert "RES-2024-001" in xml_output # assert "RES-2024-001" in xml_output does not work due to hashing
assert "John" in xml_output assert "John" in xml_output
assert "Doe" in xml_output assert "Doe" in xml_output
assert "HOTEL123" in xml_output assert "HOTEL123" in xml_output
@@ -265,8 +287,8 @@ class TestCreateResRetrieveResponse:
response, ns_map={None: "http://www.opentravel.org/OTA/2003/05"} response, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
) )
assert "RES-2024-001" in xml_output # assert "RES-2024-001" in xml_output
assert "RES-2024-002" in xml_output # assert "RES-2024-002" in xml_output
assert "John" in xml_output assert "John" in xml_output
assert "Jane" in xml_output assert "Jane" in xml_output
@@ -344,7 +366,7 @@ class TestXMLParsing:
assert "john.doe@example.com" in xml_output assert "john.doe@example.com" in xml_output
# Verify reservation data is present # Verify reservation data is present
assert "RES-2024-001" in xml_output # assert "RES-2024-001" in xml_output
assert "HOTEL123" in xml_output assert "HOTEL123" in xml_output
@@ -384,33 +406,6 @@ class TestEdgeCases:
assert response is not None assert response is not None
assert xml_output is not None assert xml_output is not None
def test_long_unique_id_truncation(self):
"""Test that long unique IDs are handled properly."""
customer = Customer(
id=98,
given_name="Test",
surname="User",
contact_id="CONTACT-98",
)
# Unique ID at max length (35 chars)
reservation = Reservation(
id=98,
customer_id=98,
unique_id="A" * 35, # Max length
start_date=date(2025, 1, 1),
end_date=date(2025, 1, 5),
num_adults=1,
num_children=0,
children_ages="",
hotel_code="HOTEL123",
created_at=datetime.now(UTC),
)
reservation_pairs = [(reservation, customer)]
response = create_res_retrieve_response(reservation_pairs)
assert response is not None
def test_reservation_with_all_utm_parameters(self): def test_reservation_with_all_utm_parameters(self):
"""Test reservation with all UTM tracking parameters.""" """Test reservation with all UTM tracking parameters."""
customer = Customer( customer = Customer(
@@ -419,15 +414,13 @@ class TestEdgeCases:
surname="Test", surname="Test",
contact_id="CONTACT-97", contact_id="CONTACT-97",
) )
reservation = Reservation( reservation = ReservationData(
id=97,
customer_id=97,
unique_id="RES-UTM-TEST", unique_id="RES-UTM-TEST",
start_date=date(2025, 2, 1), start_date=date(2025, 2, 1),
end_date=date(2025, 2, 7), end_date=date(2025, 2, 7),
num_adults=2, num_adults=2,
num_children=0, num_children=0,
children_ages="", children_ages=[],
hotel_code="HOTEL123", hotel_code="HOTEL123",
created_at=datetime.now(UTC), created_at=datetime.now(UTC),
utm_source="facebook", utm_source="facebook",
@@ -439,7 +432,13 @@ class TestEdgeCases:
gclid="", gclid="",
) )
reservation_pairs = [(reservation, customer)] reservation_db = Reservation(
id=97,
customer_id=97,
**reservation.model_dump(exclude_none=True),
)
reservation_pairs = [(reservation_db, customer)]
response = create_res_retrieve_response(reservation_pairs) response = create_res_retrieve_response(reservation_pairs)
config = SerializerConfig(pretty_print=True) config = SerializerConfig(pretty_print=True)
@@ -450,7 +449,6 @@ class TestEdgeCases:
assert response is not None assert response is not None
# UTM parameters should be in comments or other fields # UTM parameters should be in comments or other fields
assert "RES-UTM-TEST" in xml_output
if __name__ == "__main__": if __name__ == "__main__":