Files
alpinebits_python/src/alpine_bits_python/csv_import.py
2025-11-19 10:20:48 +01:00

637 lines
24 KiB
Python

"""CSV import functionality for landing page forms and email lead exports.
Handles importing CSV data from landing_page_form.csv and email lead exports
(from extract_leads.py) and creating/updating reservations and customers in
the database. Supports both German (landing page form) and English (email lead
export) column names.
Supported CSV columns (German - Landing Page Form):
- Zeit der Einreichung: Submission timestamp
- Angebot auswählen: Room offer
- Anreisedatum: Check-in date (YYYY-MM-DD, DD.MM.YYYY or DD/MM/YYYY)
- Abreisedatum: Check-out date (YYYY-MM-DD, DD.MM.YYYY or DD/MM/YYYY)
- Anzahl Erwachsene: Number of adults
- Anzahl Kinder: Number of children
- Alter Kind 1-10: Ages of children
- Anrede: Title/salutation (e.g., "Herr", "Frau")
- Vorname: First name (required)
- Nachname: Last name (required)
- Email: Email address
- Phone: Phone number
- Message: Customer message/comment
- Einwilligung Marketing: Newsletter opt-in (yes/no, checked/unchecked)
- utm_Source, utm_Medium, utm_Campaign, utm_Term, utm_Content: UTM tracking
- fbclid: Facebook click ID
- gclid: Google click ID
- hotelid: Hotel ID
- hotelname: Hotel name
Supported CSV columns (English - Email Lead Export):
- name: First name (required)
- lastname: Last name (required)
- mail: Email address
- tel: Phone number
- anreise: Check-in date (YYYY-MM-DD, DD.MM.YYYY or DD/MM/YYYY)
- abreise: Check-out date (YYYY-MM-DD, DD.MM.YYYY or DD/MM/YYYY)
- erwachsene: Number of adults
- kinder: Number of children
- kind_ages: Child ages as comma-separated string (e.g., "3,6,10")
- apartments: Apartment preferences
- verpflegung: Meal plan preference
- sprache: Language preference
- device: Device information
- anrede: Title/salutation
- land: Country
- privacy: Privacy consent
Duplicate detection uses: name + email + dates + fbclid/gclid combination
"""
import csv
import hashlib
import json
import re
import pandas as pd
from datetime import date, datetime
from io import StringIO
from pathlib import Path
from typing import Any, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import MultipleResultsFound
from .customer_service import CustomerService
from .db import Customer, Reservation
from .logging_config import get_logger
from .reservation_service import ReservationService
from .schemas import ReservationData
# Module-level logger, obtained from the project's logging_config helper and
# named after this module.
_LOGGER = get_logger(__name__)
class CSVImporter:
"""Handles importing CSV data into the system."""
# Column rename mapping for CSV import.
#
# Maps the raw CSV header (exact, case-sensitive match) to the canonical
# column name used by the importer. Each source header must appear only once:
# in a dict literal a repeated key silently overwrites the earlier entry.
COLUMN_RENAME_MAP: dict[str, str] = {
    # German column names (from landing page form CSV)
    "Zeit der Einreichung": "submission_timestamp",
    "Angebot auswählen": "room_offer",
    "Anreisedatum": "check_in_date",
    "Abreisedatum": "check_out_date",
    "Anzahl Erwachsene": "num_adults",
    "Anzahl Kinder": "num_children",
    "Alter Kind 1": "child_1_age",
    "Alter Kind 2": "child_2_age",
    "Alter Kind 3": "child_3_age",
    "Alter Kind 4": "child_4_age",
    "Alter Kind 5": "child_5_age",
    "Alter Kind 6": "child_6_age",
    "Alter Kind 7": "child_7_age",
    "Alter Kind 8": "child_8_age",
    "Alter Kind 9": "child_9_age",
    "Alter Kind 10": "child_10_age",
    # pandas appends ".1" to a repeated header, hence these two entries
    "Alter Kind 1.1": "child_1_age_duplicate",
    "Alter Kind 2.1": "child_2_age_duplicate",
    "Anrede": "salutation",
    "Vorname": "first_name",
    "Nachname": "last_name",
    "Email": "email",
    "Phone": "phone",
    "Message": "message",
    "Einwilligung Marketing": "newsletter_opt_in",
    # English column names (from leads export CSV)
    "name": "first_name",
    "lastname": "last_name",
    "mail": "email",
    "tel": "phone",
    "anreise": "check_in_date",
    "abreise": "check_out_date",
    "erwachsene": "num_adults",
    "kinder": "num_children",
    "kind_ages": "kind_ages_csv",  # Special handling - comma-separated ages
    "apartments": "room_offer",
    "verpflegung": "meal_plan",
    "sprache": "language",
    "device": "device",
    "anrede": "salutation",
    "land": "country",
    "privacy": "privacy_consent",
    # German alternate names for leads export columns
    "Erwachsene": "num_adults",
    # NOTE: "Kinder" used to be listed twice (once as "children"); only this
    # mapping ever took effect, so the dead duplicate was removed.
    "Kinder": "num_children",
    # Standard tracking columns
    "utm_Source": "utm_source",
    "utm_Medium": "utm_medium",
    "utm_Campaign": "utm_campaign",
    "utm_Term": "utm_term",
    "utm_Content": "utm_content",
    "utm_term_id": "utm_term_id",
    "utm_content_id": "utm_content_id",
    "gad_source": "gad_source",
    "gad_campaignid": "gad_campaign_id",
    "gbraid": "gbraid",
    "gclid": "gclid",
    "fbclid": "fbclid",
    "hotelid": "hotel_id",
    "hotelname": "hotel_name",
    "roomtypecode": "room_type_code",
    "roomclassificationcode": "room_classification_code",
    # Unnamed columns get pandas default names like "Unnamed: 0" and cannot
    # be listed here. The child-age columns that follow num_children are
    # handled via positional renaming in _normalize_csv_columns.
}
def __init__(self, db_session: AsyncSession, config: dict[str, Any]):
    """Set up the importer and the services it delegates to.

    Args:
        db_session: AsyncSession for database operations; the same session
            is handed to the customer and reservation services
        config: Application configuration dict
    """
    self.db_session, self.config = db_session, config
    # Both services are built on the importer's own session.
    self.customer_service = CustomerService(self.db_session)
    self.reservation_service = ReservationService(self.db_session)
