367 lines
13 KiB
Python
367 lines
13 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import UTC, date, datetime
|
|
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
|
|
|
from .alpine_bits_helpers import (
|
|
AlpineBitsFactory,
|
|
CommentData,
|
|
CommentListItemData,
|
|
CommentsData,
|
|
CustomerData,
|
|
GuestCountsFactory,
|
|
HotelReservationIdData,
|
|
OtaMessageType,
|
|
PhoneTechType,
|
|
)
|
|
from .config_loader import load_config
|
|
|
|
# DB and config
|
|
from .db import (
|
|
Base,
|
|
get_database_url,
|
|
)
|
|
from .db import (
|
|
Customer as DBCustomer,
|
|
)
|
|
from .db import (
|
|
Reservation as DBReservation,
|
|
)
|
|
from .generated import alpinebits as ab
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def setup_db(config):
|
|
DATABASE_URL = get_database_url(config)
|
|
engine = create_async_engine(DATABASE_URL, echo=True)
|
|
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
|
|
|
|
# Create tables
|
|
async with engine.begin() as conn:
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
_LOGGER.info("Database tables checked/created at startup.")
|
|
|
|
return engine, AsyncSessionLocal
|
|
|
|
|
|
async def main():
|
|
print("🚀 Starting AlpineBits XML generation script...")
|
|
# Load config (yaml, annotatedyaml)
|
|
config = load_config()
|
|
|
|
# print config for debugging
|
|
print("Loaded configuration:")
|
|
print(json.dumps(config, indent=2))
|
|
|
|
# Ensure SQLite DB file exists if using SQLite
|
|
db_url = config.get("database", {}).get("url", "")
|
|
if db_url.startswith("sqlite+aiosqlite:///"):
|
|
db_path = db_url.replace("sqlite+aiosqlite:///", "")
|
|
db_path = os.path.abspath(db_path)
|
|
db_dir = os.path.dirname(db_path)
|
|
if not os.path.exists(db_dir):
|
|
os.makedirs(db_dir, exist_ok=True)
|
|
# for now we delete the existing DB for clean testing
|
|
if os.path.exists(db_path):
|
|
os.remove(db_path)
|
|
print(f"Deleted existing SQLite DB at {db_path} for clean testing.")
|
|
|
|
# # Ensure DB schema is created (async)
|
|
|
|
engine, AsyncSessionLocal = await setup_db(config)
|
|
|
|
async with engine.begin() as conn:
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
|
|
async with AsyncSessionLocal() as db:
|
|
# Load data from JSON file
|
|
json_path = os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../test_data/wix_test_data_20250928_132611.json",
|
|
)
|
|
with open(json_path, encoding="utf-8") as f:
|
|
wix_data = json.load(f)
|
|
data = wix_data["data"]["data"]
|
|
|
|
contact_info = data.get("contact", {})
|
|
first_name = contact_info.get("name", {}).get("first")
|
|
last_name = contact_info.get("name", {}).get("last")
|
|
email = contact_info.get("email")
|
|
phone_number = contact_info.get("phones", [{}])[0].get("e164Phone")
|
|
locale = contact_info.get("locale", "de-de")
|
|
contact_id = contact_info.get("contactId")
|
|
|
|
name_prefix = data.get("field:anrede")
|
|
email_newsletter = data.get("field:form_field_5a7b", "") != "Non selezionato"
|
|
address_line = None
|
|
city_name = None
|
|
postal_code = None
|
|
country_code = None
|
|
gender = None
|
|
birth_date = None
|
|
language = data.get("contact", {}).get("locale", "en")[:2]
|
|
|
|
# Dates
|
|
start_date = (
|
|
data.get("field:date_picker_a7c8")
|
|
or data.get("Anreisedatum")
|
|
or data.get("submissions", [{}])[1].get("value")
|
|
)
|
|
end_date = (
|
|
data.get("field:date_picker_7e65")
|
|
or data.get("Abreisedatum")
|
|
or data.get("submissions", [{}])[2].get("value")
|
|
)
|
|
|
|
# Room/guest info
|
|
num_adults = int(data.get("field:number_7cf5") or 2)
|
|
num_children = int(data.get("field:anzahl_kinder") or 0)
|
|
children_ages = []
|
|
if num_children > 0:
|
|
for k in data.keys():
|
|
if k.startswith("field:alter_kind_"):
|
|
try:
|
|
age = int(data[k])
|
|
children_ages.append(age)
|
|
except ValueError:
|
|
logging.warning(f"Invalid age value for {k}: {data[k]}")
|
|
|
|
# UTM and offer
|
|
utm_fields = [
|
|
("utm_Source", "utm_source"),
|
|
("utm_Medium", "utm_medium"),
|
|
("utm_Campaign", "utm_campaign"),
|
|
("utm_Term", "utm_term"),
|
|
("utm_Content", "utm_content"),
|
|
]
|
|
utm_comment_text = []
|
|
for label, field in utm_fields:
|
|
val = data.get(f"field:{field}") or data.get(label)
|
|
if val:
|
|
utm_comment_text.append(f"{label}: {val}")
|
|
utm_comment = " | ".join(utm_comment_text) if utm_comment_text else None
|
|
offer = data.get("field:angebot_auswaehlen")
|
|
|
|
# Save all relevant data to DB (including new fields)
|
|
db_customer = DBCustomer(
|
|
given_name=first_name,
|
|
surname=last_name,
|
|
contact_id=contact_id,
|
|
name_prefix=name_prefix,
|
|
email_address=email,
|
|
phone=phone_number,
|
|
email_newsletter=email_newsletter,
|
|
address_line=address_line,
|
|
city_name=city_name,
|
|
postal_code=postal_code,
|
|
country_code=country_code,
|
|
gender=gender,
|
|
birth_date=birth_date,
|
|
language=language,
|
|
address_catalog=False,
|
|
name_title=None,
|
|
)
|
|
db.add(db_customer)
|
|
await db.commit()
|
|
await db.refresh(db_customer)
|
|
|
|
db_reservation = DBReservation(
|
|
customer_id=db_customer.id,
|
|
form_id=data.get("submissionId"),
|
|
start_date=date.fromisoformat(start_date) if start_date else None,
|
|
end_date=date.fromisoformat(end_date) if end_date else None,
|
|
num_adults=num_adults,
|
|
num_children=num_children,
|
|
children_ages=",".join(str(a) for a in children_ages),
|
|
offer=offer,
|
|
utm_comment=utm_comment,
|
|
created_at=datetime.now(UTC),
|
|
utm_source=data.get("field:utm_source"),
|
|
utm_medium=data.get("field:utm_medium"),
|
|
utm_campaign=data.get("field:utm_campaign"),
|
|
utm_term=data.get("field:utm_term"),
|
|
utm_content=data.get("field:utm_content"),
|
|
user_comment=data.get("field:long_answer_3524", ""),
|
|
fbclid=data.get("field:fbclid"),
|
|
gclid=data.get("field:gclid"),
|
|
hotel_code="123",
|
|
hotel_name="Frangart Inn",
|
|
)
|
|
db.add(db_reservation)
|
|
await db.commit()
|
|
await db.refresh(db_reservation)
|
|
|
|
# Now read back from DB
|
|
customer = await db.get(DBCustomer, db_reservation.customer_id)
|
|
reservation = await db.get(DBReservation, db_reservation.id)
|
|
|
|
# Generate XML from DB data
|
|
create_xml_from_db(customer, reservation)
|
|
|
|
await db.close()
|
|
|
|
|
|
def create_xml_from_db(customer: DBCustomer, reservation: DBReservation):
|
|
# Prepare data for XML
|
|
phone_numbers = [(customer.phone, PhoneTechType.MOBILE)] if customer.phone else []
|
|
customer_data = CustomerData(
|
|
given_name=customer.given_name,
|
|
surname=customer.surname,
|
|
name_prefix=customer.name_prefix,
|
|
name_title=customer.name_title,
|
|
phone_numbers=phone_numbers,
|
|
email_address=customer.email_address,
|
|
email_newsletter=customer.email_newsletter,
|
|
address_line=customer.address_line,
|
|
city_name=customer.city_name,
|
|
postal_code=customer.postal_code,
|
|
country_code=customer.country_code,
|
|
address_catalog=customer.address_catalog,
|
|
gender=customer.gender,
|
|
birth_date=customer.birth_date,
|
|
language=customer.language,
|
|
)
|
|
alpine_bits_factory = AlpineBitsFactory()
|
|
res_guests = alpine_bits_factory.create_res_guests(
|
|
customer_data, OtaMessageType.RETRIEVE
|
|
)
|
|
|
|
# Guest counts
|
|
children_ages = [int(a) for a in reservation.children_ages.split(",") if a]
|
|
guest_counts = GuestCountsFactory.create_retrieve_guest_counts(
|
|
reservation.num_adults, children_ages
|
|
)
|
|
|
|
# UniqueID
|
|
unique_id = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.UniqueId(
|
|
type_value=ab.UniqueIdType2.VALUE_14, id=reservation.unique_id
|
|
)
|
|
|
|
# TimeSpan
|
|
time_span = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays.RoomStay.TimeSpan(
|
|
start=reservation.start_date.isoformat() if reservation.start_date else None,
|
|
end=reservation.end_date.isoformat() if reservation.end_date else None,
|
|
)
|
|
room_stay = (
|
|
ab.OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays.RoomStay(
|
|
time_span=time_span,
|
|
guest_counts=guest_counts,
|
|
)
|
|
)
|
|
room_stays = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.RoomStays(
|
|
room_stay=[room_stay],
|
|
)
|
|
|
|
# HotelReservationId
|
|
hotel_res_id_data = HotelReservationIdData(
|
|
res_id_type="13",
|
|
res_id_value=reservation.fbclid or reservation.gclid,
|
|
res_id_source=None,
|
|
res_id_source_context="99tales",
|
|
)
|
|
hotel_res_id = alpine_bits_factory.create(
|
|
hotel_res_id_data, OtaMessageType.RETRIEVE
|
|
)
|
|
hotel_res_ids = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo.HotelReservationIds(
|
|
hotel_reservation_id=[hotel_res_id]
|
|
)
|
|
basic_property_info = ab.OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo.BasicPropertyInfo(
|
|
hotel_code=reservation.hotel_code,
|
|
hotel_name=reservation.hotel_name,
|
|
)
|
|
|
|
# Comments
|
|
offer_comment = CommentData(
|
|
name=ab.CommentName2.ADDITIONAL_INFO,
|
|
text="Angebot/Offerta",
|
|
list_items=[
|
|
CommentListItemData(
|
|
value=reservation.offer,
|
|
language=customer.language,
|
|
list_item="1",
|
|
)
|
|
],
|
|
)
|
|
comment = None
|
|
if reservation.user_comment:
|
|
comment = CommentData(
|
|
name=ab.CommentName2.CUSTOMER_COMMENT,
|
|
text=reservation.user_comment,
|
|
list_items=[
|
|
CommentListItemData(
|
|
value="Landing page comment",
|
|
language=customer.language,
|
|
list_item="1",
|
|
)
|
|
],
|
|
)
|
|
comments = [offer_comment, comment] if comment else [offer_comment]
|
|
comments_data = CommentsData(comments=comments)
|
|
comments_xml = alpine_bits_factory.create(comments_data, OtaMessageType.RETRIEVE)
|
|
|
|
res_global_info = (
|
|
ab.OtaResRetrieveRs.ReservationsList.HotelReservation.ResGlobalInfo(
|
|
hotel_reservation_ids=hotel_res_ids,
|
|
basic_property_info=basic_property_info,
|
|
comments=comments_xml,
|
|
)
|
|
)
|
|
|
|
hotel_reservation = ab.OtaResRetrieveRs.ReservationsList.HotelReservation(
|
|
create_date_time=datetime.now(UTC).isoformat(),
|
|
res_status=ab.HotelReservationResStatus.REQUESTED,
|
|
room_stay_reservation="true",
|
|
unique_id=unique_id,
|
|
room_stays=room_stays,
|
|
res_guests=res_guests,
|
|
res_global_info=res_global_info,
|
|
)
|
|
reservations_list = ab.OtaResRetrieveRs.ReservationsList(
|
|
hotel_reservation=[hotel_reservation]
|
|
)
|
|
ota_res_retrieve_rs = ab.OtaResRetrieveRs(
|
|
version="7.000", success=None, reservations_list=reservations_list
|
|
)
|
|
|
|
# Serialize to XML
|
|
try:
|
|
ota_res_retrieve_rs.model_validate(ota_res_retrieve_rs.model_dump())
|
|
print("✅ Pydantic validation successful!")
|
|
from xsdata.formats.dataclass.serializers.config import SerializerConfig
|
|
from xsdata_pydantic.bindings import XmlSerializer
|
|
|
|
config = SerializerConfig(
|
|
pretty_print=True, xml_declaration=True, encoding="UTF-8"
|
|
)
|
|
serializer = XmlSerializer(config=config)
|
|
ns_map = {None: "http://www.opentravel.org/OTA/2003/05"}
|
|
xml_string = serializer.render(ota_res_retrieve_rs, ns_map=ns_map)
|
|
with open("output.xml", "w", encoding="utf-8") as outfile:
|
|
outfile.write(xml_string)
|
|
print("✅ XML serialization successful!")
|
|
print("Generated XML written to output.xml")
|
|
print("\n📄 Generated XML:")
|
|
print(xml_string)
|
|
from xsdata_pydantic.bindings import XmlParser
|
|
|
|
parser = XmlParser()
|
|
with open("output.xml", encoding="utf-8") as infile:
|
|
xml_content = infile.read()
|
|
parsed_result = parser.from_string(xml_content, ab.OtaResRetrieveRs)
|
|
print("✅ Round-trip validation successful!")
|
|
print(
|
|
f"Parsed reservation status: {parsed_result.reservations_list.hotel_reservation[0].res_status}"
|
|
)
|
|
except Exception as e:
|
|
print(f"❌ Validation/Serialization failed: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|