# File: alpinebits_python/src/alpine_bits_python/csv_import.py
"""CSV import functionality for landing page forms.
Handles importing CSV data from landing_page_form.csv and creating/updating
reservations and customers in the database.
Supported CSV columns:
- Zeit der Einreichung: Submission timestamp
- Angebot auswählen: Room offer
- Anreisedatum: Check-in date (YYYY-MM-DD or DD.MM.YYYY)
- Abreisedatum: Check-out date (YYYY-MM-DD or DD.MM.YYYY)
- Anzahl Erwachsene: Number of adults
- Anzahl Kinder: Number of children
- Alter Kind 1-10: Ages of children
- Anrede: Title/salutation (e.g., "Herr", "Frau")
- Vorname: First name (required)
- Nachname: Last name (required)
- Email: Email address
- Phone: Phone number
- Message: Customer message/comment
- Einwilligung Marketing: Newsletter opt-in (yes/no, checked/unchecked)
- utm_Source, utm_Medium, utm_Campaign, utm_Term, utm_Content: UTM tracking
- fbclid: Facebook click ID
- gclid: Google click ID
- hotelid: Hotel ID
- hotelname: Hotel name
Duplicate detection uses: name + email + dates + fbclid/gclid combination
"""
import csv
import hashlib
import json
import re
import pandas as pd
from datetime import date, datetime
from io import StringIO
from pathlib import Path
from typing import Any, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import MultipleResultsFound
from .customer_service import CustomerService
from .db import Customer, Reservation
from .logging_config import get_logger
from .reservation_service import ReservationService
from .schemas import ReservationData
_LOGGER = get_logger(__name__)
class CSVImporter:
    """Handles importing CSV data into the system.

    Reads landing-page form exports (German column headers), normalizes
    the columns via COLUMN_RENAME_MAP, and creates/updates customers and
    reservations through the customer/reservation services.
    """

    # Column rename mapping for CSV import: German form headers (and raw
    # tracking-parameter names) -> internal snake_case column names.
    COLUMN_RENAME_MAP = {
        # Submission / stay details
        "Zeit der Einreichung": "submission_timestamp",
        "Angebot auswählen": "room_offer",
        "Anreisedatum": "check_in_date",
        "Abreisedatum": "check_out_date",
        "Anzahl Erwachsene": "num_adults",
        "Anzahl Kinder": "num_children",
        # Child age columns 1-10
        "Alter Kind 1": "child_1_age",
        "Alter Kind 2": "child_2_age",
        "Alter Kind 3": "child_3_age",
        "Alter Kind 4": "child_4_age",
        "Alter Kind 5": "child_5_age",
        "Alter Kind 6": "child_6_age",
        "Alter Kind 7": "child_7_age",
        "Alter Kind 8": "child_8_age",
        "Alter Kind 9": "child_9_age",
        "Alter Kind 10": "child_10_age",
        # pandas suffixes repeated headers with ".1"; keep them separate
        "Alter Kind 1.1": "child_1_age_duplicate",
        "Alter Kind 2.1": "child_2_age_duplicate",
        # Contact details
        "Anrede": "salutation",
        "Vorname": "first_name",
        "Nachname": "last_name",
        "Email": "email",
        "Phone": "phone",
        "Message": "message",
        "Einwilligung Marketing": "newsletter_opt_in",
        # Marketing / tracking parameters
        "utm_Source": "utm_source",
        "utm_Medium": "utm_medium",
        "utm_Campaign": "utm_campaign",
        "utm_Term": "utm_term",
        "utm_Content": "utm_content",
        "utm_term_id": "utm_term_id",
        "utm_content_id": "utm_content_id",
        "gad_source": "gad_source",
        "gad_campaignid": "gad_campaign_id",
        "gbraid": "gbraid",
        "gclid": "gclid",
        "fbclid": "fbclid",
        # Hotel / room metadata
        "hotelid": "hotel_id",
        "hotelname": "hotel_name",
        "roomtypecode": "room_type_code",
        "roomclassificationcode": "room_classification_code",
        "Kinder": "children",
        # Handle unnamed columns - these get default names like "Unnamed: 0"
        # The age columns appear to be in positions 6-15 (0-indexed) based on dry run output
        # We'll handle these via positional renaming in import_csv_file
    }
def __init__(self, db_session: AsyncSession, config: dict[str, Any]):
    """Set up the importer with its database session and configuration.

    Args:
        db_session: Async SQLAlchemy session used for all database work.
        config: Application configuration dictionary.
    """
    self.config = config
    self.db_session = db_session
    # Service layers share the same session so everything joins one
    # transaction during import.
    self.reservation_service = ReservationService(db_session)
    self.customer_service = CustomerService(db_session)
