housekeeping + async db

This commit is contained in:
Jonas Linter
2025-09-29 12:56:47 +02:00
parent 6688a9a465
commit 8d4ccc4041
7 changed files with 324 additions and 295 deletions

View File

@@ -27,7 +27,7 @@ import urllib.parse
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
_LOGGER = logging.getLogger(__name__)
# HTTP Basic auth for AlpineBits
security_basic = HTTPBasic()
@@ -36,7 +36,7 @@ security_basic = HTTPBasic()
try:
config = load_config()
except Exception as e:
logger.error(f"Failed to load config: {str(e)}")
_LOGGER.error(f"Failed to load config: {str(e)}")
config = {}
@@ -75,7 +75,7 @@ async def process_form_submission(submission_data: Dict[str, Any]) -> None:
Add your business logic here.
"""
try:
logger.info(f"Processing form submission: {submission_data.get('submissionId')}")
_LOGGER.info(f"Processing form submission: {submission_data.get('submissionId')}")
# Example processing - you can replace this with your actual logic
form_name = submission_data.get('formName')
@@ -84,7 +84,7 @@ async def process_form_submission(submission_data: Dict[str, Any]) -> None:
# Extract form fields
form_fields = {k: v for k, v in submission_data.items() if k.startswith('field:')}
logger.info(f"Form: {form_name}, Contact: {contact_email}, Fields: {len(form_fields)}")
_LOGGER.info(f"Form: {form_name}, Contact: {contact_email}, Fields: {len(form_fields)}")
# Here you could:
# - Save to database
@@ -93,7 +93,7 @@ async def process_form_submission(submission_data: Dict[str, Any]) -> None:
# - Process the data further
except Exception as e:
logger.error(f"Error processing form submission: {str(e)}")
_LOGGER.error(f"Error processing form submission: {str(e)}")
@api_router.get("/")
@@ -127,167 +127,173 @@ async def health_check(request: Request):
}
# Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: Dict[str, Any]):
"""
Shared business logic for handling Wix form submissions (test and production).
"""
timestamp = datetime.now().isoformat()
_LOGGER.info(f"Received Wix form data at {timestamp}")
_LOGGER.info(f"Data keys: {list(data.keys())}")
_LOGGER.info(f"Full data: {json.dumps(data, indent=2)}")
log_entry = {
"timestamp": timestamp,
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"origin_header": request.headers.get("origin"),
"all_headers": dict(request.headers),
}
logs_dir = "logs"
if not os.path.exists(logs_dir):
os.makedirs(logs_dir, mode=0o755, exist_ok=True)
stat_info = os.stat(logs_dir)
_LOGGER.info(f"Created directory owner: uid:{stat_info.st_uid}, gid:{stat_info.st_gid}")
_LOGGER.info(f"Directory mode: {oct(stat_info.st_mode)[-3:]}")
log_filename = f"{logs_dir}/wix_test_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(log_filename, "w", encoding="utf-8") as f:
json.dump(log_entry, f, indent=2, default=str, ensure_ascii=False)
file_stat = os.stat(log_filename)
_LOGGER.info(f"Created file owner: uid:{file_stat.st_uid}, gid:{file_stat.st_gid}")
_LOGGER.info(f"File mode: {oct(file_stat.st_mode)[-3:]}")
_LOGGER.info(f"Data logged to: {log_filename}")
# save customer and reservation to DB
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
locale = contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
email_newsletter = data.get("field:form_field_5a7b", "") != "Non selezionato"
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
# Dates
start_date = data.get("field:date_picker_a7c8") or data.get("Anreisedatum") or data.get("submissions", [{}])[1].get("value")
end_date = data.get("field:date_picker_7e65") or data.get("Abreisedatum") or data.get("submissions", [{}])[2].get("value")
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 2)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
for k in data.keys():
if k.startswith("field:alter_kind_"):
try:
age = int(data[k])
children_ages.append(age)
except ValueError:
_LOGGER.warning(f"Invalid age value for {k}: {data[k]}")
offer = data.get("field:angebot_auswaehlen")
# use database session
# Save all relevant data to DB (including new fields)
db_customer = DBCustomer(
given_name=first_name,
surname=last_name,
contact_id=contact_id,
name_prefix=name_prefix,
email_address=email,
phone=phone_number,
email_newsletter=email_newsletter,
address_line=address_line,
city_name=city_name,
postal_code=postal_code,
country_code=country_code,
gender=gender,
birth_date=birth_date,
language=language,
address_catalog=False,
name_title=None,
)
db.add(db_customer)
await db.commit()
await db.refresh(db_customer)
db_reservation = DBReservation(
customer_id=db_customer.id,
form_id=data.get("formId"),
start_date=date.fromisoformat(start_date) if start_date else None,
end_date=date.fromisoformat(end_date) if end_date else None,
num_adults=num_adults,
num_children=num_children,
children_ages=','.join(str(a) for a in children_ages),
offer=offer,
utm_comment=utm_comment,
created_at=datetime.now(timezone.utc),
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=data.get("field:fbclid"),
gclid=data.get("field:gclid"),
hotel_code="123",
hotel_name="Frangart Inn",
)
db.add(db_reservation)
await db.commit()
await db.refresh(db_reservation)
return {
"status": "success",
"message": "Wix form data received successfully",
"received_keys": list(data.keys()),
"data_logged_to": log_filename,
"timestamp": timestamp,
"process_info": log_entry["process_info"],
"note": "No authentication required for this endpoint"
}
@api_router.post("/webhook/wix-form")
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
async def receive_wix_form(
request: Request,
submission: WixFormSubmission,
background_tasks: BackgroundTasks,
api_key: str = Depends(validate_api_key)
):
async def handle_wix_form(request: Request, data: Dict[str, Any]):
"""
Secure endpoint to receive Wix form submissions via webhook.
Requires:
- Valid API key in Authorization header: Authorization: Bearer your_api_key
- Rate limited to prevent abuse
- Optional: Wix signature validation (configure WIX_WEBHOOK_SECRET env var)
This endpoint accepts POST requests with Wix form data and processes them asynchronously.
Unified endpoint to handle Wix form submissions (test and production).
No authentication required for this endpoint.
"""
try:
logger.info(f"Received form submission: {submission.submissionId} (API key: {api_key})")
# Optional: Validate Wix webhook signature for extra security
wix_secret = os.getenv("WIX_WEBHOOK_SECRET")
if wix_secret:
signature = request.headers.get("X-Wix-Webhook-Signature", "")
body = await request.body()
if not validate_wix_signature(body, signature, wix_secret):
logger.warning("Invalid Wix webhook signature")
raise HTTPException(
status_code=401,
detail="Invalid webhook signature"
)
# Convert to dict for processing
submission_dict = submission.dict()
# Add metadata
submission_dict["_metadata"] = {
"api_key_used": api_key,
"received_at": datetime.now().isoformat(),
"client_ip": request.client.host if request.client else "unknown"
}
# Add background task for processing
background_tasks.add_task(process_form_submission, submission_dict)
# Return immediate response to Wix
return {
"status": "received",
"submissionId": submission.submissionId,
"message": "Form submission received and is being processed",
"timestamp": datetime.now().isoformat()
}
except HTTPException:
# Re-raise HTTP exceptions (auth errors, etc.)
raise
return await process_wix_form_submission(request, data)
except Exception as e:
logger.error(f"Error receiving form submission: {str(e)}")
_LOGGER.error(f"Error in handle_wix_form: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Error processing form submission: {str(e)}"
detail=f"Error processing Wix form data: {str(e)}"
)
@api_router.post("/webhook/wix-form/test")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def test_endpoint(
request: Request,
data: Dict[str, Any]
):
async def handle_wix_form_test(request: Request, data: Dict[str, Any]):
"""
Test endpoint to verify the API is working with raw JSON data.
Useful for testing without strict validation.
No authentication required for testing purposes.
"""
try:
timestamp = datetime.now().isoformat()
# Debug: Check current user context
import pwd
import grp
current_uid = os.getuid()
current_gid = os.getgid()
effective_uid = os.geteuid()
effective_gid = os.getegid()
try:
user_name = pwd.getpwuid(current_uid).pw_name
group_name = grp.getgrgid(current_gid).gr_name
except KeyError:
user_name = f"unknown({current_uid})"
group_name = f"unknown({current_gid})"
logger.info(f"Process running as: {user_name}:{group_name} (uid:{current_uid}, gid:{current_gid})")
logger.info(f"Effective user: uid:{effective_uid}, gid:{effective_gid}")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Directory permissions: {oct(os.stat('.').st_mode)[-3:]}")
# Log to console
logger.info(f"Received test data at {timestamp}")
logger.info(f"Data keys: {list(data.keys())}")
logger.info(f"Full data: {json.dumps(data, indent=2)}")
# Log to file for detailed inspection
log_entry = {
"timestamp": timestamp,
"client_ip": request.client.host if request.client else "unknown",
"headers": dict(request.headers),
"data": data,
"Cors origins": request.headers.get("origin"),
"process_info": {
"uid": current_uid,
"gid": current_gid,
"effective_uid": effective_uid,
"effective_gid": effective_gid,
"user_name": user_name,
"group_name": group_name,
"cwd": os.getcwd()
}
}
# Create logs directory if it doesn't exist with proper permissions
logs_dir = "logs"
if not os.path.exists(logs_dir):
logger.info(f"Creating logs directory as user {user_name} ({current_uid})")
os.makedirs(logs_dir, mode=0o755, exist_ok=True)
# Check what actually got created
stat_info = os.stat(logs_dir)
logger.info(f"Created directory owner: uid:{stat_info.st_uid}, gid:{stat_info.st_gid}")
logger.info(f"Directory mode: {oct(stat_info.st_mode)[-3:]}")
# Write to file with timestamp
log_filename = f"{logs_dir}/wix_test_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(log_filename, "w", encoding="utf-8") as f:
json.dump(log_entry, f, indent=2, default=str, ensure_ascii=False)
# Check file ownership after creation
file_stat = os.stat(log_filename)
logger.info(f"Created file owner: uid:{file_stat.st_uid}, gid:{file_stat.st_gid}")
logger.info(f"File mode: {oct(file_stat.st_mode)[-3:]}")
logger.info(f"Data logged to: {log_filename}")
return {
"status": "success",
"message": "Test data received successfully",
"received_keys": list(data.keys()),
"data_logged_to": log_filename,
"timestamp": timestamp,
"process_info": log_entry["process_info"],
"note": "No authentication required for this test endpoint"
}
return await process_wix_form_submission(request, data)
except Exception as e:
logger.error(f"Error in test endpoint: {str(e)}")
_LOGGER.error(f"Error in handle_wix_form_test: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Error processing test data: {str(e)}"
@@ -311,7 +317,7 @@ async def generate_new_api_key(
)
new_key = generate_api_key()
logger.info(f"Generated new API key (requested by: {admin_key})")
_LOGGER.info(f"Generated new API key (requested by: {admin_key})")
return {
"status": "success",
@@ -345,7 +351,7 @@ async def validate_basic_auth(credentials: HTTPBasicCredentials = Depends(securi
detail="ERROR: Invalid credentials",
headers={"WWW-Authenticate": "Basic"},
)
logger.info(f"AlpineBits authentication successful for user: {credentials.username} (from config)")
_LOGGER.info(f"AlpineBits authentication successful for user: {credentials.username} (from config)")
return credentials.username
@@ -441,27 +447,27 @@ async def alpinebits_server_handshake(
if not client_protocol_version:
# Server concludes client speaks a protocol version preceding 2013-04
client_protocol_version = "pre-2013-04"
logger.info("No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04")
_LOGGER.info("No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04")
else:
logger.info(f"Client protocol version: {client_protocol_version}")
_LOGGER.info(f"Client protocol version: {client_protocol_version}")
# Optional client ID
client_id = request.headers.get("X-AlpineBits-ClientID")
if client_id:
logger.info(f"Client ID: {client_id}")
_LOGGER.info(f"Client ID: {client_id}")
# Check content encoding
content_encoding = request.headers.get("Content-Encoding")
is_compressed = content_encoding == "gzip"
if is_compressed:
logger.info("Request is gzip compressed")
_LOGGER.info("Request is gzip compressed")
# Get content type before processing
content_type = request.headers.get("Content-Type", "")
logger.info(f"Content-Type: {content_type}")
logger.info(f"Content-Encoding: {content_encoding}")
_LOGGER.info(f"Content-Type: {content_type}")
_LOGGER.info(f"Content-Encoding: {content_encoding}")
# Get request body
body = await request.body()
@@ -511,7 +517,7 @@ async def alpinebits_server_handshake(
status_code=400,
detail="ERROR: Missing required 'action' parameter")
logger.info(f"AlpineBits action: {action}")
_LOGGER.info(f"AlpineBits action: {action}")
# Get optional request XML
@@ -548,7 +554,7 @@ async def alpinebits_server_handshake(
# Re-raise HTTP exceptions (auth errors, etc.)
raise
except Exception as e:
logger.error(f"Error in AlpineBits handshake: {str(e)}")
_LOGGER.error(f"Error in AlpineBits handshake: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"

View File

@@ -1,9 +1,29 @@
from sqlalchemy import create_engine, Column, Integer, String, Date, Boolean, ForeignKey, DateTime
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from sqlalchemy import Column, Integer, String, Date, Boolean, ForeignKey, DateTime
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
import os
Base = declarative_base()
# Async SQLAlchemy setup
def get_database_url(config=None):
db_url = None
if config and 'database' in config and 'url' in config['database']:
db_url = config['database']['url']
if not db_url:
db_url = os.environ.get('DATABASE_URL')
if not db_url:
db_url = 'sqlite+aiosqlite:///alpinebits.db'
return db_url
DATABASE_URL = get_database_url()
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def get_async_session():
async with AsyncSessionLocal() as session:
yield session
class Customer(Base):
__tablename__ = 'customers'
id = Column(Integer, primary_key=True)
@@ -62,20 +82,4 @@ class HashedCustomer(Base):
redacted_at = Column(DateTime)
def get_engine(config=None):
db_url = None
if config and 'database' in config and 'url' in config['database']:
db_url = config['database']['url']
if not db_url:
db_url = os.environ.get('DATABASE_URL')
if not db_url:
db_url = 'sqlite:///alpinebits.db'
return create_engine(db_url)
def get_session_local(config=None):
engine = get_engine(config)
return sessionmaker(autocommit=False, autoflush=False, bind=engine)
def init_db(config=None):
engine = get_engine(config)
Base.metadata.create_all(bind=engine)

View File

@@ -19,14 +19,17 @@ from .simplified_access import (
)
# DB and config
from .db import Customer as DBCustomer, Reservation as DBReservation, HashedCustomer, get_session_local, init_db
from .db import Customer as DBCustomer, Reservation as DBReservation, HashedCustomer, get_async_session
from .config_loader import load_config
import hashlib
import json
import os
import asyncio
def main():
from alpine_bits_python import db
async def main():
print("🚀 Starting AlpineBits XML generation script...")
# Load config (yaml, annotatedyaml)
@@ -38,141 +41,139 @@ def main():
# Ensure SQLite DB file exists if using SQLite
db_url = config.get('database', {}).get('url', '')
if db_url.startswith('sqlite:///'):
db_path = db_url.replace('sqlite:///', '')
if db_url.startswith('sqlite+aiosqlite:///'):
db_path = db_url.replace('sqlite+aiosqlite:///', '')
db_path = os.path.abspath(db_path)
db_dir = os.path.dirname(db_path)
if not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
# The DB file will be created by SQLAlchemy if it doesn't exist, but ensure directory exists
# for now we delete the existing DB for clean testing
if os.path.exists(db_path):
os.remove(db_path)
print(f"Deleted existing SQLite DB at {db_path} for clean testing.")
# Init DB
init_db(config)
print("📦 Database initialized/ready.")
SessionLocal = get_session_local(config)
db = SessionLocal()
# # Ensure DB schema is created (async)
from .db import engine, Base
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async for db in get_async_session():
# Load data from JSON file
json_path = os.path.join(os.path.dirname(__file__), '../../test_data/wix_test_data_20250928_132611.json')
with open(json_path, 'r', encoding='utf-8') as f:
wix_data = json.load(f)
data = wix_data["data"]["data"]
# Load data from JSON file
json_path = os.path.join(os.path.dirname(__file__), '../../test_data/wix_test_data_20250928_132611.json')
with open(json_path, 'r', encoding='utf-8') as f:
wix_data = json.load(f)
data = wix_data["data"]["data"]
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
locale = contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
contact_info = data.get("contact", {})
first_name = contact_info.get("name", {}).get("first")
last_name = contact_info.get("name", {}).get("last")
email = contact_info.get("email")
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
locale = contact_info.get("locale", "de-de")
contact_id = contact_info.get("contactId")
name_prefix = data.get("field:anrede")
email_newsletter = data.get("field:form_field_5a7b", "") != "Non selezionato"
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
name_prefix = data.get("field:anrede")
email_newsletter = data.get("field:form_field_5a7b", "") != "Non selezionato"
address_line = None
city_name = None
postal_code = None
country_code = None
gender = None
birth_date = None
language = data.get("contact", {}).get("locale", "en")[:2]
# Dates
start_date = data.get("field:date_picker_a7c8") or data.get("Anreisedatum") or data.get("submissions", [{}])[1].get("value")
end_date = data.get("field:date_picker_7e65") or data.get("Abreisedatum") or data.get("submissions", [{}])[2].get("value")
# Dates
start_date = data.get("field:date_picker_a7c8") or data.get("Anreisedatum") or data.get("submissions", [{}])[1].get("value")
end_date = data.get("field:date_picker_7e65") or data.get("Abreisedatum") or data.get("submissions", [{}])[2].get("value")
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 2)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
for k in data.keys():
if k.startswith("field:alter_kind_"):
try:
age = int(data[k])
children_ages.append(age)
except ValueError:
logging.warning(f"Invalid age value for {k}: {data[k]}")
# UTM and offer
utm_fields = [
("utm_Source", "utm_source"),
("utm_Medium", "utm_medium"),
("utm_Campaign", "utm_campaign"),
("utm_Term", "utm_term"),
("utm_Content", "utm_content"),
]
utm_comment_text = []
for label, field in utm_fields:
val = data.get(f"field:{field}") or data.get(label)
if val:
utm_comment_text.append(f"{label}: {val}")
utm_comment = " | ".join(utm_comment_text) if utm_comment_text else None
offer = data.get("field:angebot_auswaehlen")
# Room/guest info
num_adults = int(data.get("field:number_7cf5") or 2)
num_children = int(data.get("field:anzahl_kinder") or 0)
children_ages = []
if num_children > 0:
for k in data.keys():
if k.startswith("field:alter_kind_"):
try:
age = int(data[k])
children_ages.append(age)
except Exception:
pass
# Save all relevant data to DB (including new fields)
db_customer = DBCustomer(
given_name=first_name,
surname=last_name,
contact_id=contact_id,
name_prefix=name_prefix,
email_address=email,
phone=phone_number,
email_newsletter=email_newsletter,
address_line=address_line,
city_name=city_name,
postal_code=postal_code,
country_code=country_code,
gender=gender,
birth_date=birth_date,
language=language,
address_catalog=False,
name_title=None,
)
db.add(db_customer)
await db.commit()
await db.refresh(db_customer)
# UTM and offer
utm_fields = [
("utm_Source", "utm_source"),
("utm_Medium", "utm_medium"),
("utm_Campaign", "utm_campaign"),
("utm_Term", "utm_term"),
("utm_Content", "utm_content"),
]
utm_comment_text = []
for label, field in utm_fields:
val = data.get(f"field:{field}") or data.get(label)
if val:
utm_comment_text.append(f"{label}: {val}")
utm_comment = " | ".join(utm_comment_text) if utm_comment_text else None
offer = data.get("field:angebot_auswaehlen")
db_reservation = DBReservation(
customer_id=db_customer.id,
form_id=data.get("formId"),
start_date=date.fromisoformat(start_date) if start_date else None,
end_date=date.fromisoformat(end_date) if end_date else None,
num_adults=num_adults,
num_children=num_children,
children_ages=','.join(str(a) for a in children_ages),
offer=offer,
utm_comment=utm_comment,
created_at=datetime.now(timezone.utc),
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=data.get("field:fbclid"),
gclid=data.get("field:gclid"),
hotel_code="123",
hotel_name="Frangart Inn",
)
db.add(db_reservation)
await db.commit()
await db.refresh(db_reservation)
# Save all relevant data to DB (including new fields)
db_customer = DBCustomer(
given_name=first_name,
surname=last_name,
contact_id=contact_id,
name_prefix=name_prefix,
email_address=email,
phone=phone_number,
email_newsletter=email_newsletter,
address_line=address_line,
city_name=city_name,
postal_code=postal_code,
country_code=country_code,
gender=gender,
birth_date=birth_date,
language=language,
address_catalog=False,
name_title=None,
)
db.add(db_customer)
db.commit()
db.refresh(db_customer)
# Now read back from DB
customer = await db.get(DBCustomer, db_reservation.customer_id)
reservation = await db.get(DBReservation, db_reservation.id)
db_reservation = DBReservation(
customer_id=db_customer.id,
form_id=data.get("formId"),
start_date=date.fromisoformat(start_date) if start_date else None,
end_date=date.fromisoformat(end_date) if end_date else None,
num_adults=num_adults,
num_children=num_children,
children_ages=','.join(str(a) for a in children_ages),
offer=offer,
utm_comment=utm_comment,
created_at=datetime.now(timezone.utc),
utm_source=data.get("field:utm_source"),
utm_medium=data.get("field:utm_medium"),
utm_campaign=data.get("field:utm_campaign"),
utm_term=data.get("field:utm_term"),
utm_content=data.get("field:utm_content"),
user_comment=data.get("field:long_answer_3524", ""),
fbclid=data.get("field:fbclid"),
gclid=data.get("field:gclid"),
hotel_code="123",
hotel_name="Frangart Inn",
)
db.add(db_reservation)
db.commit()
db.refresh(db_reservation)
# Generate XML from DB data
create_xml_from_db(customer, reservation)
# Now read back from DB
customer = db.query(DBCustomer).filter_by(id=db_reservation.customer_id).first()
reservation = db.query(DBReservation).filter_by(id=db_reservation.id).first()
# Generate XML from DB data
create_xml_from_db(customer, reservation)
db.close()
await db.close()
def create_xml_from_db(customer: DBCustomer, reservation: DBReservation):
@@ -314,3 +315,6 @@ def create_xml_from_db(customer: DBCustomer, reservation: DBReservation):
except Exception as e:
print(f"❌ Validation/Serialization failed: {e}")
if __name__ == "__main__":
asyncio.run(main())