def _dryrun_csv_file(self, csv_file_path: str) -> dict[str, Any]:
    """Preview a CSV file without importing anything.

    Reads at most the first 10 data rows, replaces missing cells with ""
    and applies the column normalisation before returning the result.

    Args:
        csv_file_path: Path to CSV file

    Returns:
        Dictionary with "headers" (normalised column names) and "rows"
        (one dict per previewed row)
    """
    preview = pd.read_csv(csv_file_path, encoding="utf-8-sig", nrows=10)
    preview = self._normalize_csv_columns(preview.fillna(""))
    headers = preview.columns.tolist()
    rows = preview.to_dict(orient="records")
    return {"headers": headers, "rows": rows}
def _normalize_csv_columns(self, df: pd.DataFrame) -> pd.DataFrame:
    """Normalize and rename CSV columns based on COLUMN_RENAME_MAP.

    Two passes are applied:

    1. Every header found in COLUMN_RENAME_MAP is renamed to its canonical
       name; unknown headers are kept unchanged.
    2. Landing page form format only (a ``num_children`` column exists and
       no ``kind_ages_csv`` column): up to 10 columns directly after
       ``num_children`` are renamed positionally to ``child_<n>_age``.

    The positional pass only touches columns the mapping did NOT recognise.
    A recognised column (e.g. ``first_name``) that happens to sit within ten
    positions of ``num_children`` is left alone, and no rename is performed
    if it would create a second column with the same label.

    Args:
        df: DataFrame as read from the CSV file

    Returns:
        DataFrame with normalised column names
    """
    rename_map = self.COLUMN_RENAME_MAP
    df = df.rename(columns={col: rename_map.get(col, col) for col in df.columns})

    columns = list(df.columns)
    if "num_children" not in columns or "kind_ages_csv" in columns:
        return df

    recognised = set(rename_map.values())
    taken = set(columns)  # labels in use, to avoid creating duplicates
    anchor = columns.index("num_children")
    positional: dict[Any, str] = {}
    for offset, col_name in enumerate(columns[anchor + 1 : anchor + 11], start=1):
        # str() guards against non-string labels (e.g. integer headers)
        if col_name in recognised or str(col_name).startswith("child_"):
            continue
        target = f"child_{offset}_age"
        if target in taken:
            continue
        positional[col_name] = target
        taken.add(target)

    # One rename call instead of ten in-place renames
    return df.rename(columns=positional) if positional else df