async def find_duplicate_reservation(
    self,
    first_name: str,
    last_name: str,
    email: Optional[str],
    start_date: date,
    end_date: date,
    fbclid: Optional[str],
    gclid: Optional[str],
) -> Optional[Reservation]:
    """Find if a reservation already exists based on unique criteria.

    Candidates are selected by exact start/end date plus a case-insensitive
    name match (or email match when an email is given); fbclid/gclid then
    disambiguate among the candidates.

    Args:
        first_name: Customer first name
        last_name: Customer last name
        email: Customer email (may be None)
        start_date: Reservation start date
        end_date: Reservation end date
        fbclid: Facebook click ID (may be None)
        gclid: Google click ID (may be None)

    Returns:
        Existing Reservation if found, None otherwise
    """
    from sqlalchemy import and_, or_, select

    # Build the identity OR-clauses only from data we actually have.
    # (Passing a Python None into or_() would emit a literal NULL clause.)
    identity_clauses = [
        and_(
            Customer.given_name.ilike(first_name),
            Customer.surname.ilike(last_name),
        )
    ]
    if email:
        identity_clauses.append(Customer.email_address.ilike(email))

    # Query reservations with matching dates and name/email
    query = (
        select(Reservation)
        .select_from(Reservation)
        .join(Customer, Reservation.customer_id == Customer.id)
        .where(
            and_(
                Reservation.start_date == start_date,
                Reservation.end_date == end_date,
                or_(*identity_clauses),
            )
        )
    )
    result = await self.db_session.execute(query)
    candidates = result.scalars().all()

    # Tracking IDs take precedence when supplied; without any tracking ID
    # the first name/email/date match is treated as the duplicate.
    for candidate in candidates:
        if fbclid and candidate.fbclid == fbclid:
            return candidate
        if gclid and candidate.gclid == gclid:
            return candidate
        if not fbclid and not gclid:
            return candidate
    return None
