"""Customer service layer for handling customer and hashed customer operations.""" from datetime import UTC, datetime from pydantic import ValidationError from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from .db import Customer from .logging_config import get_logger from .schemas import CustomerData _LOGGER = get_logger(__name__) class CustomerService: """Service for managing customers and their hashed versions. Automatically maintains hashed customer data whenever customers are created or updated, ensuring data is always in sync for Meta Conversion API. """ def __init__(self, session: AsyncSession): self.session = session async def create_customer(self, customer_data: dict, auto_commit: bool = True) -> Customer: """Create a new customer and automatically create its hashed version. Args: customer_data: Dictionary containing customer fields auto_commit: If True, commits the transaction. If False, caller must commit. Returns: The created Customer instance (with hashed_version relationship populated) Raises: ValidationError: If customer_data fails validation (e.g., invalid country code) """ # Validate customer data through Pydantic model validated_data = CustomerData(**customer_data) # Create the customer with validated data # Exclude 'phone_numbers' as Customer model uses 'phone' field customer = Customer( **validated_data.model_dump(exclude_none=True, exclude={"phone_numbers"}) ) # Set fields not in CustomerData model separately if "contact_id" in customer_data: customer.contact_id = customer_data["contact_id"] if "phone" in customer_data: customer.phone = customer_data["phone"] # Set creation timestamp customer.created_at = datetime.now(UTC) # Update hashed fields customer.update_hashed_fields() self.session.add(customer) if auto_commit: await self.session.commit() await self.session.refresh(customer) return customer async def update_customer(self, customer: Customer, update_data: dict, auto_commit: bool = True) -> Customer: """Update an existing customer and sync its hashed version. Args: customer: The customer to update update_data: Dictionary of fields to update auto_commit: If True, commits the transaction. If False, caller must commit. Returns: The updated Customer instance Raises: ValidationError: If update_data fails validation (e.g., invalid country code) """ # Validate update data through Pydantic model # We need to merge with existing data for validation existing_data = { "given_name": customer.given_name, "surname": customer.surname, "name_prefix": customer.name_prefix, "email_address": customer.email_address, "phone": customer.phone, "email_newsletter": customer.email_newsletter, "address_line": customer.address_line, "city_name": customer.city_name, "postal_code": customer.postal_code, "country_code": customer.country_code, "gender": customer.gender, "birth_date": customer.birth_date, "language": customer.language, "address_catalog": customer.address_catalog, "name_title": customer.name_title, } # Merge update_data into existing_data (only CustomerData fields) # Filter to include only fields that exist in CustomerData model customer_data_fields = set(CustomerData.model_fields.keys()) # Include 'phone' field (maps to CustomerData) existing_data.update( { k: v for k, v in update_data.items() if k in customer_data_fields or k == "phone" } ) # Validate merged data validated_data = CustomerData(**existing_data) # Update customer fields with validated data # Exclude 'phone_numbers' as Customer model uses 'phone' field # Note: We don't use exclude_none=True to allow setting fields to None for key, value in validated_data.model_dump(exclude={"phone_numbers"}).items(): if hasattr(customer, key): setattr(customer, key, value) # Update fields not in CustomerData model separately if "contact_id" in update_data: customer.contact_id = update_data["contact_id"] if "phone" in update_data: customer.phone = update_data["phone"] # Update hashed fields customer.update_hashed_fields() if auto_commit: await self.session.commit() await self.session.refresh(customer) return customer async def get_customer_by_contact_id(self, contact_id: str) -> Customer | None: """Get a customer by contact_id. Args: contact_id: The contact_id to search for Returns: Customer instance if found, None otherwise """ result = await self.session.execute( select(Customer).where(Customer.contact_id == contact_id) ) return result.scalar_one_or_none() async def get_or_create_customer(self, customer_data: dict, auto_commit: bool = True) -> Customer: """Get existing customer or create new one if not found. Uses contact_id to identify existing customers if provided. Args: customer_data: Dictionary containing customer fields (contact_id is optional) auto_commit: If True, commits the transaction. If False, caller must commit. Returns: Existing or newly created Customer instance """ contact_id = customer_data.get("contact_id") if contact_id: existing = await self.get_customer_by_contact_id(contact_id) if existing: # Update existing customer return await self.update_customer(existing, customer_data, auto_commit=auto_commit) # Create new customer (either no contact_id or customer doesn't exist) return await self.create_customer(customer_data, auto_commit=auto_commit) async def get_customer(self, customer_id: int) -> Customer | None: """Get the hashed version of a customer. Args: customer_id: The customer ID Returns: Customer instance if found, None otherwise """ result = await self.session.execute( select(Customer).where(Customer.id == customer_id) ) return result.scalar_one_or_none() async def hash_existing_customers(self) -> int: """Hash all existing customers that don't have hashed fields populated yet. This is useful for backfilling hashed data for customers created before the hashing system was implemented, or after migrating from the separate hashed_customers table. Also validates and sanitizes customer data (e.g., normalizes country codes to uppercase). Customers with invalid data that cannot be fixed will be skipped and logged. Returns: Number of customers that were hashed """ # Get all customers without hashed data result = await self.session.execute( select(Customer).where(Customer.hashed_email.is_(None)) ) customers = result.scalars().all() hashed_count = 0 skipped_count = 0 for customer in customers: # Validate and sanitize customer data before hashing customer_dict = { "given_name": customer.given_name, "surname": customer.surname, "name_prefix": customer.name_prefix, "email_address": customer.email_address, "phone": customer.phone, "email_newsletter": customer.email_newsletter, "address_line": customer.address_line, "city_name": customer.city_name, "postal_code": customer.postal_code, "country_code": customer.country_code, "gender": customer.gender, "birth_date": customer.birth_date, "language": customer.language, "address_catalog": customer.address_catalog, "name_title": customer.name_title, } try: # Validate through Pydantic (normalizes country code) validated = CustomerData(**customer_dict) # Update customer with sanitized data # Exclude 'phone_numbers' as Customer model uses 'phone' field for key, value in validated.model_dump( exclude_none=True, exclude={"phone_numbers"} ).items(): if hasattr(customer, key): setattr(customer, key, value) # Update hashed fields with sanitized data customer.update_hashed_fields() # Set created_at if not already set if not customer.created_at: customer.created_at = datetime.now(UTC) hashed_count += 1 except ValidationError as e: # Skip customers with invalid data and log skipped_count += 1 _LOGGER.warning( "Skipping customer ID %s due to validation error: %s", customer.id, e, ) if hashed_count > 0: await self.session.commit() if skipped_count > 0: _LOGGER.warning( "Skipped %d customers with invalid data. " "Please fix these customers manually.", skipped_count, ) return hashed_count