def _get_hotel_info(self, hotel_code: str) -> tuple[str, str]:
    """Resolve the hotel name that belongs to ``hotel_code``.

    The "alpine_bits_auth" list in the config is searched for the first
    entry whose "hotel_id" equals ``hotel_code``.

    Args:
        hotel_code: Hotel code to look up

    Returns:
        Tuple of (hotel_code, hotel_name); the name comes from the matching
        config entry, or from "default_hotel_name" when no entry matches
    """
    configured_hotels = self.config.get("alpine_bits_auth", [])
    matching_entry = next(
        (entry for entry in configured_hotels if entry.get("hotel_id") == hotel_code),
        None,
    )
    if matching_entry is None:
        # No configured hotel with this code: use the configured default name
        return hotel_code, self.config.get("default_hotel_name", "Frangart Inn")
    return hotel_code, matching_entry.get("hotel_name", "")
async def find_duplicate_reservation(
    self,
    first_name: str,
    last_name: str,
    email: Optional[str],
    start_date: date,
    end_date: date,
    fbclid: Optional[str],
    gclid: Optional[str],
) -> Optional[Reservation]:
    """Find if a reservation already exists based on unique criteria.

    Candidates are reservations with exactly the same start and end date
    whose customer matches either the full name (case-insensitive) or, when
    an email is given, the email address (case-insensitive). The candidates
    are then narrowed down by tracking ID:

    - if ``fbclid`` or ``gclid`` is given, a candidate only counts as a
      duplicate when one of those IDs is identical;
    - if neither is given, the first candidate is treated as the duplicate.

    Args:
        first_name: Customer first name
        last_name: Customer last name
        email: Customer email (None or empty to match on name only)
        start_date: Reservation start date
        end_date: Reservation end date
        fbclid: Facebook click ID
        gclid: Google click ID

    Returns:
        Existing Reservation if found, None otherwise
    """
    from sqlalchemy import and_, or_, select

    # Build the identity alternatives explicitly; the email clause is only
    # added when there is an email, so no falsy placeholder reaches or_().
    identity_filters = [
        and_(
            Customer.given_name.ilike(first_name),
            Customer.surname.ilike(last_name),
        )
    ]
    if email:
        identity_filters.append(Customer.email_address.ilike(email))

    query = (
        select(Reservation)
        .select_from(Reservation)
        .join(Customer, Reservation.customer_id == Customer.id)
        .where(
            and_(
                Reservation.start_date == start_date,
                Reservation.end_date == end_date,
                or_(*identity_filters),
            )
        )
    )
    result = await self.db_session.execute(query)
    candidates = result.scalars().all()

    # Without tracking IDs, name/email/dates alone decide
    if not fbclid and not gclid:
        return candidates[0] if candidates else None

    # With tracking IDs, require one of them to match
    for candidate in candidates:
        if fbclid and candidate.fbclid == fbclid:
            return candidate
        if gclid and candidate.gclid == gclid:
            return candidate
    return None
