# File: alpinebits_python/src/alpine_bits_python/customer_service.py
# (original listing: 282 lines, 10 KiB, Python)

"""Customer service layer for handling customer and hashed customer operations."""
from datetime import UTC, datetime
from pydantic import ValidationError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from .db import Customer
from .logging_config import get_logger
from .schemas import CustomerData
# Module-level logger; used below to report customers skipped during backfill.
_LOGGER = get_logger(__name__)
class CustomerService:
"""Service for managing customers and their hashed versions.
Automatically maintains hashed customer data whenever customers are
created or updated, ensuring data is always in sync for Meta Conversion API.
"""
def __init__(self, session: AsyncSession):
self.session = session
async def create_customer(self, customer_data: dict, auto_commit: bool = True) -> Customer:
"""Create a new customer and automatically create its hashed version.
Args:
customer_data: Dictionary containing customer fields
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
The created Customer instance (with hashed_version relationship populated)
Raises:
ValidationError: If customer_data fails validation
(e.g., invalid country code)
"""
# Validate customer data through Pydantic model
validated_data = CustomerData(**customer_data)
# Create the customer with validated data
# Exclude 'phone_numbers' as Customer model uses 'phone' field
customer = Customer(
**validated_data.model_dump(exclude_none=True, exclude={"phone_numbers"})
)
# Set fields not in CustomerData model separately
if "contact_id" in customer_data:
customer.contact_id = customer_data["contact_id"]
if "phone" in customer_data:
customer.phone = customer_data["phone"]
# Set creation timestamp
customer.created_at = datetime.now(UTC)
# Update hashed fields
customer.update_hashed_fields()
self.session.add(customer)
if auto_commit:
await self.session.commit()
await self.session.refresh(customer)
return customer
async def update_customer(self, customer: Customer, update_data: dict, auto_commit: bool = True) -> Customer:
"""Update an existing customer and sync its hashed version.
Args:
customer: The customer to update
update_data: Dictionary of fields to update
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
The updated Customer instance
Raises:
ValidationError: If update_data fails validation
(e.g., invalid country code)
"""
# Validate update data through Pydantic model
# We need to merge with existing data for validation
existing_data = {
"given_name": customer.given_name,
"surname": customer.surname,
"name_prefix": customer.name_prefix,
"email_address": customer.email_address,
"phone": customer.phone,
"email_newsletter": customer.email_newsletter,
"address_line": customer.address_line,
"city_name": customer.city_name,
"postal_code": customer.postal_code,
"country_code": customer.country_code,
"gender": customer.gender,
"birth_date": customer.birth_date,
"language": customer.language,
"address_catalog": customer.address_catalog,
"name_title": customer.name_title,
}
# Merge update_data into existing_data (only CustomerData fields)
# Filter to include only fields that exist in CustomerData model
customer_data_fields = set(CustomerData.model_fields.keys())
# Include 'phone' field (maps to CustomerData)
existing_data.update(
{
k: v
for k, v in update_data.items()
if k in customer_data_fields or k == "phone"
}
)
# Validate merged data
validated_data = CustomerData(**existing_data)
# Update customer fields with validated data
# Exclude 'phone_numbers' as Customer model uses 'phone' field
# Note: We don't use exclude_none=True to allow setting fields to None
for key, value in validated_data.model_dump(exclude={"phone_numbers"}).items():
if hasattr(customer, key):
setattr(customer, key, value)
# Update fields not in CustomerData model separately
if "contact_id" in update_data:
customer.contact_id = update_data["contact_id"]
if "phone" in update_data:
customer.phone = update_data["phone"]
# Update hashed fields
customer.update_hashed_fields()
if auto_commit:
await self.session.commit()
await self.session.refresh(customer)
return customer
async def get_customer_by_contact_id(self, contact_id: str) -> Customer | None:
"""Get a customer by contact_id.
Args:
contact_id: The contact_id to search for
Returns:
Customer instance if found, None otherwise
"""
result = await self.session.execute(
select(Customer).where(Customer.contact_id == contact_id)
)
return result.scalar_one_or_none()
async def get_or_create_customer(self, customer_data: dict, auto_commit: bool = True) -> Customer:
"""Get existing customer or create new one if not found.
Uses contact_id to identify existing customers if provided.
Args:
customer_data: Dictionary containing customer fields
(contact_id is optional)
auto_commit: If True, commits the transaction. If False, caller must commit.
Returns:
Existing or newly created Customer instance
"""
contact_id = customer_data.get("contact_id")
if contact_id:
existing = await self.get_customer_by_contact_id(contact_id)
if existing:
# Update existing customer
return await self.update_customer(existing, customer_data, auto_commit=auto_commit)
# Create new customer (either no contact_id or customer doesn't exist)
return await self.create_customer(customer_data, auto_commit=auto_commit)
async def get_customer(self, customer_id: int) -> Customer | None:
"""Get the hashed version of a customer.
Args:
customer_id: The customer ID
Returns:
Customer instance if found, None otherwise
"""
result = await self.session.execute(
select(Customer).where(Customer.id == customer_id)
)
return result.scalar_one_or_none()
async def hash_existing_customers(self) -> int:
"""Hash all existing customers that don't have hashed fields populated yet.
This is useful for backfilling hashed data for customers created
before the hashing system was implemented, or after migrating from
the separate hashed_customers table.
Also validates and sanitizes customer data (e.g., normalizes country
codes to uppercase). Customers with invalid data that cannot be fixed
will be skipped and logged.
Returns:
Number of customers that were hashed
"""
# Get all customers without hashed data
result = await self.session.execute(
select(Customer).where(Customer.hashed_email.is_(None))
)
customers = result.scalars().all()
hashed_count = 0
skipped_count = 0
for customer in customers:
# Validate and sanitize customer data before hashing
customer_dict = {
"given_name": customer.given_name,
"surname": customer.surname,
"name_prefix": customer.name_prefix,
"email_address": customer.email_address,
"phone": customer.phone,
"email_newsletter": customer.email_newsletter,
"address_line": customer.address_line,
"city_name": customer.city_name,
"postal_code": customer.postal_code,
"country_code": customer.country_code,
"gender": customer.gender,
"birth_date": customer.birth_date,
"language": customer.language,
"address_catalog": customer.address_catalog,
"name_title": customer.name_title,
}
try:
# Validate through Pydantic (normalizes country code)
validated = CustomerData(**customer_dict)
# Update customer with sanitized data
# Exclude 'phone_numbers' as Customer model uses 'phone' field
for key, value in validated.model_dump(
exclude_none=True, exclude={"phone_numbers"}
).items():
if hasattr(customer, key):
setattr(customer, key, value)
# Update hashed fields with sanitized data
customer.update_hashed_fields()
# Set created_at if not already set
if not customer.created_at:
customer.created_at = datetime.now(UTC)
hashed_count += 1
except ValidationError as e:
# Skip customers with invalid data and log
skipped_count += 1
_LOGGER.warning(
"Skipping customer ID %s due to validation error: %s",
customer.id,
e,
)
if hashed_count > 0:
await self.session.commit()
if skipped_count > 0:
_LOGGER.warning(
"Skipped %d customers with invalid data. "
"Please fix these customers manually.",
skipped_count,
)
return hashed_count