Files
alpinebits_python/src/alpine_bits_python/reservation_service.py
Jonas Linter d88a53327f Migration done
2025-11-19 15:01:16 +01:00

315 lines
10 KiB
Python

"""Reservation service layer for handling reservation database operations."""
import hashlib
from datetime import UTC, datetime
from typing import Optional
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession
from .db import AckedRequest, Customer, HashedCustomer, Reservation
from .schemas import ReservationData
class ReservationService:
"""Service for managing reservations and related operations.
Handles all database operations for reservations including creation,
retrieval, and acknowledgement tracking.
"""
def __init__(self, session: AsyncSession):
self.session = session
def _convert_reservation_data_to_db(
self, reservation_model: ReservationData, customer_id: int
) -> Reservation:
"""Convert ReservationData to Reservation model.
Args:
reservation_model: ReservationData instance
customer_id: Customer ID to link to
Returns:
Reservation instance ready for database insertion
"""
data = reservation_model.model_dump(exclude_none=True)
# Convert children_ages list to CSV string
children_list = data.pop("children_ages", [])
children_csv = (
",".join(str(int(a)) for a in children_list) if children_list else ""
)
data["children_ages"] = children_csv
# Inject foreign key
data["customer_id"] = customer_id
return Reservation(**data)
async def create_reservation(
self, reservation_data: ReservationData, customer_id: int, auto_commit: bool = True
) -> Reservation:
"""Create a new reservation.
Args:
reservation_data: ReservationData containing reservation details
customer_id: ID of the customer making the reservation
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
Created Reservation instance
"""
reservation = self._convert_reservation_data_to_db(
reservation_data, customer_id
)
# Automatically populate hashed_customer_id from the customer
# Since hashed_customer is always created when a customer is created,
# we can get it by querying for the hashed_customer with matching customer_id
hashed_customer_result = await self.session.execute(
select(HashedCustomer).where(
HashedCustomer.customer_id == customer_id
)
)
hashed_customer = hashed_customer_result.scalar_one_or_none()
if hashed_customer:
reservation.hashed_customer_id = hashed_customer.id
self.session.add(reservation)
if auto_commit:
await self.session.commit()
await self.session.refresh(reservation)
else:
await self.session.flush() # Flush to get the reservation.id
return reservation
async def get_reservation_by_unique_id(
self, unique_id: str
) -> Optional[Reservation]:
"""Get a reservation by unique_id.
Args:
unique_id: The unique_id to search for
Returns:
Reservation instance if found, None otherwise
"""
result = await self.session.execute(
select(Reservation).where(Reservation.unique_id == unique_id)
)
return result.scalar_one_or_none()
async def get_reservation_by_md5_unique_id(
self, md5_unique_id: str
) -> Optional[Reservation]:
"""Get a reservation by md5_unique_id.
Args:
md5_unique_id: The MD5 hash of unique_id
Returns:
Reservation instance if found, None otherwise
"""
result = await self.session.execute(
select(Reservation).where(
Reservation.md5_unique_id == md5_unique_id
)
)
return result.scalar_one_or_none()
async def check_duplicate_reservation(
self, unique_id: str, md5_unique_id: str
) -> bool:
"""Check if a reservation already exists.
Args:
unique_id: The unique_id to check
md5_unique_id: The MD5 hash to check
Returns:
True if reservation exists, False otherwise
"""
existing = await self.get_reservation_by_unique_id(unique_id)
if existing:
return True
existing_md5 = await self.get_reservation_by_md5_unique_id(md5_unique_id)
return existing_md5 is not None
async def get_reservations_for_customer(
self, customer_id: int
) -> list[Reservation]:
"""Get all reservations for a customer.
Args:
customer_id: The customer ID
Returns:
List of Reservation instances
"""
result = await self.session.execute(
select(Reservation).where(Reservation.customer_id == customer_id)
)
return list(result.scalars().all())
async def get_reservations_with_filters(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
hotel_code: Optional[str] = None,
) -> list[tuple[Reservation, Customer]]:
"""Get reservations with optional filters, joined with customers.
Args:
start_date: Filter by created_at >= this value
end_date: Filter by created_at <= this value
hotel_code: Filter by hotel code
Returns:
List of (Reservation, Customer) tuples
"""
query = select(Reservation, Customer).join(
Customer, Reservation.customer_id == Customer.id
)
filters = []
if start_date:
filters.append(Reservation.created_at >= start_date)
if end_date:
filters.append(Reservation.created_at <= end_date)
if hotel_code:
filters.append(Reservation.hotel_code == hotel_code)
if filters:
query = query.where(and_(*filters))
result = await self.session.execute(query)
return list(result.all())
async def get_unacknowledged_reservations(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
hotel_code: Optional[str] = None,
username: Optional[str] = None,
client_id: Optional[str] = None,
) -> list[tuple[Reservation, Customer]]:
"""Get reservations that haven't been acknowledged by a client.
Prioritizes checking by username if provided, falls back to client_id for backward compatibility.
Args:
start_date: Filter by start date >= this value
end_date: Filter by end date <= this value
hotel_code: Filter by hotel code
username: The username of the client (preferred for lookup)
client_id: The client ID (fallback for backward compatibility)
Returns:
List of (Reservation, Customer) tuples that are unacknowledged
"""
# Get all acknowledged unique_ids for this client/username
if username:
acked_result = await self.session.execute(
select(AckedRequest.unique_id).where(
AckedRequest.username == username
)
)
else:
acked_result = await self.session.execute(
select(AckedRequest.unique_id).where(
AckedRequest.client_id == client_id
)
)
acked_md5_ids = {row[0] for row in acked_result.all()}
# Get all reservations with filters
all_reservations = await self.get_reservations_with_filters(
start_date, end_date, hotel_code
)
# Filter out acknowledged ones (comparing md5_unique_id)
return [
(res, cust)
for res, cust in all_reservations
if res.md5_unique_id not in acked_md5_ids
]
async def record_acknowledgement(
self, client_id: str, unique_id: str, username: Optional[str] = None, auto_commit: bool = True
) -> AckedRequest:
"""Record that a client has acknowledged a reservation.
Args:
client_id: The client ID
unique_id: The unique_id of the reservation (md5_unique_id)
username: The username of the client making the request (optional)
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
Created AckedRequest instance
"""
acked = AckedRequest(
client_id=client_id,
username=username,
unique_id=unique_id,
timestamp=datetime.now(UTC),
)
self.session.add(acked)
if auto_commit:
await self.session.commit()
await self.session.refresh(acked)
else:
await self.session.flush() # Flush to get the acked.id
return acked
async def is_acknowledged(self, unique_id: str, username: Optional[str] = None, client_id: Optional[str] = None) -> bool:
"""Check if a reservation has been acknowledged by a client.
Prioritizes checking by username if provided, falls back to client_id for backward compatibility.
Args:
unique_id: The reservation unique_id
username: The username of the client (preferred for lookup)
client_id: The client ID (fallback for backward compatibility)
Returns:
True if acknowledged, False otherwise
"""
if username:
result = await self.session.execute(
select(AckedRequest).where(
and_(
AckedRequest.username == username,
AckedRequest.unique_id == unique_id,
)
)
)
else:
result = await self.session.execute(
select(AckedRequest).where(
and_(
AckedRequest.client_id == client_id,
AckedRequest.unique_id == unique_id,
)
)
)
return result.scalar_one_or_none() is not None
@staticmethod
def generate_md5_unique_id(unique_id: str) -> str:
"""Generate MD5 hash of unique_id.
Args:
unique_id: The unique_id to hash
Returns:
MD5 hash as hex string
"""
return hashlib.md5(unique_id.encode("utf-8")).hexdigest()