267 lines
7.6 KiB
Python
267 lines
7.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Extract lead information from MBOX email file.
|
|
Parses email entries and extracts structured lead data.
|
|
"""
|
|
|
|
import re
|
|
from dataclasses import dataclass, field, asdict
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
import json
|
|
|
|
|
|
@dataclass
class Lead:
    """A single booking enquiry extracted from one email.

    Scalar fields default to ``None`` and list fields to a fresh empty list,
    so an instance can be created empty and populated field by field.
    """

    # --- Contact details ---
    name: Optional[str] = None
    lastname: Optional[str] = None
    mail: Optional[str] = None
    tel: Optional[str] = None

    # --- Requested stay ---
    anreise: Optional[str] = None  # Check-in date (kept as text)
    abreise: Optional[str] = None  # Check-out date (kept as text)
    erwachsene: Optional[int] = None  # Number of adults
    kinder: Optional[int] = None  # Number of children
    kind_ages: List[int] = field(default_factory=list)  # Ages of the children
    apartments: List[str] = field(default_factory=list)
    verpflegung: Optional[str] = None  # Meal plan

    # --- Request metadata ---
    sprache: Optional[str] = None  # Language
    device: Optional[str] = None
    anrede: Optional[str] = None  # Salutation
    land: Optional[str] = None  # Country
    privacy: Optional[bool] = None
    received_date: Optional[str] = None
|
|
|
|
|
|
def parse_mbox_file(filepath: str) -> List[Lead]:
    """
    Read an MBOX file and build one Lead per usable message.

    Args:
        filepath: Path to the MBOX file

    Returns:
        The leads that carry at least a name or a mail address, in file order

    Raises:
        ValueError: Re-raised (after a warning is printed) when a message's
            Date header cannot be parsed
    """
    with open(filepath, 'r', encoding='utf-8') as mbox:
        raw_text = mbox.read()

    # Messages are delimited by lines starting with "From <digits>@";
    # whatever precedes the first delimiter is discarded.
    messages = re.split(r'^From \d+@', raw_text, flags=re.MULTILINE)[1:]

    collected = []
    for message in messages:
        # The first blank line ends the headers; the form data follows it.
        headers, blank_line, body = message.partition('\n\n')
        if not blank_line:
            # No header/body boundary in this message: nothing to parse.
            continue

        lead = parse_email_body(body)

        # The received date comes from the headers, not from the form data.
        try:
            lead.received_date = extract_received_date(headers)
        except ValueError as e:
            print(f"WARNING: {e}")
            raise

        # Drop messages that yielded neither a name nor a mail address.
        if not (lead.name or lead.mail):
            continue
        collected.append(lead)

    return collected
|
|
|
|
|
|
def extract_received_date(headers: str) -> Optional[str]:
    """
    Extract the Date header from email headers and convert it to ISO format.

    Only the first line whose field name is ``Date`` is considered. The field
    name is matched case-insensitively and the space after the colon is
    optional.

    Args:
        headers: Email headers section, one header per line

    Returns:
        ISO 8601 string for the Date header, or None if no Date header exists

    Raises:
        ValueError: If a Date header is present but cannot be parsed
    """
    from email.utils import parsedate_to_datetime

    prefix = 'date:'
    for line in headers.split('\n'):
        # Header field names are case-insensitive, so compare in lower case.
        if line[:len(prefix)].lower() != prefix:
            continue

        # Cut directly after the colon and strip, rather than assuming exactly
        # one space follows it; otherwise "Date:20 Nov ..." would lose a digit.
        date_value = line[len(prefix):].strip()
        try:
            # Parse the RFC 2822 date and render it in ISO format.
            return parsedate_to_datetime(date_value).isoformat()
        except (TypeError, ValueError) as e:
            # Surface the failure to the caller, keeping the original cause.
            raise ValueError(f"Failed to parse date '{date_value}': {e}") from e

    return None
|
|
|
|
|
|
def parse_email_body(body: str) -> Lead:
    """
    Parse the body of an email to extract lead information.

    The body is read line by line as ``Key: Value`` pairs, split on the first
    colon. Lines without a colon and unknown keys are ignored. Repeated
    ``Apartment`` lines accumulate; for other keys the last occurrence wins.

    Args:
        body: Email body content

    Returns:
        Lead object with extracted data
    """
    # Form keys copied verbatim into the Lead attribute of the given name.
    text_fields = {
        'Name': 'name',
        'Nachname': 'lastname',
        'Mail': 'mail',
        'Tel': 'tel',
        'Anreise': 'anreise',
        'Abreise': 'abreise',
        'Verpflegung': 'verpflegung',
        'Sprache': 'sprache',
        'Device': 'device',
        'Anrede': 'anrede',
        'Land': 'land',
    }
    # Form keys holding a non-negative head count.
    count_fields = {
        'Erwachsene': 'erwachsene',
        'Kinder': 'kinder',
    }

    lead = Lead()

    for line in body.split('\n'):
        line = line.strip()

        if not line or ':' not in line:
            continue

        # Split on the first colon only, so values may contain colons.
        key, value = line.split(':', 1)
        key = key.strip()
        value = value.strip()

        if key in text_fields:
            setattr(lead, text_fields[key], value)
        elif key in count_fields:
            # isdecimal() only accepts characters int() can convert;
            # isdigit() also accepts e.g. superscripts, which make int() raise.
            count = int(value) if value.isdecimal() else None
            setattr(lead, count_fields[key], count)
        elif key.startswith('Alter Kind'):
            # Keys look like "Alter Kind 1", "Alter Kind 2", etc.
            try:
                lead.kind_ages.append(int(value))
            except ValueError:
                # Best effort: a non-numeric age is skipped.
                pass
        elif key == 'Apartment':
            lead.apartments.append(value)
        elif key == 'Privacy':
            # Only the value "on" (any letter case) counts as consent.
            lead.privacy = value.lower() == 'on'

    # Ages are returned in ascending order, so their position no longer
    # matches the child number from the "Alter Kind N" key.
    lead.kind_ages.sort()

    return lead
|
|
|
|
|
|
def export_to_json(leads: List[Lead], output_file: str) -> None:
    """Write the leads to output_file as a JSON array, one object per lead."""
    records = []
    for lead in leads:
        records.append(asdict(lead))

    # ensure_ascii=False keeps non-ASCII characters readable in the output.
    with open(output_file, 'w', encoding='utf-8') as out:
        json.dump(records, out, indent=2, ensure_ascii=False)

    print(f"Exported {len(leads)} leads to {output_file}")
|
|
|
|
|
|
def export_to_csv(leads: List[Lead], output_file: str) -> None:
    """Write the leads to output_file as CSV with a header row.

    List fields are flattened to comma-separated strings and ``privacy`` is
    written as ``Yes``, ``No`` or an empty cell. When ``leads`` is empty the
    function returns without creating a file or printing anything.
    """
    import csv

    if not leads:
        return

    # Column order of the CSV file.
    headers = [
        'name', 'lastname', 'mail', 'tel',
        'anreise', 'abreise', 'erwachsene', 'kinder',
        'kind_ages', 'apartments', 'verpflegung',
        'sprache', 'device', 'anrede', 'land',
        'privacy', 'received_date',
    ]

    with open(output_file, 'w', newline='', encoding='utf-8') as out:
        writer = csv.DictWriter(out, fieldnames=headers)
        writer.writeheader()

        for lead in leads:
            row = asdict(lead)

            # A CSV cell cannot hold a list, so join the items with commas.
            row['kind_ages'] = ','.join(str(age) for age in row['kind_ages'])
            row['apartments'] = ','.join(row['apartments'])

            # Three-way flag: truthy -> Yes, exactly False -> No, else blank.
            consent = row['privacy']
            if consent:
                row['privacy'] = 'Yes'
            elif consent is False:
                row['privacy'] = 'No'
            else:
                row['privacy'] = ''

            writer.writerow(row)

    print(f"Exported {len(leads)} leads to {output_file}")
|
|
|
|
|
|
def print_summary(leads: List[Lead]) -> None:
    """Print the total lead count followed by a short block for each lead."""
    # Banner with the total count.
    print(f"\n{'='*60}")
    print(f"Total leads extracted: {len(leads)}")
    print(f"{'='*60}\n")

    for i, lead in enumerate(leads, start=1):
        print(f"Lead {i}:")
        print(f"  Name: {lead.name} {lead.lastname}")
        print(f"  Email: {lead.mail}")
        print(f"  Phone: {lead.tel}")
        print(f"  Check-in: {lead.anreise}, Check-out: {lead.abreise}")
        print(f"  Adults: {lead.erwachsene}, Children: {lead.kinder}")

        # The optional lines are shown only when there is something to list.
        if lead.kind_ages:
            print(f"  Children ages: {lead.kind_ages}")
        if lead.apartments:
            print(f"  Apartments: {', '.join(lead.apartments)}")

        # Blank line between leads.
        print()
|
|
|
|
|
|
if __name__ == '__main__':
    import sys

    # The MBOX path may be given as the first command-line argument; without
    # one, the original hard-coded location is used.
    default_mbox_file = '/home/divusjulius/repos/alpinebits_python/Leads-Bemelmans Apartments.mbox'
    mbox_file = sys.argv[1] if len(sys.argv) > 1 else default_mbox_file

    print(f"Parsing {mbox_file}...")
    leads = parse_mbox_file(mbox_file)

    # Print summary
    print_summary(leads)

    # Export to JSON
    export_to_json(leads, 'leads_export.json')

    # Export to CSV
    export_to_csv(leads, 'leads_export.csv')
|