async def import_csv_file(
    self,
    csv_file_path: str,
    hotel_code: str,
    dryrun: bool = False,
    pre_acknowledge: bool = False,
    client_id: Optional[str] = None,
    username: Optional[str] = None,
) -> dict[str, Any]:
    """Import reservations from a CSV file.

    All rows are imported inside a single transaction: any exception rolls
    back every change made by this call and is re-raised. Rows without a
    full name or without parseable dates are skipped (counted in
    "skipped_empty"), as are rows for which a duplicate reservation is
    found (counted in "skipped_duplicates").

    Args:
        csv_file_path: Path to CSV file
        hotel_code: Hotel code (mandatory) - used to look up hotel name from config
        dryrun: If True, parse the first 10 rows and return them without importing
        pre_acknowledge: If True, pre-acknowledges all imported reservations
        client_id: Client ID for pre-acknowledgement (required if pre_acknowledge=True)
        username: Username for pre-acknowledgement (optional, but recommended)

    Returns:
        Dictionary with import statistics or parsed data (if dryrun=True)

    Raises:
        FileNotFoundError: If the CSV file does not exist
        ValueError: If pre_acknowledge is True but no client_id is given
    """
    path = Path(csv_file_path)
    if not path.exists():
        raise FileNotFoundError(f"CSV file not found: {csv_file_path}")
    if pre_acknowledge and not client_id:
        raise ValueError("client_id is required when pre_acknowledge=True")

    # A dry run never writes to the database, so answer it before a
    # transaction is opened; otherwise the transaction would be left open
    # (neither committed nor rolled back) when returning the preview.
    if dryrun:
        return self._dryrun_csv_file(path)

    # Start a transaction - will rollback on any exception
    await self.db_session.begin()
    try:
        # Load and prepare CSV
        df = pd.read_csv(path, encoding="utf-8-sig").fillna("")
        df = self._normalize_csv_columns(df)
        stats = {
            "total_rows": 0,
            "skipped_empty": 0,
            "created_customers": 0,
            "existing_customers": 0,
            "created_reservations": 0,
            "skipped_duplicates": 0,
            "pre_acknowledged": 0,
            "errors": [],
        }

        for row_index, row in df.iterrows():
            stats["total_rows"] += 1
            # Convert to 1-based and account for the header line
            row_num = row_index + 2

            # Extract and validate required fields
            first_name = str(row.get("first_name", "")).strip()
            last_name = str(row.get("last_name", "")).strip()
            email = str(row.get("email", "")).strip()
            if not first_name or not last_name:
                _LOGGER.warning("Skipping row %d: missing name", row_num)
                stats["skipped_empty"] += 1
                continue

            # Parse and validate dates
            start_date = self._parse_date(str(row.get("check_in_date", "")).strip())
            end_date = self._parse_date(str(row.get("check_out_date", "")).strip())
            if not start_date or not end_date:
                _LOGGER.warning("Skipping row %d: invalid or missing dates", row_num)
                stats["skipped_empty"] += 1
                continue

            # Get tracking IDs for duplicate detection
            fbclid = str(row.get("fbclid", "")).strip() or None
            gclid = str(row.get("gclid", "")).strip() or None

            # Check for duplicate reservation
            existing_res = await self.find_duplicate_reservation(
                first_name, last_name, email or None, start_date, end_date, fbclid, gclid
            )
            if existing_res:
                _LOGGER.info(
                    "Skipping row %d: duplicate reservation found (ID: %s)",
                    row_num,
                    existing_res.unique_id,
                )
                stats["skipped_duplicates"] += 1
                continue

            # Get or create customer
            customer_data = self._build_customer_data(first_name, last_name, email, row)
            customer = await self._find_or_create_customer(customer_data, auto_commit=False)
            if customer.id is None:
                # No primary key yet: treat as newly created and flush so
                # customer.id is available for the reservation below
                await self.db_session.flush()
                stats["created_customers"] += 1
            else:
                stats["existing_customers"] += 1

            # Parse adult/children counts and extract ages
            num_adults = self._parse_int(row.get("num_adults", 1), default=1)
            num_children = self._parse_int(row.get("num_children", 0), default=0)
            children_ages, age_adjustment, adjusted_num_children = (
                self._extract_children_ages(row, num_children)
            )
            # Listed "children" aged 18+ are counted as adults instead
            num_adults += age_adjustment
            if adjusted_num_children > 0:
                num_children = adjusted_num_children

            # Build and create reservation
            reservation = self._build_reservation_data(
                row, start_date, end_date, num_adults, num_children,
                children_ages, fbclid, gclid, hotel_code, row_num,
            )
            db_reservation = await self.reservation_service.create_reservation(
                reservation, customer.id, auto_commit=False
            )
            stats["created_reservations"] += 1
            _LOGGER.info("Created reservation for %s %s", first_name, last_name)

            # Pre-acknowledge if requested
            if pre_acknowledge and db_reservation.md5_unique_id:
                await self.reservation_service.record_acknowledgement(
                    client_id=client_id,
                    unique_id=db_reservation.md5_unique_id,
                    username=username,
                    auto_commit=False,
                )
                stats["pre_acknowledged"] += 1
    except Exception:
        # Rollback transaction on any error
        await self.db_session.rollback()
        _LOGGER.exception("CSV import failed, rolling back all changes")
        raise

    # Commit transaction on success
    await self.db_session.commit()
    _LOGGER.info("CSV import completed successfully. Stats: %s", stats)
    return stats
def _parse_int(self, value: Any, default: int = 0) -> int:
    """Parse value to int, returning default if parsing fails.

    Falsy values (None, "", 0, 0.0) yield ``default``. Decimal-formatted
    input such as "2.0" or 2.0 is accepted and truncated, the same way the
    child-age columns are parsed with ``int(float(...))``.

    Args:
        value: Raw cell value (str, int, float, None, ...)
        default: Value returned for empty or unparseable input

    Returns:
        The parsed integer, or ``default``
    """
    if not value:
        return default
    unparseable = (ValueError, TypeError, OverflowError)
    try:
        # Direct conversion first so large integers keep full precision
        return int(value)
    except unparseable:
        pass
    try:
        # Fallback for strings like "2.0"; OverflowError covers infinity
        return int(float(value))
    except unparseable:
        return default