async def import_csv_file(
    self, csv_file_path: str, hotel_code: Optional[str] = None, dryrun: bool = False
) -> dict[str, Any]:
    """Import reservations from a CSV file.

    All rows are imported inside a single transaction: any error rolls
    back every change made so far. Dry-run mode touches no database state.

    Args:
        csv_file_path: Path to CSV file
        hotel_code: Optional hotel code to override CSV values
        dryrun: If True, parse and print first 10 rows as JSON without importing

    Returns:
        Dictionary with import statistics or parsed data (if dryrun=True)

    Raises:
        FileNotFoundError: If csv_file_path does not exist.
    """
    path = Path(csv_file_path)
    if not path.exists():
        raise FileNotFoundError(f"CSV file not found: {csv_file_path}")

    def rename_columns(frame):
        """Map known German CSV headers to internal column names."""
        mapping = {c: self.COLUMN_RENAME_MAP.get(c, c) for c in frame.columns}
        return frame.rename(columns=mapping)

    # Dry-run mode needs no transaction. (Previously begin() was called
    # before this branch and the early return leaked an open transaction.)
    if dryrun:
        df = rename_columns(
            pd.read_csv(path, encoding="utf-8-sig", nrows=10).fillna("")
        )
        dryrun_data = {
            "headers": df.columns.tolist(),
            "rows": df.to_dict(orient="records"),
        }
        print("\n=== CSV Import Dry Run ===")
        print(f"\nHeaders ({len(df.columns)} columns):")
        for i, header in enumerate(df.columns, 1):
            print(f" {i}. {header}")
        print(f"\nFirst {len(df)} rows:")
        print(df.to_string())
        # Find and print rows with num_children > 0
        print("\n=== Rows with num_children > 0 ===")
        for row_num, row in df.iterrows():
            try:
                num_children = int(row.get("num_children", 0) or 0)
            except (TypeError, ValueError):
                # Unparseable counts are simply omitted from the preview.
                continue
            if num_children > 0:
                print(f"\nRow {row_num + 2}:")
                print(row.to_string())
        return dryrun_data

    def parse_date_str(date_str: str) -> Optional[date]:
        """Parse ISO (YYYY-MM-DD), German (DD.MM.YYYY) or DD/MM/YYYY dates."""
        if not date_str or not isinstance(date_str, str):
            return None
        date_str = date_str.strip()
        for fmt in ["%Y-%m-%d", "%d.%m.%Y", "%d/%m/%Y"]:
            try:
                return datetime.strptime(date_str, fmt).date()
            except ValueError:
                continue
        return None

    def collect_children_ages(row) -> list[int]:
        """Extract plausible (0-17) child ages from all age columns."""
        age_keys = [f"child_{i}_age" for i in range(1, 11)]
        # The export repeats the first two age columns ("Alter Kind 1.1"
        # / "2.1"); those are renamed with a _duplicate suffix.
        age_keys += [f"child_{i}_age_duplicate" for i in range(1, 3)]
        ages: list[int] = []
        for key in age_keys:
            value = row.get(key, "")
            if value == "" or value is None:
                continue
            try:
                age = int(float(value))  # accepts both 3 and 3.0
            except (ValueError, TypeError):
                continue
            if 0 <= age <= 17:
                ages.append(age)
        return ages

    # Start a transaction - will rollback on any exception
    await self.db_session.begin()
    try:
        df = rename_columns(pd.read_csv(path, encoding="utf-8-sig").fillna(""))

        # Positional renaming: the 10 columns directly after "num_children"
        # hold the child ages and may arrive unnamed ("Unnamed: N").
        col_list = list(df.columns)
        if "num_children" in col_list:
            base_idx = col_list.index("num_children")
            for i in range(1, 11):
                pos = base_idx + i
                if pos < len(col_list) and not col_list[pos].startswith("child_"):
                    df.rename(columns={col_list[pos]: f"child_{i}_age"}, inplace=True)
                    col_list[pos] = f"child_{i}_age"
        _LOGGER.debug("CSV columns after rename: %s", list(df.columns))

        stats = {
            "total_rows": 0,
            "skipped_empty": 0,
            "created_customers": 0,
            "existing_customers": 0,
            "created_reservations": 0,
            "skipped_duplicates": 0,
            "errors": [],
        }

        for row_num, row in df.iterrows():
            stats["total_rows"] += 1
            row_num += 2  # 1-based CSV line number, accounting for header

            # Required name fields (using renamed column names)
            first_name = str(row.get("first_name", "")).strip()
            last_name = str(row.get("last_name", "")).strip()
            email = str(row.get("email", "")).strip()
            if not first_name or not last_name:
                _LOGGER.warning("Skipping row %d: missing name", row_num)
                stats["skipped_empty"] += 1
                continue

            # Parse and validate dates
            start_date = parse_date_str(str(row.get("check_in_date", "")).strip())
            end_date = parse_date_str(str(row.get("check_out_date", "")).strip())
            if not start_date or not end_date:
                _LOGGER.warning("Skipping row %d: invalid or missing dates", row_num)
                stats["skipped_empty"] += 1
                continue

            # Tracking IDs participate in duplicate detection
            fbclid = str(row.get("fbclid", "")).strip() or None
            gclid = str(row.get("gclid", "")).strip() or None

            existing_res = await self.find_duplicate_reservation(
                first_name, last_name, email or None, start_date, end_date, fbclid, gclid
            )
            if existing_res:
                _LOGGER.info(
                    "Skipping row %d: duplicate reservation found (ID: %s)",
                    row_num,
                    existing_res.unique_id,
                )
                stats["skipped_duplicates"] += 1
                continue

            # Build customer data from CSV row
            customer_data = {
                "given_name": first_name,
                "surname": last_name,
                "name_prefix": str(row.get("salutation", "")).strip() or None,
                "email_address": email or None,
                "phone": str(row.get("phone", "")).strip() or None,
                "email_newsletter": self._parse_bool(row.get("newsletter_opt_in")),
                "address_line": None,
                "city_name": None,
                "postal_code": None,
                "country_code": None,
                "gender": None,
                "birth_date": None,
                "language": "de",
                "address_catalog": False,
                "name_title": None,
            }

            customer = await self._find_or_create_customer(customer_data)
            # A missing primary key means the customer was just created and
            # not yet flushed; refresh to populate it — TODO confirm against
            # CustomerService.create_customer's flush behavior.
            if customer.id is None:
                await self.db_session.refresh(customer)
                stats["created_customers"] += 1
            else:
                stats["existing_customers"] += 1

            num_adults = int(row.get("num_adults", 1) or 1)
            num_children = int(row.get("num_children", 0) or 0)
            children_ages = collect_children_ages(row)
            _LOGGER.debug(
                "Row %d: num_children=%d, extracted %d ages: %s",
                row_num,
                num_children,
                len(children_ages),
                children_ages,
            )
            # Reconcile extracted ages with the declared child count:
            # drop placeholder 0-ages first, then trim from the end.
            if len(children_ages) > num_children:
                for _ in range(len(children_ages) - num_children):
                    if 0 in children_ages:
                        children_ages.remove(0)
                    else:
                        children_ages.pop()

            # Unique ID: submission timestamp when present, else synthetic
            submission_ts = str(row.get("submission_timestamp", "")).strip()
            submission_id = (
                submission_ts
                or f"csv_import_{row_num}_{datetime.now().isoformat()}"
            )

            # Hotel code/name: explicit argument > CSV value > config default
            final_hotel_code = (
                hotel_code
                or str(row.get("hotel_id", "")).strip()
                or self.config.get("default_hotel_code", "123")
            )
            final_hotel_name = (
                str(row.get("hotel_name", "")).strip()
                or self.config.get("default_hotel_name", "Frangart Inn")
            )

            # Build and validate ReservationData
            reservation = ReservationData(
                unique_id=submission_id,
                start_date=start_date,
                end_date=end_date,
                num_adults=num_adults,
                num_children=num_children,
                children_ages=children_ages,
                hotel_code=final_hotel_code,
                hotel_name=final_hotel_name,
                offer=str(row.get("room_offer", "")).strip() or None,
                user_comment=str(row.get("message", "")).strip() or None,
                fbclid=fbclid,
                gclid=gclid,
                utm_source=str(row.get("utm_source", "")).strip() or None,
                utm_medium=str(row.get("utm_medium", "")).strip() or None,
                utm_campaign=str(row.get("utm_campaign", "")).strip() or None,
                utm_term=str(row.get("utm_term", "")).strip() or None,
                utm_content=str(row.get("utm_content", "")).strip() or None,
                room_type_code=str(row.get("room_type_code", "")).strip() or None,
                room_classification_code=str(
                    row.get("room_classification_code", "")
                ).strip() or None,
            )

            if customer.id:
                await self.reservation_service.create_reservation(
                    reservation, customer.id
                )
                stats["created_reservations"] += 1
                _LOGGER.info("Created reservation for %s %s", first_name, last_name)
            else:
                raise ValueError("Failed to get or create customer")
    except Exception:
        # Rollback transaction on any error
        await self.db_session.rollback()
        _LOGGER.exception("CSV import failed, rolling back all changes")
        raise

    # Commit transaction on success
    await self.db_session.commit()
    _LOGGER.info("CSV import completed successfully. Stats: %s", stats)
    return stats
