Compare commits
2 Commits
2560f61ee8
...
592a9d7ce7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
592a9d7ce7 | ||
|
|
b045c62cee |
@@ -15,7 +15,6 @@ from enum import Enum, IntEnum
|
||||
from typing import Any, Optional, override
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from sqlalchemy import select
|
||||
from xsdata.formats.dataclass.serializers.config import SerializerConfig
|
||||
from xsdata_pydantic.bindings import XmlParser, XmlSerializer
|
||||
|
||||
@@ -26,6 +25,7 @@ from alpine_bits_python.alpine_bits_helpers import (
|
||||
from alpine_bits_python.logging_config import get_logger
|
||||
|
||||
from .db import AckedRequest, Customer, Reservation
|
||||
from .reservation_service import ReservationService
|
||||
from .generated.alpinebits import (
|
||||
OtaNotifReportRq,
|
||||
OtaNotifReportRs,
|
||||
@@ -510,32 +510,29 @@ class ReadAction(AlpineBitsAction):
|
||||
hotel_read_request.selection_criteria.start
|
||||
)
|
||||
|
||||
# query all reservations for this hotel from the database, where start_date is greater than or equal to the given start_date
|
||||
# Use ReservationService to query reservations
|
||||
reservation_service = ReservationService(dbsession)
|
||||
|
||||
stmt = (
|
||||
select(Reservation, Customer)
|
||||
.join(Customer, Reservation.customer_id == Customer.id)
|
||||
.filter(Reservation.hotel_code == hotelid)
|
||||
)
|
||||
if start_date:
|
||||
_LOGGER.info("Filtering reservations from start date %s", start_date)
|
||||
stmt = stmt.filter(Reservation.created_at >= start_date)
|
||||
# remove reservations that have been acknowledged via client_id
|
||||
elif client_info.client_id:
|
||||
subquery = (
|
||||
select(Reservation.id)
|
||||
.join(
|
||||
AckedRequest,
|
||||
Reservation.md5_unique_id == AckedRequest.unique_id,
|
||||
reservation_customer_pairs = (
|
||||
await reservation_service.get_reservations_with_filters(
|
||||
start_date=start_date, hotel_code=hotelid
|
||||
)
|
||||
)
|
||||
elif client_info.client_id:
|
||||
# Remove reservations that have been acknowledged via client_id
|
||||
reservation_customer_pairs = (
|
||||
await reservation_service.get_unacknowledged_reservations(
|
||||
client_id=client_info.client_id, hotel_code=hotelid
|
||||
)
|
||||
)
|
||||
else:
|
||||
reservation_customer_pairs = (
|
||||
await reservation_service.get_reservations_with_filters(
|
||||
hotel_code=hotelid
|
||||
)
|
||||
.filter(AckedRequest.client_id == client_info.client_id)
|
||||
)
|
||||
stmt = stmt.filter(~Reservation.id.in_(subquery))
|
||||
|
||||
result = await dbsession.execute(stmt)
|
||||
reservation_customer_pairs: list[tuple[Reservation, Customer]] = (
|
||||
result.all()
|
||||
) # List of (Reservation, Customer) tuples
|
||||
|
||||
_LOGGER.info(
|
||||
"Querying reservations and customers for hotel %s from database",
|
||||
@@ -616,19 +613,16 @@ class NotifReportReadAction(AlpineBitsAction):
|
||||
"Error: Something went wrong", HttpStatusCode.INTERNAL_SERVER_ERROR
|
||||
)
|
||||
|
||||
timestamp = datetime.now(ZoneInfo("UTC"))
|
||||
# Use ReservationService to record acknowledgements
|
||||
reservation_service = ReservationService(dbsession)
|
||||
|
||||
for entry in (
|
||||
notif_report_details.hotel_notif_report.hotel_reservations.hotel_reservation
|
||||
): # type: ignore
|
||||
unique_id = entry.unique_id.id
|
||||
acked_request = AckedRequest(
|
||||
unique_id=unique_id,
|
||||
client_id=client_info.client_id,
|
||||
timestamp=timestamp,
|
||||
await reservation_service.record_acknowledgement(
|
||||
client_id=client_info.client_id, unique_id=unique_id
|
||||
)
|
||||
dbsession.add(acked_request)
|
||||
|
||||
await dbsession.commit()
|
||||
|
||||
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@ from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import HTMLResponse, Response
|
||||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from alpine_bits_python.schemas import ReservationData
|
||||
@@ -28,10 +27,12 @@ from .alpinebits_server import (
|
||||
)
|
||||
from .auth import generate_unique_id, validate_api_key
|
||||
from .config_loader import load_config
|
||||
from .customer_service import CustomerService
|
||||
from .db import Base, get_database_url
|
||||
from .db import Customer as DBCustomer
|
||||
from .db import Reservation as DBReservation
|
||||
from .logging_config import get_logger, setup_logging
|
||||
from .reservation_service import ReservationService
|
||||
from .rate_limit import (
|
||||
BURST_RATE_LIMIT,
|
||||
DEFAULT_RATE_LIMIT,
|
||||
@@ -222,6 +223,17 @@ async def lifespan(app: FastAPI):
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
_LOGGER.info("Database tables checked/created at startup.")
|
||||
|
||||
# Hash any existing customers that don't have hashed versions yet
|
||||
async with AsyncSessionLocal() as session:
|
||||
customer_service = CustomerService(session)
|
||||
hashed_count = await customer_service.hash_existing_customers()
|
||||
if hashed_count > 0:
|
||||
_LOGGER.info(
|
||||
"Backfilled hashed data for %d existing customers", hashed_count
|
||||
)
|
||||
else:
|
||||
_LOGGER.info("All existing customers already have hashed data")
|
||||
|
||||
yield
|
||||
|
||||
# Optional: Dispose engine on shutdown
|
||||
@@ -294,22 +306,6 @@ async def health_check(request: Request):
|
||||
}
|
||||
|
||||
|
||||
def create_db_reservation_from_data(
|
||||
reservation_model: ReservationData, db_customer_id: int
|
||||
) -> DBReservation:
|
||||
"""Convert ReservationData to DBReservation, handling children_ages conversion."""
|
||||
data = reservation_model.model_dump(exclude_none=True)
|
||||
|
||||
children_list = data.pop("children_ages", [])
|
||||
children_csv = ",".join(str(int(a)) for a in children_list) if children_list else ""
|
||||
data["children_ages"] = children_csv
|
||||
|
||||
# Inject FK
|
||||
data["customer_id"] = db_customer_id
|
||||
|
||||
return DBReservation(**data)
|
||||
|
||||
|
||||
# Extracted business logic for handling Wix form submissions
|
||||
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
|
||||
"""Shared business logic for handling Wix form submissions (test and production)."""
|
||||
@@ -372,59 +368,30 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
|
||||
|
||||
unique_id = data.get("submissionId", generate_unique_id())
|
||||
|
||||
# use database session
|
||||
# Use CustomerService to handle customer creation/update with hashing
|
||||
customer_service = CustomerService(db)
|
||||
|
||||
# Check if customer with this contact_id already exists
|
||||
existing_customer = None
|
||||
if contact_id:
|
||||
result = await db.execute(
|
||||
select(DBCustomer).where(DBCustomer.contact_id == contact_id)
|
||||
)
|
||||
existing_customer = result.scalar_one_or_none()
|
||||
customer_data = {
|
||||
"given_name": first_name,
|
||||
"surname": last_name,
|
||||
"contact_id": contact_id,
|
||||
"name_prefix": name_prefix,
|
||||
"email_address": email,
|
||||
"phone": phone_number,
|
||||
"email_newsletter": email_newsletter,
|
||||
"address_line": address_line,
|
||||
"city_name": city_name,
|
||||
"postal_code": postal_code,
|
||||
"country_code": country_code,
|
||||
"gender": gender,
|
||||
"birth_date": birth_date,
|
||||
"language": language,
|
||||
"address_catalog": False,
|
||||
"name_title": None,
|
||||
}
|
||||
|
||||
if existing_customer:
|
||||
# Update existing customer with new information
|
||||
_LOGGER.info("Updating existing customer with contact_id: %s", contact_id)
|
||||
existing_customer.given_name = first_name
|
||||
existing_customer.surname = last_name
|
||||
existing_customer.name_prefix = name_prefix
|
||||
existing_customer.email_address = email
|
||||
existing_customer.phone = phone_number
|
||||
existing_customer.email_newsletter = email_newsletter
|
||||
existing_customer.address_line = address_line
|
||||
existing_customer.city_name = city_name
|
||||
existing_customer.postal_code = postal_code
|
||||
existing_customer.country_code = country_code
|
||||
existing_customer.gender = gender
|
||||
existing_customer.birth_date = birth_date
|
||||
existing_customer.language = language
|
||||
existing_customer.address_catalog = False
|
||||
existing_customer.name_title = None
|
||||
db_customer = existing_customer
|
||||
await db.flush()
|
||||
else:
|
||||
# Create new customer
|
||||
_LOGGER.info("Creating new customer with contact_id: %s", contact_id)
|
||||
db_customer = DBCustomer(
|
||||
given_name=first_name,
|
||||
surname=last_name,
|
||||
contact_id=contact_id,
|
||||
name_prefix=name_prefix,
|
||||
email_address=email,
|
||||
phone=phone_number,
|
||||
email_newsletter=email_newsletter,
|
||||
address_line=address_line,
|
||||
city_name=city_name,
|
||||
postal_code=postal_code,
|
||||
country_code=country_code,
|
||||
gender=gender,
|
||||
birth_date=birth_date,
|
||||
language=language,
|
||||
address_catalog=False,
|
||||
name_title=None,
|
||||
)
|
||||
db.add(db_customer)
|
||||
await db.flush() # This assigns db_customer.id without committing
|
||||
# This automatically creates/updates both Customer and HashedCustomer
|
||||
db_customer = await customer_service.get_or_create_customer(customer_data)
|
||||
|
||||
# Determine hotel_code and hotel_name
|
||||
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
|
||||
@@ -476,10 +443,11 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
|
||||
if reservation.md5_unique_id is None:
|
||||
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
|
||||
|
||||
db_reservation = create_db_reservation_from_data(reservation, db_customer.id)
|
||||
db.add(db_reservation)
|
||||
await db.commit()
|
||||
await db.refresh(db_reservation)
|
||||
# Use ReservationService to create reservation
|
||||
reservation_service = ReservationService(db)
|
||||
db_reservation = await reservation_service.create_reservation(
|
||||
reservation, db_customer.id
|
||||
)
|
||||
|
||||
async def push_event():
|
||||
# Fire event for listeners (push, etc.) - hotel-specific dispatch
|
||||
|
||||
178
src/alpine_bits_python/customer_service.py
Normal file
178
src/alpine_bits_python/customer_service.py
Normal file
@@ -0,0 +1,178 @@
|
||||
"""Customer service layer for handling customer and hashed customer operations."""
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from .db import Customer, HashedCustomer
|
||||
|
||||
|
||||
class CustomerService:
|
||||
"""Service for managing customers and their hashed versions.
|
||||
|
||||
Automatically maintains hashed customer data whenever customers are
|
||||
created or updated, ensuring data is always in sync for Meta Conversion API.
|
||||
"""
|
||||
|
||||
def __init__(self, session: AsyncSession):
|
||||
self.session = session
|
||||
|
||||
async def create_customer(self, customer_data: dict) -> Customer:
|
||||
"""Create a new customer and automatically create its hashed version.
|
||||
|
||||
Args:
|
||||
customer_data: Dictionary containing customer fields
|
||||
|
||||
Returns:
|
||||
The created Customer instance (with hashed_version relationship populated)
|
||||
"""
|
||||
# Create the customer
|
||||
customer = Customer(**customer_data)
|
||||
self.session.add(customer)
|
||||
await self.session.flush() # Flush to get the customer.id
|
||||
|
||||
# Create hashed version
|
||||
hashed_customer = customer.create_hashed_customer()
|
||||
hashed_customer.created_at = datetime.now(UTC)
|
||||
self.session.add(hashed_customer)
|
||||
|
||||
await self.session.commit()
|
||||
await self.session.refresh(customer)
|
||||
|
||||
return customer
|
||||
|
||||
async def update_customer(
|
||||
self, customer: Customer, update_data: dict
|
||||
) -> Customer:
|
||||
"""Update an existing customer and sync its hashed version.
|
||||
|
||||
Args:
|
||||
customer: The customer to update
|
||||
update_data: Dictionary of fields to update
|
||||
|
||||
Returns:
|
||||
The updated Customer instance
|
||||
"""
|
||||
# Update customer fields
|
||||
for key, value in update_data.items():
|
||||
if hasattr(customer, key):
|
||||
setattr(customer, key, value)
|
||||
|
||||
# Update or create hashed version
|
||||
result = await self.session.execute(
|
||||
select(HashedCustomer).where(
|
||||
HashedCustomer.customer_id == customer.id
|
||||
)
|
||||
)
|
||||
hashed_customer = result.scalar_one_or_none()
|
||||
|
||||
if hashed_customer:
|
||||
# Update existing hashed customer
|
||||
new_hashed = customer.create_hashed_customer()
|
||||
hashed_customer.hashed_email = new_hashed.hashed_email
|
||||
hashed_customer.hashed_phone = new_hashed.hashed_phone
|
||||
hashed_customer.hashed_given_name = new_hashed.hashed_given_name
|
||||
hashed_customer.hashed_surname = new_hashed.hashed_surname
|
||||
hashed_customer.hashed_city = new_hashed.hashed_city
|
||||
hashed_customer.hashed_postal_code = new_hashed.hashed_postal_code
|
||||
hashed_customer.hashed_country_code = new_hashed.hashed_country_code
|
||||
hashed_customer.hashed_gender = new_hashed.hashed_gender
|
||||
hashed_customer.hashed_birth_date = new_hashed.hashed_birth_date
|
||||
else:
|
||||
# Create new hashed customer if it doesn't exist
|
||||
hashed_customer = customer.create_hashed_customer()
|
||||
hashed_customer.created_at = datetime.now(UTC)
|
||||
self.session.add(hashed_customer)
|
||||
|
||||
await self.session.commit()
|
||||
await self.session.refresh(customer)
|
||||
|
||||
return customer
|
||||
|
||||
async def get_customer_by_contact_id(
|
||||
self, contact_id: str
|
||||
) -> Optional[Customer]:
|
||||
"""Get a customer by contact_id.
|
||||
|
||||
Args:
|
||||
contact_id: The contact_id to search for
|
||||
|
||||
Returns:
|
||||
Customer instance if found, None otherwise
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(Customer).where(Customer.contact_id == contact_id)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def get_or_create_customer(self, customer_data: dict) -> Customer:
|
||||
"""Get existing customer or create new one if not found.
|
||||
|
||||
Uses contact_id to identify existing customers if provided.
|
||||
|
||||
Args:
|
||||
customer_data: Dictionary containing customer fields
|
||||
(contact_id is optional)
|
||||
|
||||
Returns:
|
||||
Existing or newly created Customer instance
|
||||
"""
|
||||
contact_id = customer_data.get("contact_id")
|
||||
|
||||
if contact_id:
|
||||
existing = await self.get_customer_by_contact_id(contact_id)
|
||||
if existing:
|
||||
# Update existing customer
|
||||
return await self.update_customer(existing, customer_data)
|
||||
|
||||
# Create new customer (either no contact_id or customer doesn't exist)
|
||||
return await self.create_customer(customer_data)
|
||||
|
||||
async def get_hashed_customer(
|
||||
self, customer_id: int
|
||||
) -> Optional[HashedCustomer]:
|
||||
"""Get the hashed version of a customer.
|
||||
|
||||
Args:
|
||||
customer_id: The customer ID
|
||||
|
||||
Returns:
|
||||
HashedCustomer instance if found, None otherwise
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(HashedCustomer).where(
|
||||
HashedCustomer.customer_id == customer_id
|
||||
)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def hash_existing_customers(self) -> int:
|
||||
"""Hash all existing customers that don't have a hashed version yet.
|
||||
|
||||
This is useful for backfilling hashed data for customers created
|
||||
before the hashing system was implemented.
|
||||
|
||||
Returns:
|
||||
Number of customers that were hashed
|
||||
"""
|
||||
# Get all customers
|
||||
result = await self.session.execute(select(Customer))
|
||||
customers = result.scalars().all()
|
||||
|
||||
hashed_count = 0
|
||||
for customer in customers:
|
||||
# Check if this customer already has a hashed version
|
||||
existing_hashed = await self.get_hashed_customer(customer.id)
|
||||
if not existing_hashed:
|
||||
# Create hashed version
|
||||
hashed_customer = customer.create_hashed_customer()
|
||||
hashed_customer.created_at = datetime.now(UTC)
|
||||
self.session.add(hashed_customer)
|
||||
hashed_count += 1
|
||||
|
||||
if hashed_count > 0:
|
||||
await self.session.commit()
|
||||
|
||||
return hashed_count
|
||||
@@ -1,3 +1,4 @@
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String
|
||||
@@ -39,6 +40,67 @@ class Customer(Base):
|
||||
name_title = Column(String) # Added for XML
|
||||
reservations = relationship("Reservation", back_populates="customer")
|
||||
|
||||
@staticmethod
|
||||
def _normalize_and_hash(value):
|
||||
"""Normalize and hash a value according to Meta Conversion API requirements."""
|
||||
if not value:
|
||||
return None
|
||||
# Normalize: lowercase, strip whitespace
|
||||
normalized = str(value).lower().strip()
|
||||
# Remove spaces for phone numbers
|
||||
is_phone = normalized.startswith("+") or normalized.replace(
|
||||
"-", ""
|
||||
).replace(" ", "").isdigit()
|
||||
if is_phone:
|
||||
chars_to_remove = [" ", "-", "(", ")"]
|
||||
for char in chars_to_remove:
|
||||
normalized = normalized.replace(char, "")
|
||||
# SHA256 hash
|
||||
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
|
||||
|
||||
def create_hashed_customer(self):
|
||||
"""Create a HashedCustomer instance from this Customer."""
|
||||
return HashedCustomer(
|
||||
customer_id=self.id,
|
||||
contact_id=self.contact_id,
|
||||
hashed_email=self._normalize_and_hash(self.email_address),
|
||||
hashed_phone=self._normalize_and_hash(self.phone),
|
||||
hashed_given_name=self._normalize_and_hash(self.given_name),
|
||||
hashed_surname=self._normalize_and_hash(self.surname),
|
||||
hashed_city=self._normalize_and_hash(self.city_name),
|
||||
hashed_postal_code=self._normalize_and_hash(self.postal_code),
|
||||
hashed_country_code=self._normalize_and_hash(self.country_code),
|
||||
hashed_gender=self._normalize_and_hash(self.gender),
|
||||
hashed_birth_date=self._normalize_and_hash(self.birth_date),
|
||||
)
|
||||
|
||||
|
||||
class HashedCustomer(Base):
|
||||
"""Hashed customer data for Meta Conversion API.
|
||||
|
||||
Stores SHA256 hashed versions of customer PII according to Meta's requirements.
|
||||
This allows sending conversion events without exposing raw customer data.
|
||||
"""
|
||||
|
||||
__tablename__ = "hashed_customers"
|
||||
id = Column(Integer, primary_key=True)
|
||||
customer_id = Column(
|
||||
Integer, ForeignKey("customers.id"), unique=True, nullable=False
|
||||
)
|
||||
contact_id = Column(String, unique=True) # Keep unhashed for reference
|
||||
hashed_email = Column(String(64)) # SHA256 produces 64 hex chars
|
||||
hashed_phone = Column(String(64))
|
||||
hashed_given_name = Column(String(64))
|
||||
hashed_surname = Column(String(64))
|
||||
hashed_city = Column(String(64))
|
||||
hashed_postal_code = Column(String(64))
|
||||
hashed_country_code = Column(String(64))
|
||||
hashed_gender = Column(String(64))
|
||||
hashed_birth_date = Column(String(64))
|
||||
created_at = Column(DateTime)
|
||||
|
||||
customer = relationship("Customer", backref="hashed_version")
|
||||
|
||||
|
||||
class Reservation(Base):
|
||||
__tablename__ = "reservations"
|
||||
|
||||
263
src/alpine_bits_python/reservation_service.py
Normal file
263
src/alpine_bits_python/reservation_service.py
Normal file
@@ -0,0 +1,263 @@
|
||||
"""Reservation service layer for handling reservation database operations."""
|
||||
|
||||
import hashlib
|
||||
from datetime import UTC, datetime
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import and_, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from .db import AckedRequest, Customer, Reservation
|
||||
from .schemas import ReservationData
|
||||
|
||||
|
||||
class ReservationService:
|
||||
"""Service for managing reservations and related operations.
|
||||
|
||||
Handles all database operations for reservations including creation,
|
||||
retrieval, and acknowledgement tracking.
|
||||
"""
|
||||
|
||||
def __init__(self, session: AsyncSession):
|
||||
self.session = session
|
||||
|
||||
def _convert_reservation_data_to_db(
|
||||
self, reservation_model: ReservationData, customer_id: int
|
||||
) -> Reservation:
|
||||
"""Convert ReservationData to Reservation model.
|
||||
|
||||
Args:
|
||||
reservation_model: ReservationData instance
|
||||
customer_id: Customer ID to link to
|
||||
|
||||
Returns:
|
||||
Reservation instance ready for database insertion
|
||||
"""
|
||||
data = reservation_model.model_dump(exclude_none=True)
|
||||
|
||||
# Convert children_ages list to CSV string
|
||||
children_list = data.pop("children_ages", [])
|
||||
children_csv = (
|
||||
",".join(str(int(a)) for a in children_list) if children_list else ""
|
||||
)
|
||||
data["children_ages"] = children_csv
|
||||
|
||||
# Inject foreign key
|
||||
data["customer_id"] = customer_id
|
||||
|
||||
return Reservation(**data)
|
||||
|
||||
async def create_reservation(
|
||||
self, reservation_data: ReservationData, customer_id: int
|
||||
) -> Reservation:
|
||||
"""Create a new reservation.
|
||||
|
||||
Args:
|
||||
reservation_data: ReservationData containing reservation details
|
||||
customer_id: ID of the customer making the reservation
|
||||
|
||||
Returns:
|
||||
Created Reservation instance
|
||||
"""
|
||||
reservation = self._convert_reservation_data_to_db(
|
||||
reservation_data, customer_id
|
||||
)
|
||||
self.session.add(reservation)
|
||||
await self.session.commit()
|
||||
await self.session.refresh(reservation)
|
||||
return reservation
|
||||
|
||||
async def get_reservation_by_unique_id(
|
||||
self, unique_id: str
|
||||
) -> Optional[Reservation]:
|
||||
"""Get a reservation by unique_id.
|
||||
|
||||
Args:
|
||||
unique_id: The unique_id to search for
|
||||
|
||||
Returns:
|
||||
Reservation instance if found, None otherwise
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(Reservation).where(Reservation.unique_id == unique_id)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def get_reservation_by_md5_unique_id(
|
||||
self, md5_unique_id: str
|
||||
) -> Optional[Reservation]:
|
||||
"""Get a reservation by md5_unique_id.
|
||||
|
||||
Args:
|
||||
md5_unique_id: The MD5 hash of unique_id
|
||||
|
||||
Returns:
|
||||
Reservation instance if found, None otherwise
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(Reservation).where(
|
||||
Reservation.md5_unique_id == md5_unique_id
|
||||
)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def check_duplicate_reservation(
|
||||
self, unique_id: str, md5_unique_id: str
|
||||
) -> bool:
|
||||
"""Check if a reservation already exists.
|
||||
|
||||
Args:
|
||||
unique_id: The unique_id to check
|
||||
md5_unique_id: The MD5 hash to check
|
||||
|
||||
Returns:
|
||||
True if reservation exists, False otherwise
|
||||
"""
|
||||
existing = await self.get_reservation_by_unique_id(unique_id)
|
||||
if existing:
|
||||
return True
|
||||
|
||||
existing_md5 = await self.get_reservation_by_md5_unique_id(md5_unique_id)
|
||||
return existing_md5 is not None
|
||||
|
||||
async def get_reservations_for_customer(
|
||||
self, customer_id: int
|
||||
) -> list[Reservation]:
|
||||
"""Get all reservations for a customer.
|
||||
|
||||
Args:
|
||||
customer_id: The customer ID
|
||||
|
||||
Returns:
|
||||
List of Reservation instances
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(Reservation).where(Reservation.customer_id == customer_id)
|
||||
)
|
||||
return list(result.scalars().all())
|
||||
|
||||
async def get_reservations_with_filters(
|
||||
self,
|
||||
start_date: Optional[datetime] = None,
|
||||
end_date: Optional[datetime] = None,
|
||||
hotel_code: Optional[str] = None,
|
||||
) -> list[tuple[Reservation, Customer]]:
|
||||
"""Get reservations with optional filters, joined with customers.
|
||||
|
||||
Args:
|
||||
start_date: Filter by created_at >= this value
|
||||
end_date: Filter by created_at <= this value
|
||||
hotel_code: Filter by hotel code
|
||||
|
||||
Returns:
|
||||
List of (Reservation, Customer) tuples
|
||||
"""
|
||||
query = select(Reservation, Customer).join(
|
||||
Customer, Reservation.customer_id == Customer.id
|
||||
)
|
||||
|
||||
filters = []
|
||||
if start_date:
|
||||
filters.append(Reservation.created_at >= start_date)
|
||||
if end_date:
|
||||
filters.append(Reservation.created_at <= end_date)
|
||||
if hotel_code:
|
||||
filters.append(Reservation.hotel_code == hotel_code)
|
||||
|
||||
if filters:
|
||||
query = query.where(and_(*filters))
|
||||
|
||||
result = await self.session.execute(query)
|
||||
return list(result.all())
|
||||
|
||||
async def get_unacknowledged_reservations(
|
||||
self,
|
||||
client_id: str,
|
||||
start_date: Optional[datetime] = None,
|
||||
end_date: Optional[datetime] = None,
|
||||
hotel_code: Optional[str] = None,
|
||||
) -> list[tuple[Reservation, Customer]]:
|
||||
"""Get reservations that haven't been acknowledged by a client.
|
||||
|
||||
Args:
|
||||
client_id: The client ID to check acknowledgements for
|
||||
start_date: Filter by start date >= this value
|
||||
end_date: Filter by end date <= this value
|
||||
hotel_code: Filter by hotel code
|
||||
|
||||
Returns:
|
||||
List of (Reservation, Customer) tuples that are unacknowledged
|
||||
"""
|
||||
# Get all acknowledged md5_unique_ids for this client
|
||||
acked_result = await self.session.execute(
|
||||
select(AckedRequest.unique_id).where(
|
||||
AckedRequest.client_id == client_id
|
||||
)
|
||||
)
|
||||
acked_md5_ids = {row[0] for row in acked_result.all()}
|
||||
|
||||
# Get all reservations with filters
|
||||
all_reservations = await self.get_reservations_with_filters(
|
||||
start_date, end_date, hotel_code
|
||||
)
|
||||
|
||||
# Filter out acknowledged ones (comparing md5_unique_id)
|
||||
return [
|
||||
(res, cust)
|
||||
for res, cust in all_reservations
|
||||
if res.md5_unique_id not in acked_md5_ids
|
||||
]
|
||||
|
||||
async def record_acknowledgement(
|
||||
self, client_id: str, unique_id: str
|
||||
) -> AckedRequest:
|
||||
"""Record that a client has acknowledged a reservation.
|
||||
|
||||
Args:
|
||||
client_id: The client ID
|
||||
unique_id: The unique_id of the reservation
|
||||
|
||||
Returns:
|
||||
Created AckedRequest instance
|
||||
"""
|
||||
acked = AckedRequest(
|
||||
client_id=client_id,
|
||||
unique_id=unique_id,
|
||||
timestamp=datetime.now(UTC),
|
||||
)
|
||||
self.session.add(acked)
|
||||
await self.session.commit()
|
||||
await self.session.refresh(acked)
|
||||
return acked
|
||||
|
||||
async def is_acknowledged(self, client_id: str, unique_id: str) -> bool:
|
||||
"""Check if a reservation has been acknowledged by a client.
|
||||
|
||||
Args:
|
||||
client_id: The client ID
|
||||
unique_id: The reservation unique_id
|
||||
|
||||
Returns:
|
||||
True if acknowledged, False otherwise
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(AckedRequest).where(
|
||||
and_(
|
||||
AckedRequest.client_id == client_id,
|
||||
AckedRequest.unique_id == unique_id,
|
||||
)
|
||||
)
|
||||
)
|
||||
return result.scalar_one_or_none() is not None
|
||||
|
||||
@staticmethod
|
||||
def generate_md5_unique_id(unique_id: str) -> str:
|
||||
"""Generate MD5 hash of unique_id.
|
||||
|
||||
Args:
|
||||
unique_id: The unique_id to hash
|
||||
|
||||
Returns:
|
||||
MD5 hash as hex string
|
||||
"""
|
||||
return hashlib.md5(unique_id.encode("utf-8")).hexdigest()
|
||||
216
tests/test_customer_service.py
Normal file
216
tests/test_customer_service.py
Normal file
@@ -0,0 +1,216 @@
|
||||
"""Tests for CustomerService functionality."""
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from alpine_bits_python.customer_service import CustomerService
|
||||
from alpine_bits_python.db import Base, Customer, HashedCustomer
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def async_session():
|
||||
"""Create an async session for testing."""
|
||||
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with async_session_maker() as session:
|
||||
yield session
|
||||
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_customer_creates_hashed_version(async_session: AsyncSession):
|
||||
"""Test that creating a customer automatically creates hashed version."""
|
||||
service = CustomerService(async_session)
|
||||
|
||||
customer_data = {
|
||||
"given_name": "John",
|
||||
"surname": "Doe",
|
||||
"contact_id": "test123",
|
||||
"email_address": "john@example.com",
|
||||
"phone": "+1234567890",
|
||||
}
|
||||
|
||||
customer = await service.create_customer(customer_data)
|
||||
|
||||
assert customer.id is not None
|
||||
assert customer.given_name == "John"
|
||||
|
||||
# Check that hashed version was created
|
||||
hashed = await service.get_hashed_customer(customer.id)
|
||||
assert hashed is not None
|
||||
assert hashed.customer_id == customer.id
|
||||
assert hashed.hashed_email is not None
|
||||
assert hashed.hashed_phone is not None
|
||||
assert hashed.hashed_given_name is not None
|
||||
assert hashed.hashed_surname is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_customer_updates_hashed_version(async_session: AsyncSession):
|
||||
"""Test that updating a customer updates the hashed version."""
|
||||
service = CustomerService(async_session)
|
||||
|
||||
# Create initial customer
|
||||
customer_data = {
|
||||
"given_name": "John",
|
||||
"surname": "Doe",
|
||||
"contact_id": "test123",
|
||||
"email_address": "john@example.com",
|
||||
}
|
||||
customer = await service.create_customer(customer_data)
|
||||
|
||||
# Get initial hashed email
|
||||
hashed = await service.get_hashed_customer(customer.id)
|
||||
original_hashed_email = hashed.hashed_email
|
||||
|
||||
# Update customer email
|
||||
update_data = {"email_address": "newemail@example.com"}
|
||||
updated_customer = await service.update_customer(customer, update_data)
|
||||
|
||||
# Check that hashed version was updated
|
||||
updated_hashed = await service.get_hashed_customer(updated_customer.id)
|
||||
assert updated_hashed.hashed_email != original_hashed_email
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_or_create_customer_creates_new(async_session: AsyncSession):
|
||||
"""Test get_or_create creates new customer when not found."""
|
||||
service = CustomerService(async_session)
|
||||
|
||||
customer_data = {
|
||||
"given_name": "Jane",
|
||||
"surname": "Smith",
|
||||
"contact_id": "new123",
|
||||
"email_address": "jane@example.com",
|
||||
}
|
||||
|
||||
customer = await service.get_or_create_customer(customer_data)
|
||||
assert customer.id is not None
|
||||
assert customer.contact_id == "new123"
|
||||
|
||||
# Verify hashed version exists
|
||||
hashed = await service.get_hashed_customer(customer.id)
|
||||
assert hashed is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_or_create_customer_updates_existing(async_session: AsyncSession):
|
||||
"""Test get_or_create updates existing customer when found."""
|
||||
service = CustomerService(async_session)
|
||||
|
||||
# Create initial customer
|
||||
customer_data = {
|
||||
"given_name": "Jane",
|
||||
"surname": "Smith",
|
||||
"contact_id": "existing123",
|
||||
"email_address": "jane@example.com",
|
||||
}
|
||||
original_customer = await service.create_customer(customer_data)
|
||||
|
||||
# Try to create again with same contact_id but different data
|
||||
updated_data = {
|
||||
"given_name": "Janet",
|
||||
"surname": "Smith",
|
||||
"contact_id": "existing123",
|
||||
"email_address": "janet@example.com",
|
||||
}
|
||||
customer = await service.get_or_create_customer(updated_data)
|
||||
|
||||
# Should be same customer ID but updated data
|
||||
assert customer.id == original_customer.id
|
||||
assert customer.given_name == "Janet"
|
||||
assert customer.email_address == "janet@example.com"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_hash_existing_customers_backfills(async_session: AsyncSession):
|
||||
"""Test that hash_existing_customers backfills missing hashed data."""
|
||||
# Create a customer directly in DB without using service
|
||||
customer = Customer(
|
||||
given_name="Bob",
|
||||
surname="Builder",
|
||||
contact_id="bob123",
|
||||
email_address="bob@example.com",
|
||||
phone="+9876543210",
|
||||
)
|
||||
async_session.add(customer)
|
||||
await async_session.commit()
|
||||
await async_session.refresh(customer)
|
||||
|
||||
# Verify no hashed version exists
|
||||
result = await async_session.execute(
|
||||
select(HashedCustomer).where(HashedCustomer.customer_id == customer.id)
|
||||
)
|
||||
hashed = result.scalar_one_or_none()
|
||||
assert hashed is None
|
||||
|
||||
# Run backfill
|
||||
service = CustomerService(async_session)
|
||||
count = await service.hash_existing_customers()
|
||||
|
||||
assert count == 1
|
||||
|
||||
# Verify hashed version now exists
|
||||
result = await async_session.execute(
|
||||
select(HashedCustomer).where(HashedCustomer.customer_id == customer.id)
|
||||
)
|
||||
hashed = result.scalar_one_or_none()
|
||||
assert hashed is not None
|
||||
assert hashed.hashed_email is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_hash_existing_customers_skips_already_hashed(
|
||||
async_session: AsyncSession,
|
||||
):
|
||||
"""Test that hash_existing_customers skips customers already hashed."""
|
||||
service = CustomerService(async_session)
|
||||
|
||||
# Create customer with service (creates hashed version)
|
||||
customer_data = {
|
||||
"given_name": "Alice",
|
||||
"surname": "Wonder",
|
||||
"contact_id": "alice123",
|
||||
"email_address": "alice@example.com",
|
||||
}
|
||||
await service.create_customer(customer_data)
|
||||
|
||||
# Run backfill - should return 0 since customer is already hashed
|
||||
count = await service.hash_existing_customers()
|
||||
assert count == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_hashing_normalization(async_session: AsyncSession):
|
||||
"""Test that hashing properly normalizes data."""
|
||||
service = CustomerService(async_session)
|
||||
|
||||
# Create customer with mixed case and spaces
|
||||
customer_data = {
|
||||
"given_name": " John ",
|
||||
"surname": "DOE",
|
||||
"contact_id": "test123",
|
||||
"email_address": " John.Doe@Example.COM ",
|
||||
"phone": "+1 (234) 567-8900",
|
||||
}
|
||||
customer = await service.create_customer(customer_data)
|
||||
|
||||
hashed = await service.get_hashed_customer(customer.id)
|
||||
|
||||
# Verify hashes exist (normalization should have occurred)
|
||||
assert hashed.hashed_email is not None
|
||||
assert hashed.hashed_phone is not None
|
||||
|
||||
# Hash should be consistent for same normalized value
|
||||
from alpine_bits_python.db import Customer as CustomerModel
|
||||
|
||||
normalized_email_hash = CustomerModel._normalize_and_hash(
|
||||
"john.doe@example.com"
|
||||
)
|
||||
assert hashed.hashed_email == normalized_email_hash
|
||||
Reference in New Issue
Block a user