def _build_customer_data(self, first_name: str, last_name: str, email: str, row: Any) -> dict:
    """Assemble the customer dictionary for one CSV row.

    Name and email come from the already validated arguments; salutation,
    phone and newsletter opt-in are read from ``row``. Address, gender,
    birth date and title are always None, and the language is fixed to "de".

    Args:
        first_name: Customer first name
        last_name: Customer last name
        email: Customer email ("" becomes None)
        row: CSV row offering ``get(column, default)``

    Returns:
        Customer data dictionary
    """

    def text_or_none(column: str):
        # Blank or whitespace-only cells are stored as None
        return str(row.get(column, "")).strip() or None

    salutation = text_or_none("salutation")
    phone_number = text_or_none("phone")
    newsletter = self._parse_bool(row.get("newsletter_opt_in"))

    customer = {
        "given_name": first_name,
        "surname": last_name,
        "name_prefix": salutation,
        "email_address": email if email else None,
        "phone": phone_number,
        "email_newsletter": newsletter,
    }
    # Fields the CSV formats do not provide
    for missing_field in (
        "address_line",
        "city_name",
        "postal_code",
        "country_code",
        "gender",
        "birth_date",
    ):
        customer[missing_field] = None
    customer["language"] = "de"
    customer["address_catalog"] = False
    customer["name_title"] = None
    return customer
def _build_reservation_data(
    self, row: Any, start_date: date, end_date: date, num_adults: int,
    num_children: int, children_ages: list[int], fbclid: Optional[str],
    gclid: Optional[str], hotel_code: str, row_num: int
) -> ReservationData:
    """Create the ReservationData object for one CSV row.

    The submission timestamp of the row serves as unique_id; when the row
    has none, an id is generated from the row number and the current time.
    Blank text cells are passed on as None.
    """

    def cell(column: str) -> Optional[str]:
        # Stripped cell text, or None when the cell is blank or missing
        return str(row.get(column, "")).strip() or None

    submission_id = cell("submission_timestamp")
    if submission_id is None:
        submission_id = f"csv_import_{row_num}_{datetime.now().isoformat()}"

    resolved_code, resolved_name = self._get_hotel_info(hotel_code)

    fields = {
        "unique_id": submission_id,
        "start_date": start_date,
        "end_date": end_date,
        "num_adults": num_adults,
        "num_children": num_children,
        "children_ages": children_ages,
        "hotel_code": resolved_code,
        "hotel_name": resolved_name,
        "offer": cell("room_offer"),
        "user_comment": cell("message"),
        "fbclid": fbclid,
        "gclid": gclid,
    }
    for utm_field in ("utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content"):
        fields[utm_field] = cell(utm_field)
    fields["room_type_code"] = cell("room_type_code")
    fields["room_classification_code"] = cell("room_classification_code")
    return ReservationData(**fields)
def _parse_date(self, date_str: str) -> Optional[date]:
    """Convert a date string to a ``date``.

    Accepted formats, tried in this order: YYYY-MM-DD, DD.MM.YYYY,
    DD/MM/YYYY. Surrounding whitespace is ignored.

    Returns None for empty input, non-string input, or text that matches
    none of the formats.
    """
    if not (date_str and isinstance(date_str, str)):
        return None
    cleaned = date_str.strip()
    for pattern in ("%Y-%m-%d", "%d.%m.%Y", "%d/%m/%Y"):
        try:
            parsed = datetime.strptime(cleaned, pattern)
        except ValueError:
            # Not this format; try the next one
            continue
        return parsed.date()
    return None