def _parse_bool(self, value: Any) -> Optional[bool]:
"""Parse various boolean representations to bool or None.
Handles: 'yes', 'no', 'true', 'false', 'checked', 'unchecked', etc.
Returns None if value is empty or invalid.
"""
if not value or (isinstance(value, str) and not value.strip()):
return None
str_val = str(value).lower().strip()
if str_val in ("yes", "true", "checked", "1", "y", "t"):
return True
elif str_val in ("no", "false", "unchecked", "0", "n", "f"):
return False
else:
return None
async def _find_or_create_customer(self, customer_data: dict) -> Customer:
    """Find existing customer (and update it) or create a new one.

    Matching is by exact email OR case-insensitive given name + surname.

    Args:
        customer_data: Customer data dictionary

    Returns:
        Customer instance
    """
    from sqlalchemy import and_, or_, select

    email = customer_data.get("email_address")
    given_name = customer_data.get("given_name")
    surname = customer_data.get("surname")

    filters = []
    if email:
        filters.append(Customer.email_address == email)
    if given_name and surname:
        filters.append(
            and_(
                Customer.given_name.ilike(given_name),
                Customer.surname.ilike(surname),
            )
        )
    if filters:
        query = select(Customer).where(or_(*filters))
        result = await self.db_session.execute(query)
        # Take the first match explicitly. The previous scalar()-based code
        # left `existing` unbound (NameError) when MultipleResultsFound was
        # caught; scalars().all() never raises on multiple rows.
        matches = result.scalars().all()
        if len(matches) > 1:
            _LOGGER.warning(
                "Multiple customers match %s %s <%s>; using the first match",
                given_name,
                surname,
                email,
            )
        existing = matches[0] if matches else None
        if existing:
            try:
                # Update customer data if needed
                return await self.customer_service.update_customer(
                    existing, customer_data
                )
            except Exception:
                # Log (instead of print-debugging) and let the caller's
                # transaction handling roll back.
                _LOGGER.exception(
                    "Failed to update customer %s with data %s",
                    existing,
                    customer_data,
                )
                raise
    # Create new customer
    return await self.customer_service.create_customer(customer_data)