notif_report #3
@@ -6,8 +6,8 @@ database:
|
|||||||
# url: "postgresql://user:password@host:port/dbname" # Example for Postgres
|
# url: "postgresql://user:password@host:port/dbname" # Example for Postgres
|
||||||
|
|
||||||
alpine_bits_auth:
|
alpine_bits_auth:
|
||||||
- hotel_id: "123"
|
- hotel_id: "12345"
|
||||||
hotel_name: "Frangart Inn"
|
hotel_name: "Bemelmans Post"
|
||||||
username: "alice"
|
username: "alice"
|
||||||
password: !secret ALICE_PASSWORD
|
password: !secret ALICE_PASSWORD
|
||||||
push_endpoint:
|
push_endpoint:
|
||||||
|
|||||||
@@ -864,7 +864,7 @@ def _process_single_reservation(reservation: Reservation, customer: Customer, me
|
|||||||
|
|
||||||
comments_data = CommentsData(comments=comments)
|
comments_data = CommentsData(comments=comments)
|
||||||
comments_xml = alpine_bits_factory.create(
|
comments_xml = alpine_bits_factory.create(
|
||||||
comments_data, OtaMessageType.RETRIEVE
|
comments_data, message_type
|
||||||
)
|
)
|
||||||
|
|
||||||
res_global_info = (
|
res_global_info = (
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ from xml.etree import ElementTree as ET
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from enum import Enum, IntEnum
|
from enum import Enum, IntEnum
|
||||||
|
|
||||||
from alpine_bits_python.alpine_bits_helpers import PhoneTechType, create_res_retrieve_response
|
from alpine_bits_python.alpine_bits_helpers import PhoneTechType, create_res_notif_push_message, create_res_retrieve_response
|
||||||
|
|
||||||
|
|
||||||
from .generated.alpinebits import OtaNotifReportRq, OtaNotifReportRs, OtaPingRq, OtaPingRs, WarningStatus, OtaReadRq
|
from .generated.alpinebits import OtaNotifReportRq, OtaNotifReportRs, OtaPingRq, OtaPingRs, WarningStatus, OtaReadRq
|
||||||
@@ -658,7 +658,7 @@ class PushAction(AlpineBitsAction):
|
|||||||
async def handle(
|
async def handle(
|
||||||
self,
|
self,
|
||||||
action: str,
|
action: str,
|
||||||
request_xml: str,
|
request_xml: Tuple[Reservation, Customer],
|
||||||
version: Version,
|
version: Version,
|
||||||
client_info: AlpineBitsClientInfo,
|
client_info: AlpineBitsClientInfo,
|
||||||
dbsession=None,
|
dbsession=None,
|
||||||
@@ -666,7 +666,18 @@ class PushAction(AlpineBitsAction):
|
|||||||
) -> AlpineBitsResponse:
|
) -> AlpineBitsResponse:
|
||||||
"""Create push request XML."""
|
"""Create push request XML."""
|
||||||
|
|
||||||
pass
|
xml_push_request = create_res_notif_push_message(request_xml)
|
||||||
|
|
||||||
|
|
||||||
|
config = SerializerConfig(
|
||||||
|
pretty_print=True, xml_declaration=True, encoding="UTF-8"
|
||||||
|
)
|
||||||
|
serializer = XmlSerializer(config=config)
|
||||||
|
xml_push_request = serializer.render(
|
||||||
|
xml_push_request, ns_map={None: "http://www.opentravel.org/OTA/2003/05"}
|
||||||
|
)
|
||||||
|
|
||||||
|
return AlpineBitsResponse(xml_push_request, HttpStatusCode.OK)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -703,7 +714,7 @@ class AlpineBitsServer:
|
|||||||
async def handle_request(
|
async def handle_request(
|
||||||
self,
|
self,
|
||||||
request_action_name: str,
|
request_action_name: str,
|
||||||
request_xml: str,
|
request_xml: str | Tuple[Reservation, Customer],
|
||||||
client_info: AlpineBitsClientInfo,
|
client_info: AlpineBitsClientInfo,
|
||||||
version: str = "2024-10",
|
version: str = "2024-10",
|
||||||
dbsession=None,
|
dbsession=None,
|
||||||
@@ -713,7 +724,7 @@ class AlpineBitsServer:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
request_action_name: The action name from the request (e.g., "OTA_Read:GuestRequests")
|
request_action_name: The action name from the request (e.g., "OTA_Read:GuestRequests")
|
||||||
request_xml: The XML request body
|
request_xml: The XML request body. Gets passed to the action handler. In case of PushRequest can be the data to be pushed
|
||||||
version: The AlpineBits version (defaults to "2024-10")
|
version: The AlpineBits version (defaults to "2024-10")
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
@@ -757,11 +768,26 @@ class AlpineBitsServer:
|
|||||||
# Handle the request
|
# Handle the request
|
||||||
try:
|
try:
|
||||||
# Special case for ping action - pass server capabilities
|
# Special case for ping action - pass server capabilities
|
||||||
|
|
||||||
|
if action_enum == AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS:
|
||||||
|
|
||||||
|
action_instance: PushAction
|
||||||
|
if request_xml is None or not isinstance(request_xml, tuple):
|
||||||
|
return AlpineBitsResponse(
|
||||||
|
f"Error: Invalid data for push request",
|
||||||
|
HttpStatusCode.BAD_REQUEST,
|
||||||
|
)
|
||||||
|
return await action_instance.handle(
|
||||||
|
action=request_action_name, request_xml=request_xml, version=version_enum, client_info=client_info
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if action_enum == AlpineBitsActionName.OTA_PING:
|
if action_enum == AlpineBitsActionName.OTA_PING:
|
||||||
return await action_instance.handle(
|
return await action_instance.handle(
|
||||||
action=request_action_name, request_xml=request_xml, version=version_enum, server_capabilities=self.capabilities, client_info=client_info
|
action=request_action_name, request_xml=request_xml, version=version_enum, server_capabilities=self.capabilities, client_info=client_info
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
|
||||||
return await action_instance.handle(
|
return await action_instance.handle(
|
||||||
action=request_action_name,
|
action=request_action_name,
|
||||||
request_xml=request_xml,
|
request_xml=request_xml,
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ import json
|
|||||||
import os
|
import os
|
||||||
import gzip
|
import gzip
|
||||||
import xml.etree.ElementTree as ET
|
import xml.etree.ElementTree as ET
|
||||||
from .alpinebits_server import AlpineBitsClientInfo, AlpineBitsServer, Version
|
from .alpinebits_server import AlpineBitsClientInfo, AlpineBitsServer, Version, AlpineBitsActionName
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
||||||
from functools import partial
|
from functools import partial
|
||||||
@@ -56,15 +56,28 @@ security_basic = HTTPBasic()
|
|||||||
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
# --- Simple event dispatcher ---
|
# --- Enhanced event dispatcher with hotel-specific routing ---
|
||||||
class EventDispatcher:
|
class EventDispatcher:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.listeners = defaultdict(list)
|
self.listeners = defaultdict(list)
|
||||||
|
self.hotel_listeners = defaultdict(list) # hotel_code -> list of listeners
|
||||||
|
|
||||||
def register(self, event_name, func):
|
def register(self, event_name, func):
|
||||||
self.listeners[event_name].append(func)
|
self.listeners[event_name].append(func)
|
||||||
|
|
||||||
|
def register_hotel_listener(self, event_name, hotel_code, func):
|
||||||
|
"""Register a listener for a specific hotel"""
|
||||||
|
self.hotel_listeners[f"{event_name}:{hotel_code}"].append(func)
|
||||||
|
|
||||||
async def dispatch(self, event_name, *args, **kwargs):
|
async def dispatch(self, event_name, *args, **kwargs):
|
||||||
for func in self.listeners[event_name]:
|
for func in self.listeners[event_name]:
|
||||||
await func(*args, **kwargs)
|
await func(*args, **kwargs)
|
||||||
|
|
||||||
|
async def dispatch_for_hotel(self, event_name, hotel_code, *args, **kwargs):
|
||||||
|
"""Dispatch event only to listeners registered for specific hotel"""
|
||||||
|
key = f"{event_name}:{hotel_code}"
|
||||||
|
for func in self.hotel_listeners[key]:
|
||||||
|
await func(*args, **kwargs)
|
||||||
|
|
||||||
event_dispatcher = EventDispatcher()
|
event_dispatcher = EventDispatcher()
|
||||||
|
|
||||||
@@ -72,34 +85,56 @@ event_dispatcher = EventDispatcher()
|
|||||||
|
|
||||||
|
|
||||||
async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel):
|
async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel):
|
||||||
|
"""
|
||||||
|
Push listener that sends reservation data to hotel's push endpoint.
|
||||||
|
Only called for reservations that match this hotel's hotel_id.
|
||||||
|
"""
|
||||||
push_endpoint = hotel.get("push_endpoint")
|
push_endpoint = hotel.get("push_endpoint")
|
||||||
|
if not push_endpoint:
|
||||||
|
_LOGGER.warning(f"No push endpoint configured for hotel {hotel.get('hotel_id')}")
|
||||||
|
return
|
||||||
|
|
||||||
server: AlpineBitsServer = app.state.alpine_bits_server
|
server: AlpineBitsServer = app.state.alpine_bits_server
|
||||||
|
|
||||||
hotel_id = hotel['hotel_id']
|
hotel_id = hotel['hotel_id']
|
||||||
reservation_hotel_id = reservation.hotel_code
|
reservation_hotel_id = reservation.hotel_code
|
||||||
|
|
||||||
|
# Double-check hotel matching (should be guaranteed by dispatcher)
|
||||||
|
if hotel_id != reservation_hotel_id:
|
||||||
|
_LOGGER.warning(f"Hotel ID mismatch: listener for {hotel_id}, reservation for {reservation_hotel_id}")
|
||||||
|
return
|
||||||
|
|
||||||
action = "OTA_HotelResNotifRQ"
|
_LOGGER.info(f"Processing push notification for hotel {hotel_id}, reservation {reservation.unique_id}")
|
||||||
|
|
||||||
# request = server.handle_request(
|
# Prepare payload for push notification
|
||||||
# action,)
|
|
||||||
|
|
||||||
|
|
||||||
|
request = await server.handle_request(request_action_name=AlpineBitsActionName.OTA_HOTEL_RES_NOTIF_GUEST_REQUESTS.request_name, request_xml=(reservation, customer), client_info=None, version=Version.V2024_10)
|
||||||
|
|
||||||
|
if request.status_code != 200:
|
||||||
|
_LOGGER.error(f"Failed to generate push request for hotel {hotel_id}, reservation {reservation.unique_id}: {request.xml_content}")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(request.xml_content)
|
||||||
|
# TODO: Generate AlpineBits OTA_HotelResNotifRQ request
|
||||||
|
# action = "OTA_HotelResNotifRQ"
|
||||||
|
# request = server.handle_request(action, ...)
|
||||||
|
|
||||||
|
print(f"--- Push Payload --- received. Sending to endpoint., hotelid {hotel_id}, reservation {reservation.unique_id}")
|
||||||
|
return
|
||||||
|
|
||||||
headers = {"Authorization": f"Bearer {push_endpoint.get('token','')}"} if push_endpoint.get('token') else {}
|
headers = {"Authorization": f"Bearer {push_endpoint.get('token','')}"} if push_endpoint.get('token') else {}
|
||||||
|
""
|
||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient() as client:
|
async with httpx.AsyncClient() as client:
|
||||||
resp = await client.post(push_endpoint["url"], json=payload, headers=headers, timeout=10)
|
resp = await client.post(push_endpoint["url"], json=payload, headers=headers, timeout=10)
|
||||||
_LOGGER.info(f"Push event fired to {push_endpoint['url']} for hotel {hotel['hotel_id']}, status: {resp.status_code}")
|
_LOGGER.info(f"Push event fired to {push_endpoint['url']} for hotel {hotel['hotel_id']}, status: {resp.status_code}")
|
||||||
|
|
||||||
|
if resp.status_code not in [200, 201, 202]:
|
||||||
|
_LOGGER.warning(f"Push endpoint returned non-success status {resp.status_code}: {resp.text}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
_LOGGER.error(f"Push event failed for hotel {hotel['hotel_id']}: {e}")
|
_LOGGER.error(f"Push event failed for hotel {hotel['hotel_id']}: {e}")
|
||||||
|
# Optionally implement retry logic here@asynccontextmanager
|
||||||
@asynccontextmanager
|
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
# Setup DB
|
# Setup DB
|
||||||
|
|
||||||
@@ -123,9 +158,20 @@ async def lifespan(app: FastAPI):
|
|||||||
# Register push listeners for hotels with push_endpoint
|
# Register push listeners for hotels with push_endpoint
|
||||||
for hotel in config.get("alpine_bits_auth", []):
|
for hotel in config.get("alpine_bits_auth", []):
|
||||||
push_endpoint = hotel.get("push_endpoint")
|
push_endpoint = hotel.get("push_endpoint")
|
||||||
if push_endpoint:
|
hotel_id = hotel.get("hotel_id")
|
||||||
|
|
||||||
event_dispatcher.register("form_processed", partial(push_listener, hotel=hotel))
|
if push_endpoint and hotel_id:
|
||||||
|
# Register hotel-specific listener
|
||||||
|
event_dispatcher.register_hotel_listener(
|
||||||
|
"form_processed",
|
||||||
|
hotel_id,
|
||||||
|
partial(push_listener, hotel=hotel)
|
||||||
|
)
|
||||||
|
_LOGGER.info(f"Registered push listener for hotel {hotel_id} with endpoint {push_endpoint.get('url')}")
|
||||||
|
elif push_endpoint and not hotel_id:
|
||||||
|
_LOGGER.warning(f"Hotel has push_endpoint but no hotel_id: {hotel}")
|
||||||
|
elif hotel_id and not push_endpoint:
|
||||||
|
_LOGGER.info(f"Hotel {hotel_id} has no push_endpoint configured")
|
||||||
|
|
||||||
# Create tables
|
# Create tables
|
||||||
async with engine.begin() as conn:
|
async with engine.begin() as conn:
|
||||||
@@ -373,6 +419,22 @@ async def process_wix_form_submission(request: Request, data: Dict[str, Any], db
|
|||||||
#await db.refresh(db_customer)
|
#await db.refresh(db_customer)
|
||||||
|
|
||||||
|
|
||||||
|
# Determine hotel_code and hotel_name
|
||||||
|
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
|
||||||
|
hotel_code = (
|
||||||
|
data.get("field:hotelid") or
|
||||||
|
data.get("hotelid") or
|
||||||
|
request.app.state.config.get("default_hotel_code") or
|
||||||
|
"123" # fallback
|
||||||
|
)
|
||||||
|
|
||||||
|
hotel_name = (
|
||||||
|
data.get("field:hotelname") or
|
||||||
|
data.get("hotelname") or
|
||||||
|
request.app.state.config.get("default_hotel_name") or
|
||||||
|
"Frangart Inn" # fallback
|
||||||
|
)
|
||||||
|
|
||||||
db_reservation = DBReservation(
|
db_reservation = DBReservation(
|
||||||
customer_id=db_customer.id,
|
customer_id=db_customer.id,
|
||||||
unique_id=unique_id,
|
unique_id=unique_id,
|
||||||
@@ -391,18 +453,24 @@ async def process_wix_form_submission(request: Request, data: Dict[str, Any], db
|
|||||||
user_comment=data.get("field:long_answer_3524", ""),
|
user_comment=data.get("field:long_answer_3524", ""),
|
||||||
fbclid=data.get("field:fbclid"),
|
fbclid=data.get("field:fbclid"),
|
||||||
gclid=data.get("field:gclid"),
|
gclid=data.get("field:gclid"),
|
||||||
hotel_code="123",
|
hotel_code=hotel_code,
|
||||||
hotel_name="Frangart Inn",
|
hotel_name=hotel_name,
|
||||||
)
|
)
|
||||||
db.add(db_reservation)
|
db.add(db_reservation)
|
||||||
await db.commit()
|
await db.commit()
|
||||||
await db.refresh(db_reservation)
|
await db.refresh(db_reservation)
|
||||||
|
|
||||||
|
|
||||||
# Fire event for listeners (push, etc.)
|
# Fire event for listeners (push, etc.) - hotel-specific dispatch
|
||||||
dispatcher = getattr(request.app.state, "event_dispatcher", None)
|
dispatcher = getattr(request.app.state, "event_dispatcher", None)
|
||||||
if dispatcher:
|
if dispatcher:
|
||||||
await dispatcher.dispatch("form_processed", db_customer, db_reservation)
|
# Get hotel_code from reservation to target the right listeners
|
||||||
|
hotel_code = getattr(db_reservation, 'hotel_code', None)
|
||||||
|
if hotel_code and hotel_code.strip():
|
||||||
|
await dispatcher.dispatch_for_hotel("form_processed", hotel_code, db_customer, db_reservation)
|
||||||
|
_LOGGER.info(f"Dispatched form_processed event for hotel {hotel_code}")
|
||||||
|
else:
|
||||||
|
_LOGGER.warning("No hotel_code in reservation, skipping push notifications")
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"status": "success",
|
"status": "success",
|
||||||
|
|||||||
Reference in New Issue
Block a user