def _extract_children_ages(self, row: Any, num_children: int) -> tuple[list[int], int, int]:
    """Extract and parse children ages from a CSV row.

    Two sources are supported:

    1. ``kind_ages_csv`` (leads export): comma-separated ages such as
       "3,6,10". Decimal-formatted numbers ("5.0", as produced when pandas
       reads the column as float) are accepted. If any token cannot be
       parsed, this source is ignored and source 2 is used instead.
    2. Individual columns ``child_1_age`` .. ``child_10_age`` plus
       ``child_1_age_duplicate`` and ``child_2_age_duplicate``. Unparseable
       cells are skipped. If more ages are found than ``num_children``,
       the surplus is dropped (ages of 0 first, then from the end).

    In both cases ages 0-17 count as children, ages 18+ count as additional
    adults, and negative ages are discarded.

    Args:
        row: CSV row offering ``get(column, default)``
        num_children: Number of children stated in the row

    Returns:
        Tuple ``(children_ages, num_adults_adjustment, adjusted_num_children)``:
        the children's ages, the number of adults to ADD to the adult count
        (one per listed age of 18+), and ``len(children_ages)``
    """
    unparseable = (ValueError, TypeError, OverflowError)

    # Source 1: comma-separated ages (leads export format)
    kind_ages_csv = str(row.get("kind_ages_csv", "")).strip()
    if kind_ages_csv and kind_ages_csv.lower() != "nan":
        try:
            ages_list = [
                int(float(token)) for token in kind_ages_csv.split(",") if token.strip()
            ]
        except unparseable:
            ages_list = None  # fall through to the individual columns
        if ages_list is not None:
            children_ages = [age for age in ages_list if 0 <= age <= 17]
            num_adults_adjustment = sum(1 for age in ages_list if age >= 18)
            return children_ages, num_adults_adjustment, len(children_ages)

    # Source 2: one column per child, regular columns before the duplicates
    age_columns = [f"child_{i}_age" for i in range(1, 11)]
    age_columns += [f"child_{i}_age_duplicate" for i in range(1, 3)]

    children_ages = []
    num_adults_adjustment = 0
    for column in age_columns:
        age_val = row.get(column, "")
        if age_val is None or age_val == "":
            continue
        try:
            age = int(float(age_val))
        except unparseable:
            continue
        if 0 <= age <= 17:
            children_ages.append(age)
        elif age >= 18:
            num_adults_adjustment += 1

    # Trim ages list if it exceeds num_children. Zeros go first
    # (presumably placeholder values - TODO confirm), then trailing entries.
    surplus = len(children_ages) - num_children
    for _ in range(max(surplus, 0)):
        if 0 in children_ages:
            children_ages.remove(0)
        else:
            children_ages.pop()

    return children_ages, num_adults_adjustment, len(children_ages)
def _parse_bool(self, value: Any) -> Optional[bool]:
    """Parse various boolean representations to bool or None.

    Handles (case-insensitive, surrounding whitespace ignored):
    'yes', 'true', 'checked', '1', 'y', 't' -> True and
    'no', 'false', 'unchecked', '0', 'n', 'f' -> False.
    Real booleans are returned unchanged and the integers 0 / 1 are
    treated like "0" / "1", so a column that pandas parsed as bool or int
    keeps its explicit "no" instead of being reported as missing.

    Returns None if value is None, empty or not a recognised representation.
    """
    if value is None:
        return None
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("yes", "true", "checked", "1", "y", "t"):
        return True
    if text in ("no", "false", "unchecked", "0", "n", "f"):
        return False
    return None
async def _find_or_create_customer(self, customer_data: dict, auto_commit: bool = True) -> Customer:
    """Find existing customer or create new one.

    A customer matches when the email address is equal, or when given name
    and surname both match case-insensitively. If several customers match,
    the first row returned by the database is used and a warning is logged.
    A matched customer is updated with ``customer_data``; otherwise a new
    customer is created.

    Args:
        customer_data: Customer data dictionary (reads "email_address",
            "given_name" and "surname" for the lookup)
        auto_commit: Passed through to the customer service; use False
            when the caller manages the transaction itself

    Returns:
        Customer instance
    """
    from sqlalchemy import and_, or_, select

    email = customer_data.get("email_address")
    given_name = customer_data.get("given_name")
    surname = customer_data.get("surname")

    filters = []
    if email:
        filters.append(Customer.email_address == email)
    if given_name and surname:
        filters.append(
            and_(
                Customer.given_name.ilike(given_name),
                Customer.surname.ilike(surname),
            )
        )

    if filters:
        query = select(Customer).where(or_(*filters))
        result = await self.db_session.execute(query)
        # Result.scalar() never raises MultipleResultsFound, so the old
        # except branch was unreachable (and would have left `existing`
        # unbound). Fetch the matches explicitly instead.
        matches = result.scalars().all()
        if len(matches) > 1:
            _LOGGER.warning(
                "Found %d customers matching the imported row; using the first match",
                len(matches),
            )
        if matches:
            # Update customer data if needed
            return await self.customer_service.update_customer(
                matches[0], customer_data, auto_commit=auto_commit
            )

    # Create new customer
    return await self.customer_service.create_customer(customer_data, auto_commit=auto_commit)