Massive refactoring. CSV import still works

Jonas Linter
2025-11-19 10:20:48 +01:00
parent 67f5894ccd
commit bbbb4d7847


@@ -161,6 +161,47 @@ class CSVImporter:
self.customer_service = CustomerService(db_session)
self.reservation_service = ReservationService(db_session)
def _dryrun_csv_file(self, csv_file_path: str) -> dict[str, Any]:
"""Parse CSV file and return first 10 rows without importing.
Args:
csv_file_path: Path to CSV file
Returns:
Dictionary with headers and rows
"""
df = pd.read_csv(csv_file_path, encoding="utf-8-sig", nrows=10).fillna("")
df = self._normalize_csv_columns(df)
return {
"headers": df.columns.tolist(),
"rows": df.to_dict(orient="records"),
}
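# Illustrative result shape (hypothetical file and values, for illustration only):
#   {
#       "headers": ["first_name", "last_name", "email", "num_children", ...],
#       "rows": [{"first_name": "Anna", "last_name": "Muster", "email": "", ...}],
#   }
# i.e. at most the first 10 data rows, with NaN cells already replaced by "" and
# columns already normalized by _normalize_csv_columns.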
def _normalize_csv_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""Normalize and rename CSV columns based on mapping.
Handles both standard column renames and positional renaming for child age columns
that appear in the landing page form CSV format.
"""
# Apply standard column rename mapping
rename_dict = {col: self.COLUMN_RENAME_MAP.get(col, col) for col in df.columns}
df = df.rename(columns=rename_dict)
# Handle positional renaming for child age columns (landing page form format)
# These appear as unnamed columns immediately after num_children
col_list = list(df.columns)
if "num_children" in col_list and "kind_ages_csv" not in col_list:
num_children_idx = col_list.index("num_children")
# Rename up to the next 10 columns as child ages (1-10), skipping any already named child_*
for i in range(1, 11):
if num_children_idx + i < len(col_list):
col_name = col_list[num_children_idx + i]
if not col_name.startswith("child_"):
df.rename(columns={col_name: f"child_{i}_age"}, inplace=True)
return df
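# Sketch of the positional pass above (hypothetical headers, for illustration only):
# a landing-page export whose header row reads
#   first_name, last_name, email, num_adults, num_children, Unnamed: 5, Unnamed: 6, ...
# would have "Unnamed: 5" renamed to "child_1_age", "Unnamed: 6" to "child_2_age",
# and so on for up to ten columns after num_children, while a file that already
# carries a kind_ages_csv column is left untouched by this pass.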
def _get_hotel_info(self, hotel_code: str) -> tuple[str, str]:
"""Get hotel name from config by hotel_code.
@@ -270,67 +311,13 @@ class CSVImporter:
await self.db_session.begin()
try:
# Handle dry-run mode
if dryrun:
df = pd.read_csv(path, encoding="utf-8-sig", nrows=10).fillna("")
return self._dryrun_csv_file(path)
# Rename columns based on mapping
rename_dict = {col: self.COLUMN_RENAME_MAP.get(col, col) for col in df.columns}
df = df.rename(columns=rename_dict)
dryrun_data = {
"headers": df.columns.tolist(),
"rows": df.to_dict(orient="records"),
}
# Print formatted output
print("\n=== CSV Import Dry Run ===")
print(f"\nHeaders ({len(df.columns)} columns):")
for i, header in enumerate(df.columns, 1):
print(f" {i}. {header}")
print(f"\nFirst {len(df)} rows:")
print(df.to_string())
# Find and print rows with num_children > 0
print("\n=== Rows with num_children > 0 ===")
for row_num, row in df.iterrows():
try:
num_children = int(row.get("num_children", 0) or 0)
if num_children > 0:
print(f"\nRow {row_num + 2}:")
print(row.to_string())
except:
pass
return dryrun_data
# Load CSV with pandas
# Load and prepare CSV
df = pd.read_csv(path, encoding="utf-8-sig").fillna("")
# Rename columns based on mapping
rename_dict = {col: self.COLUMN_RENAME_MAP.get(col, col) for col in df.columns}
df = df.rename(columns=rename_dict)
# Handle positional renaming for child age columns
# After "num_children" (column 5, 0-indexed), the next 10 columns are child ages
# and columns after that are duplicates (child_1_age_duplicate, child_2_age_duplicate)
# BUT only if we don't already have kind_ages_csv (from leads export format)
col_list = list(df.columns)
if "num_children" in col_list and "kind_ages_csv" not in col_list:
num_children_idx = col_list.index("num_children")
# The 10 columns after num_children are child ages (1-10)
for i in range(1, 11):
if num_children_idx + i < len(col_list):
col_name = col_list[num_children_idx + i]
# Only rename if not already renamed
if not col_name.startswith("child_"):
df.rename(columns={col_name: f"child_{i}_age"}, inplace=True)
col_list[num_children_idx + i] = f"child_{i}_age"
# Debug: log the column names after renaming
_LOGGER.debug("CSV columns after rename: %s", list(df.columns))
df = self._normalize_csv_columns(df)
stats = {
"total_rows": 0,
@@ -343,41 +330,24 @@ class CSVImporter:
"errors": [],
}
# Helper function to parse dates
def parse_date_str(date_str: str) -> Optional[date]:
"""Parse date string in various formats."""
if not date_str or not isinstance(date_str, str):
return None
date_str = date_str.strip()
for fmt in ["%Y-%m-%d", "%d.%m.%Y", "%d/%m/%Y"]:
try:
return datetime.strptime(date_str, fmt).date()
except ValueError:
continue
return None
# Process each row - stop on first error for debugging
# Process each row
for row_num, row in df.iterrows():
stats["total_rows"] += 1
row_num += 2 # Convert to 1-based and account for header
# Extract required fields (using renamed column names)
# Extract and validate required fields
first_name = str(row.get("first_name", "")).strip()
last_name = str(row.get("last_name", "")).strip()
email = str(row.get("email", "")).strip()
# Validate required name fields
if not first_name or not last_name:
_LOGGER.warning("Skipping row %d: missing name", row_num)
stats["skipped_empty"] += 1
continue
# Parse and validate dates
start_date_str = str(row.get("check_in_date", "")).strip()
end_date_str = str(row.get("check_out_date", "")).strip()
start_date = parse_date_str(start_date_str)
end_date = parse_date_str(end_date_str)
start_date = self._parse_date(str(row.get("check_in_date", "")).strip())
end_date = self._parse_date(str(row.get("check_out_date", "")).strip())
if not start_date or not end_date:
_LOGGER.warning("Skipping row %d: invalid or missing dates", row_num)
@@ -402,199 +372,43 @@ class CSVImporter:
stats["skipped_duplicates"] += 1
continue
# Build customer data from CSV row
customer_data = {
"given_name": first_name,
"surname": last_name,
"name_prefix": str(row.get("salutation", "")).strip() or None,
"email_address": email or None,
"phone": str(row.get("phone", "")).strip() or None,
"email_newsletter": self._parse_bool(row.get("newsletter_opt_in")),
"address_line": None,
"city_name": None,
"postal_code": None,
"country_code": None,
"gender": None,
"birth_date": None,
"language": "de",
"address_catalog": False,
"name_title": None,
}
# Get or create customer (without committing)
# Get or create customer
customer_data = self._build_customer_data(first_name, last_name, email, row)
customer = await self._find_or_create_customer(customer_data, auto_commit=False)
if customer.id is None:
await self.db_session.flush() # Flush to get customer.id
await self.db_session.flush()
stats["created_customers"] += 1
else:
stats["existing_customers"] += 1
# Build reservation data from CSV row
try:
num_adults = int(row.get("num_adults", 1) or 1)
except (ValueError, TypeError):
num_adults = 1
# Parse adult/children counts and extract ages
num_adults = self._parse_int(row.get("num_adults", 1), default=1)
num_children = self._parse_int(row.get("num_children", 0), default=0)
children_ages, age_adjustment, adjusted_num_children = self._extract_children_ages(row, num_children)
num_adults += age_adjustment
num_children = adjusted_num_children if adjusted_num_children > 0 else num_children
try:
num_children = int(row.get("num_children", 0) or 0)
except (ValueError, TypeError):
num_children = 0
# Extract children ages from columns (including duplicates)
children_ages = []
# Check if we have kind_ages_csv (from leads export format)
kind_ages_csv = str(row.get("kind_ages_csv", "")).strip()
if kind_ages_csv and kind_ages_csv.lower() != "nan":
# Parse comma-separated ages
try:
ages_list = [int(age.strip()) for age in kind_ages_csv.split(",") if age.strip()]
# Separate valid children (0-17) from young adults (18+)
# 18-year-olds are counted as adults, not children
valid_children = [age for age in ages_list if 0 <= age <= 17]
young_adults = [age for age in ages_list if age >= 18]
children_ages = valid_children
# If we found 18+ year olds, adjust num_children and num_adults accordingly
if young_adults:
num_children = len(valid_children)
num_adults += len(young_adults)
_LOGGER.debug(
f"Row {row_num}: Found {len(young_adults)} young adults (18+). "
f"Adjusted num_children to {num_children}, num_adults to {num_adults}"
)
except (ValueError, TypeError):
pass
# If no kind_ages_csv, try to extract ages from individual columns
if not children_ages:
young_adults = []
# Try to extract ages from renamed columns first
# Check primary child age columns (1-10)
for i in range(1, 11):
age_key = f"child_{i}_age"
age_val = row.get(age_key, "")
if age_val != "" and age_val is not None:
try:
# Handle both int and float values (e.g., 3, 3.0)
age = int(float(age_val))
if 0 <= age <= 17:
children_ages.append(age)
elif age >= 18:
young_adults.append(age)
except (ValueError, TypeError):
pass
# Check for duplicate child age columns (e.g., child_1_age_duplicate, child_2_age_duplicate)
for i in range(1, 3): # Only 1.1 and 2.1 duplicates mentioned
age_key = f"child_{i}_age_duplicate"
age_val = row.get(age_key, "")
if age_val != "" and age_val is not None:
try:
# Handle both int and float values (e.g., 3, 3.0)
age = int(float(age_val))
if 0 <= age <= 17:
children_ages.append(age)
elif age >= 18:
young_adults.append(age)
except (ValueError, TypeError):
pass
# Adjust num_children and num_adults if we found 18+ year olds
if young_adults:
num_children = len(children_ages)
num_adults += len(young_adults)
_LOGGER.debug(
f"Row {row_num}: Found {len(young_adults)} young adults (18+) in individual columns. "
f"Adjusted num_children to {num_children}, num_adults to {num_adults}"
)
# Debug: log extraction details
_LOGGER.debug(
"Row %d: num_children=%d, extracted %d ages: %s, kind_ages_csv=%s",
row_num,
num_children,
len(children_ages),
children_ages,
kind_ages_csv,
# Build and create reservation
reservation = self._build_reservation_data(
row, start_date, end_date, num_adults, num_children,
children_ages, fbclid, gclid, hotel_code, row_num
)
# If we extracted more ages than num_children indicates,
# compact the list to match num_children. Remove ages of 0 first
if len(children_ages) > num_children:
# Remove ages "0" first, but only as many as needed
num_to_remove = len(children_ages) - num_children
for _ in range(num_to_remove):
if 0 in children_ages:
children_ages.remove(0)
else:
# If no "0" ages left, just remove the last one
children_ages.pop()
# Generate unique ID (use submission timestamp if available, else row number)
submission_ts = str(row.get("submission_timestamp", "")).strip()
if submission_ts:
submission_id = submission_ts
else:
submission_id = f"csv_import_{row_num}_{datetime.now().isoformat()}"
# Determine hotel code and name (from config)
final_hotel_code, final_hotel_name = self._get_hotel_info(hotel_code)
# Parse room type fields if available
room_type_code = str(row.get("room_type_code", "")).strip() or None
room_class_code = str(row.get("room_classification_code", "")).strip() or None
# Build and validate ReservationData
reservation = ReservationData(
unique_id=submission_id,
start_date=start_date,
end_date=end_date,
num_adults=num_adults,
num_children=num_children,
children_ages=children_ages,
hotel_code=final_hotel_code,
hotel_name=final_hotel_name,
offer=str(row.get("room_offer", "")).strip() or None,
user_comment=str(row.get("message", "")).strip() or None,
fbclid=fbclid,
gclid=gclid,
utm_source=str(row.get("utm_source", "")).strip() or None,
utm_medium=str(row.get("utm_medium", "")).strip() or None,
utm_campaign=str(row.get("utm_campaign", "")).strip() or None,
utm_term=str(row.get("utm_term", "")).strip() or None,
utm_content=str(row.get("utm_content", "")).strip() or None,
room_type_code=room_type_code,
room_classification_code=room_class_code,
db_reservation = await self.reservation_service.create_reservation(
reservation, customer.id, auto_commit=False
)
stats["created_reservations"] += 1
_LOGGER.info("Created reservation for %s %s", first_name, last_name)
# Create reservation if customer exists (without committing)
if customer.id:
db_reservation = await self.reservation_service.create_reservation(
reservation, customer.id, auto_commit=False
# Pre-acknowledge if requested
if pre_acknowledge and db_reservation.md5_unique_id:
await self.reservation_service.record_acknowledgement(
client_id=client_id,
unique_id=db_reservation.md5_unique_id,
username=username,
auto_commit=False
)
stats["created_reservations"] += 1
_LOGGER.info("Created reservation for %s %s", first_name, last_name)
# Pre-acknowledge if requested
if pre_acknowledge and db_reservation.md5_unique_id:
await self.reservation_service.record_acknowledgement(
client_id=client_id,
unique_id=db_reservation.md5_unique_id,
username=username,
auto_commit=False
)
stats["pre_acknowledged"] += 1
_LOGGER.debug(
"Pre-acknowledged reservation %s for client %s",
db_reservation.md5_unique_id,
username or client_id
)
else:
raise ValueError("Failed to get or create customer")
stats["pre_acknowledged"] += 1
@@ -611,6 +425,148 @@ class CSVImporter:
return stats
def _parse_int(self, value: Any, default: int = 0) -> int:
"""Parse value to int, returning default if parsing fails."""
try:
return int(value) if value else default
except (ValueError, TypeError):
return default
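# Examples (illustration only): _parse_int("3") -> 3, _parse_int("abc", default=1) -> 1,
# _parse_int("", default=1) -> 1. Falsy inputs (0, "", None) fall back to the default
# before int() is even attempted.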
def _build_customer_data(self, first_name: str, last_name: str, email: str, row: Any) -> dict:
"""Build customer data dictionary from CSV row."""
return {
"given_name": first_name,
"surname": last_name,
"name_prefix": str(row.get("salutation", "")).strip() or None,
"email_address": email or None,
"phone": str(row.get("phone", "")).strip() or None,
"email_newsletter": self._parse_bool(row.get("newsletter_opt_in")),
"address_line": None,
"city_name": None,
"postal_code": None,
"country_code": None,
"gender": None,
"birth_date": None,
"language": "de",
"address_catalog": False,
"name_title": None,
}
def _build_reservation_data(
self, row: Any, start_date: date, end_date: date, num_adults: int,
num_children: int, children_ages: list[int], fbclid: Optional[str],
gclid: Optional[str], hotel_code: str, row_num: int
) -> ReservationData:
"""Build ReservationData from CSV row."""
submission_ts = str(row.get("submission_timestamp", "")).strip()
submission_id = submission_ts if submission_ts else f"csv_import_{row_num}_{datetime.now().isoformat()}"
final_hotel_code, final_hotel_name = self._get_hotel_info(hotel_code)
room_type_code = str(row.get("room_type_code", "")).strip() or None
room_class_code = str(row.get("room_classification_code", "")).strip() or None
return ReservationData(
unique_id=submission_id,
start_date=start_date,
end_date=end_date,
num_adults=num_adults,
num_children=num_children,
children_ages=children_ages,
hotel_code=final_hotel_code,
hotel_name=final_hotel_name,
offer=str(row.get("room_offer", "")).strip() or None,
user_comment=str(row.get("message", "")).strip() or None,
fbclid=fbclid,
gclid=gclid,
utm_source=str(row.get("utm_source", "")).strip() or None,
utm_medium=str(row.get("utm_medium", "")).strip() or None,
utm_campaign=str(row.get("utm_campaign", "")).strip() or None,
utm_term=str(row.get("utm_term", "")).strip() or None,
utm_content=str(row.get("utm_content", "")).strip() or None,
room_type_code=room_type_code,
room_classification_code=room_class_code,
)
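# Note on the unique_id fallback (hypothetical values, for illustration only): a row
# with no submission_timestamp at CSV row 42, imported on 2025-11-19, would receive an
# id like "csv_import_42_2025-11-19T10:20:48.123456"; rows that do carry a
# submission_timestamp reuse that value verbatim as the unique_id.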
def _parse_date(self, date_str: str) -> Optional[date]:
"""Parse date string in various formats.
Supports: YYYY-MM-DD, DD.MM.YYYY, DD/MM/YYYY
"""
if not date_str or not isinstance(date_str, str):
return None
date_str = date_str.strip()
for fmt in ["%Y-%m-%d", "%d.%m.%Y", "%d/%m/%Y"]:
try:
return datetime.strptime(date_str, fmt).date()
except ValueError:
continue
return None
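# Doctest-style sketch (assuming an `importer` instance, for illustration only):
#   >>> importer._parse_date("2025-11-19")
#   datetime.date(2025, 11, 19)
#   >>> importer._parse_date("19.11.2025")
#   datetime.date(2025, 11, 19)
#   >>> importer._parse_date("19/11/2025")
#   datetime.date(2025, 11, 19)
#   >>> importer._parse_date("not a date") is None
#   True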
def _extract_children_ages(self, row: Any, num_children: int) -> tuple[list[int], int, int]:
"""Extract and parse children ages from CSV row.
Handles both the comma-separated kind_ages_csv column (leads export format) and the individual child_*_age columns (landing page form format).
Returns (children_ages, num_adults_adjustment, adjusted_num_children) where:
- num_adults_adjustment is the number of 18+ year-olds found in the ages list, to be added to num_adults by the caller
- adjusted_num_children is the actual count of extracted children ages
"""
children_ages = []
num_adults_adjustment = 0
# Try comma-separated ages first (from leads export format)
kind_ages_csv = str(row.get("kind_ages_csv", "")).strip()
if kind_ages_csv and kind_ages_csv.lower() != "nan":
try:
ages_list = [int(age.strip()) for age in kind_ages_csv.split(",") if age.strip()]
children_ages = [age for age in ages_list if 0 <= age <= 17]
young_adults = [age for age in ages_list if age >= 18]
num_adults_adjustment = len(young_adults)
adjusted_num_children = len(children_ages)
return children_ages, num_adults_adjustment, adjusted_num_children
except (ValueError, TypeError):
pass
# Try individual column ages if no CSV format found
young_adults = []
for i in range(1, 11): # Check child_1_age through child_10_age
age_val = row.get(f"child_{i}_age", "")
if age_val != "" and age_val is not None:
try:
age = int(float(age_val))
if 0 <= age <= 17:
children_ages.append(age)
elif age >= 18:
young_adults.append(age)
except (ValueError, TypeError):
pass
# Check for duplicate child age columns
for i in range(1, 3): # child_1_age_duplicate, child_2_age_duplicate
age_val = row.get(f"child_{i}_age_duplicate", "")
if age_val != "" and age_val is not None:
try:
age = int(float(age_val))
if 0 <= age <= 17:
children_ages.append(age)
elif age >= 18:
young_adults.append(age)
except (ValueError, TypeError):
pass
num_adults_adjustment = len(young_adults)
# Trim ages list if it exceeds num_children
if len(children_ages) > num_children:
num_to_remove = len(children_ages) - num_children
for _ in range(num_to_remove):
if 0 in children_ages:
children_ages.remove(0)
else:
children_ages.pop()
adjusted_num_children = len(children_ages)
return children_ages, num_adults_adjustment, adjusted_num_children
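# Worked example (hypothetical row, for illustration only): with kind_ages_csv = "4, 9, 18"
# and num_children = 3, the CSV branch yields children_ages = [4, 9], a num_adults
# adjustment of 1 (the 18-year-old is moved to the adults count by the caller) and an
# adjusted num_children of 2. The individual-column branch handles 18+ ages the same
# way for child_1_age..child_10_age plus the *_duplicate columns.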
def _parse_bool(self, value: Any) -> Optional[bool]:
"""Parse various boolean representations to bool or None.
@@ -671,19 +627,9 @@ class CSVImporter:
if existing:
# Update customer data if needed
try:
existing_customer = await self.customer_service.update_customer(
existing, customer_data, auto_commit=auto_commit
)
except Exception as e:
print(customer_data)
print("---")
print(existing)
raise
existing_customer = await self.customer_service.update_customer(
existing, customer_data, auto_commit=auto_commit
)
return existing_customer
# Create new customer