2 Commits

Author SHA1 Message Date
Jonas Linter
592a9d7ce7 Added reservation_service aswell 2025-10-13 10:59:05 +02:00
Jonas Linter
b045c62cee Created hashed customers. migrated to service instead of using db logic directly 2025-10-13 10:51:56 +02:00
6 changed files with 783 additions and 102 deletions

View File

@@ -15,7 +15,6 @@ from enum import Enum, IntEnum
from typing import Any, Optional, override
from zoneinfo import ZoneInfo
from sqlalchemy import select
from xsdata.formats.dataclass.serializers.config import SerializerConfig
from xsdata_pydantic.bindings import XmlParser, XmlSerializer
@@ -26,6 +25,7 @@ from alpine_bits_python.alpine_bits_helpers import (
from alpine_bits_python.logging_config import get_logger
from .db import AckedRequest, Customer, Reservation
from .reservation_service import ReservationService
from .generated.alpinebits import (
OtaNotifReportRq,
OtaNotifReportRs,
@@ -510,32 +510,29 @@ class ReadAction(AlpineBitsAction):
hotel_read_request.selection_criteria.start
)
# query all reservations for this hotel from the database, where start_date is greater than or equal to the given start_date
# Use ReservationService to query reservations
reservation_service = ReservationService(dbsession)
stmt = (
select(Reservation, Customer)
.join(Customer, Reservation.customer_id == Customer.id)
.filter(Reservation.hotel_code == hotelid)
)
if start_date:
_LOGGER.info("Filtering reservations from start date %s", start_date)
stmt = stmt.filter(Reservation.created_at >= start_date)
# remove reservations that have been acknowledged via client_id
elif client_info.client_id:
subquery = (
select(Reservation.id)
.join(
AckedRequest,
Reservation.md5_unique_id == AckedRequest.unique_id,
reservation_customer_pairs = (
await reservation_service.get_reservations_with_filters(
start_date=start_date, hotel_code=hotelid
)
)
elif client_info.client_id:
# Remove reservations that have been acknowledged via client_id
reservation_customer_pairs = (
await reservation_service.get_unacknowledged_reservations(
client_id=client_info.client_id, hotel_code=hotelid
)
)
else:
reservation_customer_pairs = (
await reservation_service.get_reservations_with_filters(
hotel_code=hotelid
)
.filter(AckedRequest.client_id == client_info.client_id)
)
stmt = stmt.filter(~Reservation.id.in_(subquery))
result = await dbsession.execute(stmt)
reservation_customer_pairs: list[tuple[Reservation, Customer]] = (
result.all()
) # List of (Reservation, Customer) tuples
_LOGGER.info(
"Querying reservations and customers for hotel %s from database",
@@ -616,19 +613,16 @@ class NotifReportReadAction(AlpineBitsAction):
"Error: Something went wrong", HttpStatusCode.INTERNAL_SERVER_ERROR
)
timestamp = datetime.now(ZoneInfo("UTC"))
# Use ReservationService to record acknowledgements
reservation_service = ReservationService(dbsession)
for entry in (
notif_report_details.hotel_notif_report.hotel_reservations.hotel_reservation
): # type: ignore
unique_id = entry.unique_id.id
acked_request = AckedRequest(
unique_id=unique_id,
client_id=client_info.client_id,
timestamp=timestamp,
await reservation_service.record_acknowledgement(
client_id=client_info.client_id, unique_id=unique_id
)
dbsession.add(acked_request)
await dbsession.commit()
return AlpineBitsResponse(response_xml, HttpStatusCode.OK)

View File

@@ -15,7 +15,6 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from slowapi.errors import RateLimitExceeded
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from alpine_bits_python.schemas import ReservationData
@@ -28,10 +27,12 @@ from .alpinebits_server import (
)
from .auth import generate_unique_id, validate_api_key
from .config_loader import load_config
from .customer_service import CustomerService
from .db import Base, get_database_url
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .logging_config import get_logger, setup_logging
from .reservation_service import ReservationService
from .rate_limit import (
BURST_RATE_LIMIT,
DEFAULT_RATE_LIMIT,
@@ -222,6 +223,17 @@ async def lifespan(app: FastAPI):
await conn.run_sync(Base.metadata.create_all)
_LOGGER.info("Database tables checked/created at startup.")
# Hash any existing customers that don't have hashed versions yet
async with AsyncSessionLocal() as session:
customer_service = CustomerService(session)
hashed_count = await customer_service.hash_existing_customers()
if hashed_count > 0:
_LOGGER.info(
"Backfilled hashed data for %d existing customers", hashed_count
)
else:
_LOGGER.info("All existing customers already have hashed data")
yield
# Optional: Dispose engine on shutdown
@@ -294,22 +306,6 @@ async def health_check(request: Request):
}
def create_db_reservation_from_data(
reservation_model: ReservationData, db_customer_id: int
) -> DBReservation:
"""Convert ReservationData to DBReservation, handling children_ages conversion."""
data = reservation_model.model_dump(exclude_none=True)
children_list = data.pop("children_ages", [])
children_csv = ",".join(str(int(a)) for a in children_list) if children_list else ""
data["children_ages"] = children_csv
# Inject FK
data["customer_id"] = db_customer_id
return DBReservation(**data)
# Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production)."""
@@ -372,59 +368,30 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
unique_id = data.get("submissionId", generate_unique_id())
# use database session
# Use CustomerService to handle customer creation/update with hashing
customer_service = CustomerService(db)
# Check if customer with this contact_id already exists
existing_customer = None
if contact_id:
result = await db.execute(
select(DBCustomer).where(DBCustomer.contact_id == contact_id)
)
existing_customer = result.scalar_one_or_none()
customer_data = {
"given_name": first_name,
"surname": last_name,
"contact_id": contact_id,
"name_prefix": name_prefix,
"email_address": email,
"phone": phone_number,
"email_newsletter": email_newsletter,
"address_line": address_line,
"city_name": city_name,
"postal_code": postal_code,
"country_code": country_code,
"gender": gender,
"birth_date": birth_date,
"language": language,
"address_catalog": False,
"name_title": None,
}
if existing_customer:
# Update existing customer with new information
_LOGGER.info("Updating existing customer with contact_id: %s", contact_id)
existing_customer.given_name = first_name
existing_customer.surname = last_name
existing_customer.name_prefix = name_prefix
existing_customer.email_address = email
existing_customer.phone = phone_number
existing_customer.email_newsletter = email_newsletter
existing_customer.address_line = address_line
existing_customer.city_name = city_name
existing_customer.postal_code = postal_code
existing_customer.country_code = country_code
existing_customer.gender = gender
existing_customer.birth_date = birth_date
existing_customer.language = language
existing_customer.address_catalog = False
existing_customer.name_title = None
db_customer = existing_customer
await db.flush()
else:
# Create new customer
_LOGGER.info("Creating new customer with contact_id: %s", contact_id)
db_customer = DBCustomer(
given_name=first_name,
surname=last_name,
contact_id=contact_id,
name_prefix=name_prefix,
email_address=email,
phone=phone_number,
email_newsletter=email_newsletter,
address_line=address_line,
city_name=city_name,
postal_code=postal_code,
country_code=country_code,
gender=gender,
birth_date=birth_date,
language=language,
address_catalog=False,
name_title=None,
)
db.add(db_customer)
await db.flush() # This assigns db_customer.id without committing
# This automatically creates/updates both Customer and HashedCustomer
db_customer = await customer_service.get_or_create_customer(customer_data)
# Determine hotel_code and hotel_name
# Priority: 1) Form field, 2) Configuration default, 3) Hardcoded fallback
@@ -476,10 +443,11 @@ async def process_wix_form_submission(request: Request, data: dict[str, Any], db
if reservation.md5_unique_id is None:
raise HTTPException(status_code=400, detail="Failed to generate md5_unique_id")
db_reservation = create_db_reservation_from_data(reservation, db_customer.id)
db.add(db_reservation)
await db.commit()
await db.refresh(db_reservation)
# Use ReservationService to create reservation
reservation_service = ReservationService(db)
db_reservation = await reservation_service.create_reservation(
reservation, db_customer.id
)
async def push_event():
# Fire event for listeners (push, etc.) - hotel-specific dispatch

View File

@@ -0,0 +1,178 @@
"""Customer service layer for handling customer and hashed customer operations."""
from datetime import UTC, datetime
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from .db import Customer, HashedCustomer
class CustomerService:
"""Service for managing customers and their hashed versions.
Automatically maintains hashed customer data whenever customers are
created or updated, ensuring data is always in sync for Meta Conversion API.
"""
def __init__(self, session: AsyncSession):
self.session = session
async def create_customer(self, customer_data: dict) -> Customer:
"""Create a new customer and automatically create its hashed version.
Args:
customer_data: Dictionary containing customer fields
Returns:
The created Customer instance (with hashed_version relationship populated)
"""
# Create the customer
customer = Customer(**customer_data)
self.session.add(customer)
await self.session.flush() # Flush to get the customer.id
# Create hashed version
hashed_customer = customer.create_hashed_customer()
hashed_customer.created_at = datetime.now(UTC)
self.session.add(hashed_customer)
await self.session.commit()
await self.session.refresh(customer)
return customer
async def update_customer(
self, customer: Customer, update_data: dict
) -> Customer:
"""Update an existing customer and sync its hashed version.
Args:
customer: The customer to update
update_data: Dictionary of fields to update
Returns:
The updated Customer instance
"""
# Update customer fields
for key, value in update_data.items():
if hasattr(customer, key):
setattr(customer, key, value)
# Update or create hashed version
result = await self.session.execute(
select(HashedCustomer).where(
HashedCustomer.customer_id == customer.id
)
)
hashed_customer = result.scalar_one_or_none()
if hashed_customer:
# Update existing hashed customer
new_hashed = customer.create_hashed_customer()
hashed_customer.hashed_email = new_hashed.hashed_email
hashed_customer.hashed_phone = new_hashed.hashed_phone
hashed_customer.hashed_given_name = new_hashed.hashed_given_name
hashed_customer.hashed_surname = new_hashed.hashed_surname
hashed_customer.hashed_city = new_hashed.hashed_city
hashed_customer.hashed_postal_code = new_hashed.hashed_postal_code
hashed_customer.hashed_country_code = new_hashed.hashed_country_code
hashed_customer.hashed_gender = new_hashed.hashed_gender
hashed_customer.hashed_birth_date = new_hashed.hashed_birth_date
else:
# Create new hashed customer if it doesn't exist
hashed_customer = customer.create_hashed_customer()
hashed_customer.created_at = datetime.now(UTC)
self.session.add(hashed_customer)
await self.session.commit()
await self.session.refresh(customer)
return customer
async def get_customer_by_contact_id(
self, contact_id: str
) -> Optional[Customer]:
"""Get a customer by contact_id.
Args:
contact_id: The contact_id to search for
Returns:
Customer instance if found, None otherwise
"""
result = await self.session.execute(
select(Customer).where(Customer.contact_id == contact_id)
)
return result.scalar_one_or_none()
async def get_or_create_customer(self, customer_data: dict) -> Customer:
"""Get existing customer or create new one if not found.
Uses contact_id to identify existing customers if provided.
Args:
customer_data: Dictionary containing customer fields
(contact_id is optional)
Returns:
Existing or newly created Customer instance
"""
contact_id = customer_data.get("contact_id")
if contact_id:
existing = await self.get_customer_by_contact_id(contact_id)
if existing:
# Update existing customer
return await self.update_customer(existing, customer_data)
# Create new customer (either no contact_id or customer doesn't exist)
return await self.create_customer(customer_data)
async def get_hashed_customer(
self, customer_id: int
) -> Optional[HashedCustomer]:
"""Get the hashed version of a customer.
Args:
customer_id: The customer ID
Returns:
HashedCustomer instance if found, None otherwise
"""
result = await self.session.execute(
select(HashedCustomer).where(
HashedCustomer.customer_id == customer_id
)
)
return result.scalar_one_or_none()
async def hash_existing_customers(self) -> int:
"""Hash all existing customers that don't have a hashed version yet.
This is useful for backfilling hashed data for customers created
before the hashing system was implemented.
Returns:
Number of customers that were hashed
"""
# Get all customers
result = await self.session.execute(select(Customer))
customers = result.scalars().all()
hashed_count = 0
for customer in customers:
# Check if this customer already has a hashed version
existing_hashed = await self.get_hashed_customer(customer.id)
if not existing_hashed:
# Create hashed version
hashed_customer = customer.create_hashed_customer()
hashed_customer.created_at = datetime.now(UTC)
self.session.add(hashed_customer)
hashed_count += 1
if hashed_count > 0:
await self.session.commit()
return hashed_count

View File

@@ -1,3 +1,4 @@
import hashlib
import os
from sqlalchemy import Boolean, Column, Date, DateTime, ForeignKey, Integer, String
@@ -39,6 +40,67 @@ class Customer(Base):
name_title = Column(String) # Added for XML
reservations = relationship("Reservation", back_populates="customer")
@staticmethod
def _normalize_and_hash(value):
"""Normalize and hash a value according to Meta Conversion API requirements."""
if not value:
return None
# Normalize: lowercase, strip whitespace
normalized = str(value).lower().strip()
# Remove spaces for phone numbers
is_phone = normalized.startswith("+") or normalized.replace(
"-", ""
).replace(" ", "").isdigit()
if is_phone:
chars_to_remove = [" ", "-", "(", ")"]
for char in chars_to_remove:
normalized = normalized.replace(char, "")
# SHA256 hash
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
def create_hashed_customer(self):
"""Create a HashedCustomer instance from this Customer."""
return HashedCustomer(
customer_id=self.id,
contact_id=self.contact_id,
hashed_email=self._normalize_and_hash(self.email_address),
hashed_phone=self._normalize_and_hash(self.phone),
hashed_given_name=self._normalize_and_hash(self.given_name),
hashed_surname=self._normalize_and_hash(self.surname),
hashed_city=self._normalize_and_hash(self.city_name),
hashed_postal_code=self._normalize_and_hash(self.postal_code),
hashed_country_code=self._normalize_and_hash(self.country_code),
hashed_gender=self._normalize_and_hash(self.gender),
hashed_birth_date=self._normalize_and_hash(self.birth_date),
)
class HashedCustomer(Base):
"""Hashed customer data for Meta Conversion API.
Stores SHA256 hashed versions of customer PII according to Meta's requirements.
This allows sending conversion events without exposing raw customer data.
"""
__tablename__ = "hashed_customers"
id = Column(Integer, primary_key=True)
customer_id = Column(
Integer, ForeignKey("customers.id"), unique=True, nullable=False
)
contact_id = Column(String, unique=True) # Keep unhashed for reference
hashed_email = Column(String(64)) # SHA256 produces 64 hex chars
hashed_phone = Column(String(64))
hashed_given_name = Column(String(64))
hashed_surname = Column(String(64))
hashed_city = Column(String(64))
hashed_postal_code = Column(String(64))
hashed_country_code = Column(String(64))
hashed_gender = Column(String(64))
hashed_birth_date = Column(String(64))
created_at = Column(DateTime)
customer = relationship("Customer", backref="hashed_version")
class Reservation(Base):
__tablename__ = "reservations"

View File

@@ -0,0 +1,263 @@
"""Reservation service layer for handling reservation database operations."""
import hashlib
from datetime import UTC, datetime
from typing import Optional
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession
from .db import AckedRequest, Customer, Reservation
from .schemas import ReservationData
class ReservationService:
"""Service for managing reservations and related operations.
Handles all database operations for reservations including creation,
retrieval, and acknowledgement tracking.
"""
def __init__(self, session: AsyncSession):
self.session = session
def _convert_reservation_data_to_db(
self, reservation_model: ReservationData, customer_id: int
) -> Reservation:
"""Convert ReservationData to Reservation model.
Args:
reservation_model: ReservationData instance
customer_id: Customer ID to link to
Returns:
Reservation instance ready for database insertion
"""
data = reservation_model.model_dump(exclude_none=True)
# Convert children_ages list to CSV string
children_list = data.pop("children_ages", [])
children_csv = (
",".join(str(int(a)) for a in children_list) if children_list else ""
)
data["children_ages"] = children_csv
# Inject foreign key
data["customer_id"] = customer_id
return Reservation(**data)
async def create_reservation(
self, reservation_data: ReservationData, customer_id: int
) -> Reservation:
"""Create a new reservation.
Args:
reservation_data: ReservationData containing reservation details
customer_id: ID of the customer making the reservation
Returns:
Created Reservation instance
"""
reservation = self._convert_reservation_data_to_db(
reservation_data, customer_id
)
self.session.add(reservation)
await self.session.commit()
await self.session.refresh(reservation)
return reservation
async def get_reservation_by_unique_id(
self, unique_id: str
) -> Optional[Reservation]:
"""Get a reservation by unique_id.
Args:
unique_id: The unique_id to search for
Returns:
Reservation instance if found, None otherwise
"""
result = await self.session.execute(
select(Reservation).where(Reservation.unique_id == unique_id)
)
return result.scalar_one_or_none()
async def get_reservation_by_md5_unique_id(
self, md5_unique_id: str
) -> Optional[Reservation]:
"""Get a reservation by md5_unique_id.
Args:
md5_unique_id: The MD5 hash of unique_id
Returns:
Reservation instance if found, None otherwise
"""
result = await self.session.execute(
select(Reservation).where(
Reservation.md5_unique_id == md5_unique_id
)
)
return result.scalar_one_or_none()
async def check_duplicate_reservation(
self, unique_id: str, md5_unique_id: str
) -> bool:
"""Check if a reservation already exists.
Args:
unique_id: The unique_id to check
md5_unique_id: The MD5 hash to check
Returns:
True if reservation exists, False otherwise
"""
existing = await self.get_reservation_by_unique_id(unique_id)
if existing:
return True
existing_md5 = await self.get_reservation_by_md5_unique_id(md5_unique_id)
return existing_md5 is not None
async def get_reservations_for_customer(
self, customer_id: int
) -> list[Reservation]:
"""Get all reservations for a customer.
Args:
customer_id: The customer ID
Returns:
List of Reservation instances
"""
result = await self.session.execute(
select(Reservation).where(Reservation.customer_id == customer_id)
)
return list(result.scalars().all())
async def get_reservations_with_filters(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
hotel_code: Optional[str] = None,
) -> list[tuple[Reservation, Customer]]:
"""Get reservations with optional filters, joined with customers.
Args:
start_date: Filter by created_at >= this value
end_date: Filter by created_at <= this value
hotel_code: Filter by hotel code
Returns:
List of (Reservation, Customer) tuples
"""
query = select(Reservation, Customer).join(
Customer, Reservation.customer_id == Customer.id
)
filters = []
if start_date:
filters.append(Reservation.created_at >= start_date)
if end_date:
filters.append(Reservation.created_at <= end_date)
if hotel_code:
filters.append(Reservation.hotel_code == hotel_code)
if filters:
query = query.where(and_(*filters))
result = await self.session.execute(query)
return list(result.all())
async def get_unacknowledged_reservations(
self,
client_id: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
hotel_code: Optional[str] = None,
) -> list[tuple[Reservation, Customer]]:
"""Get reservations that haven't been acknowledged by a client.
Args:
client_id: The client ID to check acknowledgements for
start_date: Filter by start date >= this value
end_date: Filter by end date <= this value
hotel_code: Filter by hotel code
Returns:
List of (Reservation, Customer) tuples that are unacknowledged
"""
# Get all acknowledged md5_unique_ids for this client
acked_result = await self.session.execute(
select(AckedRequest.unique_id).where(
AckedRequest.client_id == client_id
)
)
acked_md5_ids = {row[0] for row in acked_result.all()}
# Get all reservations with filters
all_reservations = await self.get_reservations_with_filters(
start_date, end_date, hotel_code
)
# Filter out acknowledged ones (comparing md5_unique_id)
return [
(res, cust)
for res, cust in all_reservations
if res.md5_unique_id not in acked_md5_ids
]
async def record_acknowledgement(
self, client_id: str, unique_id: str
) -> AckedRequest:
"""Record that a client has acknowledged a reservation.
Args:
client_id: The client ID
unique_id: The unique_id of the reservation
Returns:
Created AckedRequest instance
"""
acked = AckedRequest(
client_id=client_id,
unique_id=unique_id,
timestamp=datetime.now(UTC),
)
self.session.add(acked)
await self.session.commit()
await self.session.refresh(acked)
return acked
async def is_acknowledged(self, client_id: str, unique_id: str) -> bool:
"""Check if a reservation has been acknowledged by a client.
Args:
client_id: The client ID
unique_id: The reservation unique_id
Returns:
True if acknowledged, False otherwise
"""
result = await self.session.execute(
select(AckedRequest).where(
and_(
AckedRequest.client_id == client_id,
AckedRequest.unique_id == unique_id,
)
)
)
return result.scalar_one_or_none() is not None
@staticmethod
def generate_md5_unique_id(unique_id: str) -> str:
"""Generate MD5 hash of unique_id.
Args:
unique_id: The unique_id to hash
Returns:
MD5 hash as hex string
"""
return hashlib.md5(unique_id.encode("utf-8")).hexdigest()

View File

@@ -0,0 +1,216 @@
"""Tests for CustomerService functionality."""
import pytest
import pytest_asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from alpine_bits_python.customer_service import CustomerService
from alpine_bits_python.db import Base, Customer, HashedCustomer
@pytest_asyncio.fixture
async def async_session():
"""Create an async session for testing."""
engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
async with async_session_maker() as session:
yield session
await engine.dispose()
@pytest.mark.asyncio
async def test_create_customer_creates_hashed_version(async_session: AsyncSession):
"""Test that creating a customer automatically creates hashed version."""
service = CustomerService(async_session)
customer_data = {
"given_name": "John",
"surname": "Doe",
"contact_id": "test123",
"email_address": "john@example.com",
"phone": "+1234567890",
}
customer = await service.create_customer(customer_data)
assert customer.id is not None
assert customer.given_name == "John"
# Check that hashed version was created
hashed = await service.get_hashed_customer(customer.id)
assert hashed is not None
assert hashed.customer_id == customer.id
assert hashed.hashed_email is not None
assert hashed.hashed_phone is not None
assert hashed.hashed_given_name is not None
assert hashed.hashed_surname is not None
@pytest.mark.asyncio
async def test_update_customer_updates_hashed_version(async_session: AsyncSession):
"""Test that updating a customer updates the hashed version."""
service = CustomerService(async_session)
# Create initial customer
customer_data = {
"given_name": "John",
"surname": "Doe",
"contact_id": "test123",
"email_address": "john@example.com",
}
customer = await service.create_customer(customer_data)
# Get initial hashed email
hashed = await service.get_hashed_customer(customer.id)
original_hashed_email = hashed.hashed_email
# Update customer email
update_data = {"email_address": "newemail@example.com"}
updated_customer = await service.update_customer(customer, update_data)
# Check that hashed version was updated
updated_hashed = await service.get_hashed_customer(updated_customer.id)
assert updated_hashed.hashed_email != original_hashed_email
@pytest.mark.asyncio
async def test_get_or_create_customer_creates_new(async_session: AsyncSession):
"""Test get_or_create creates new customer when not found."""
service = CustomerService(async_session)
customer_data = {
"given_name": "Jane",
"surname": "Smith",
"contact_id": "new123",
"email_address": "jane@example.com",
}
customer = await service.get_or_create_customer(customer_data)
assert customer.id is not None
assert customer.contact_id == "new123"
# Verify hashed version exists
hashed = await service.get_hashed_customer(customer.id)
assert hashed is not None
@pytest.mark.asyncio
async def test_get_or_create_customer_updates_existing(async_session: AsyncSession):
"""Test get_or_create updates existing customer when found."""
service = CustomerService(async_session)
# Create initial customer
customer_data = {
"given_name": "Jane",
"surname": "Smith",
"contact_id": "existing123",
"email_address": "jane@example.com",
}
original_customer = await service.create_customer(customer_data)
# Try to create again with same contact_id but different data
updated_data = {
"given_name": "Janet",
"surname": "Smith",
"contact_id": "existing123",
"email_address": "janet@example.com",
}
customer = await service.get_or_create_customer(updated_data)
# Should be same customer ID but updated data
assert customer.id == original_customer.id
assert customer.given_name == "Janet"
assert customer.email_address == "janet@example.com"
@pytest.mark.asyncio
async def test_hash_existing_customers_backfills(async_session: AsyncSession):
"""Test that hash_existing_customers backfills missing hashed data."""
# Create a customer directly in DB without using service
customer = Customer(
given_name="Bob",
surname="Builder",
contact_id="bob123",
email_address="bob@example.com",
phone="+9876543210",
)
async_session.add(customer)
await async_session.commit()
await async_session.refresh(customer)
# Verify no hashed version exists
result = await async_session.execute(
select(HashedCustomer).where(HashedCustomer.customer_id == customer.id)
)
hashed = result.scalar_one_or_none()
assert hashed is None
# Run backfill
service = CustomerService(async_session)
count = await service.hash_existing_customers()
assert count == 1
# Verify hashed version now exists
result = await async_session.execute(
select(HashedCustomer).where(HashedCustomer.customer_id == customer.id)
)
hashed = result.scalar_one_or_none()
assert hashed is not None
assert hashed.hashed_email is not None
@pytest.mark.asyncio
async def test_hash_existing_customers_skips_already_hashed(
async_session: AsyncSession,
):
"""Test that hash_existing_customers skips customers already hashed."""
service = CustomerService(async_session)
# Create customer with service (creates hashed version)
customer_data = {
"given_name": "Alice",
"surname": "Wonder",
"contact_id": "alice123",
"email_address": "alice@example.com",
}
await service.create_customer(customer_data)
# Run backfill - should return 0 since customer is already hashed
count = await service.hash_existing_customers()
assert count == 0
@pytest.mark.asyncio
async def test_hashing_normalization(async_session: AsyncSession):
"""Test that hashing properly normalizes data."""
service = CustomerService(async_session)
# Create customer with mixed case and spaces
customer_data = {
"given_name": " John ",
"surname": "DOE",
"contact_id": "test123",
"email_address": " John.Doe@Example.COM ",
"phone": "+1 (234) 567-8900",
}
customer = await service.create_customer(customer_data)
hashed = await service.get_hashed_customer(customer.id)
# Verify hashes exist (normalization should have occurred)
assert hashed.hashed_email is not None
assert hashed.hashed_phone is not None
# Hash should be consistent for same normalized value
from alpine_bits_python.db import Customer as CustomerModel
normalized_email_hash = CustomerModel._normalize_and_hash(
"john.doe@example.com"
)
assert hashed.hashed_email == normalized_email_hash