Created a script to update the CSV imports that don't have the date

This commit is contained in:
Jonas Linter
2025-11-20 11:14:07 +01:00
parent 8308be3e49
commit ce1fd140c9
7 changed files with 318968 additions and 1158 deletions

View File

@@ -30,6 +30,7 @@ class Lead:
anrede: Optional[str] = None # Salutation
land: Optional[str] = None # Country
privacy: Optional[bool] = None
received_date: Optional[str] = None
def parse_mbox_file(filepath: str) -> List[Lead]:
@@ -64,12 +65,48 @@ def parse_mbox_file(filepath: str) -> List[Lead]:
# Extract lead data from body
lead = parse_email_body(body)
# Extract received date from headers
try:
lead.received_date = extract_received_date(headers)
except ValueError as e:
print(f"WARNING: {e}")
raise
if lead.name or lead.mail: # Only add if we have some data
leads.append(lead)
return leads
def extract_received_date(headers: str) -> Optional[str]:
    """
    Extract the Date header from email headers and convert to ISO format.

    Args:
        headers: Email headers section, one header per line.

    Returns:
        ISO 8601 date string from the Date header, or None if no
        Date header is present.

    Raises:
        ValueError: If a Date header is found but its value cannot be
            parsed as an RFC 2822 date.
    """
    from email.utils import parsedate_to_datetime

    for line in headers.split('\n'):
        if line.startswith('Date:'):
            # 'Date:' is 5 characters. Slicing at 6 (as before) silently
            # dropped the first character of the value whenever no space
            # followed the colon (e.g. 'Date:Mon, ...').
            date_value = line[5:].strip()
            try:
                # Parse the RFC 2822 date format and convert to ISO format
                dt = parsedate_to_datetime(date_value)
                return dt.isoformat()
            except (TypeError, ValueError) as e:
                # Chain the original exception so parsing failures are
                # caught and reported with full context.
                raise ValueError(
                    f"Failed to parse date '{date_value}': {e}"
                ) from e
    return None
def parse_email_body(body: str) -> Lead:
"""
Parse the body of an email to extract lead information.
@@ -172,7 +209,8 @@ def export_to_csv(leads: List[Lead], output_file: str) -> None:
'device',
'anrede',
'land',
'privacy'
'privacy',
'received_date'
]
with open(output_file, 'w', newline='', encoding='utf-8') as f:

36
fetch_and_update_leads.py Normal file
View File

@@ -0,0 +1,36 @@
"""List CSV-imported reservations (unique_id prefixed 'csv_') with their
customer name/email and current created_at, for review before updating."""
import psycopg2
from psycopg2.extras import RealDictCursor
import json
import csv
from datetime import datetime

# Database connection to the local meta_insights instance.
conn = psycopg2.connect(
    dbname="meta_insights",
    user="meta_user",
    password="meta_password",
    host="localhost",
    port=5555,
)
try:
    # RealDictCursor yields rows as dicts keyed by column name.
    cursor = conn.cursor(cursor_factory=RealDictCursor)
    try:
        # Set search path to the schema; project tables live in 'alpinebits'.
        cursor.execute("SET search_path TO alpinebits")

        # Fetch the data: every CSV-imported reservation joined to its customer.
        cursor.execute("""
        select r.id, r.created_at, r.customer_id, r.unique_id,
        c.given_name, c.email
        from reservations as r
        join customers as c on c.id = r.customer_id
        where unique_id like 'csv_%'
        order by r.created_at desc
        """)
        rows = cursor.fetchall()

        print(f"Found {len(rows)} rows to update")
        for row in rows:
            print(f" - {row['given_name']} ({row['email']}): {row['created_at']}")
    finally:
        # Previously the cursor/connection leaked if any statement raised;
        # always release them.
        cursor.close()
finally:
    conn.close()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -9,25 +9,25 @@ select sum(room.total_revenue::float)
where con.reservation_id is not null and room.total_revenue is not null
and res.start_date <= room.arrival_date + INTERVAL '7 days'
;
```
```
select res.created_at, con.reservation_date, res.start_date, room.arrival_date,res.end_date,
select res.created_at,directly_attributable ,con.reservation_date, res.start_date, room.arrival_date,res.end_date,
room.departure_date, reservation_type, booking_channel, advertising_medium,
guest_first_name,guest_last_name, total_revenue,
guest_first_name,guest_last_name, total_revenue,is_regular,
room.room_status
from alpinebits.conversions as con
join alpinebits.conversion_rooms as room on room.conversion_id = con.id
join alpinebits.reservations as res on res.id = con.reservation_id
join alpinebits.conversion_guests as guest on guest.guest_id = con.guest_id
where con.reservation_id is not null and room.total_revenue is not null
and res.start_date <= room.arrival_date + INTERVAL '7 days'
order by reservation_date;
@@ -64,3 +64,20 @@ select round(sum(room.total_revenue::numeric)::numeric, 3), con.advertising_medi
```
```
select sum(room.total_revenue::float), is_regular
from alpinebits.conversions as con
join alpinebits.conversion_rooms as room on room.conversion_id = con.id
join alpinebits.reservations as res on res.id = con.reservation_id
join alpinebits.conversion_guests as g on g.guest_id = con.guest_id
where room.total_revenue is not null
and directly_attributable = true
group by is_regular
;
```

101
update_csv_import_dates.py Normal file
View File

@@ -0,0 +1,101 @@
#!/usr/bin/env python3
"""
Update the created_at timestamps for CSV-imported leads with the new email receive dates.
"""
import asyncio
import csv
from datetime import datetime
from sqlalchemy import text, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from src.alpine_bits_python.config_loader import load_config
from src.alpine_bits_python.db import Reservation, Customer
async def main():
    """Backfill created_at on CSV-imported reservations.

    Reads per-lead received_date values from leads_export.csv (written by
    extract_leads.py), matches reservations whose unique_id starts with
    'csv_' to CSV rows by customer email (case-insensitive), and overwrites
    each matched reservation's created_at with the email's receive date.
    """
    # Load DB settings from the project config.
    config = load_config()
    db_url = config["database"]["url"]
    schema = config["database"]["schema"]

    # Create async engine; expire_on_commit=False keeps ORM objects
    # readable after the final commit.
    engine = create_async_engine(db_url)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    # Load the CSV with the new dates, keyed by lower-cased email.
    csv_dates = _load_csv_dates("leads_export.csv")
    if csv_dates is None:
        print("ERROR: leads_export.csv not found. Run extract_leads.py first.")
        # Previously this early return leaked the engine's connection pool.
        await engine.dispose()
        return
    print(f"Loaded {len(csv_dates)} date entries from CSV")

    # Fetch CSV-imported reservations
    async with async_session() as session:
        # Set the search path on the session's OWN connection. The previous
        # version ran SET on separate engine.begin() connections, which only
        # affected the session if the pool happened to hand it the same
        # connection back. `schema` comes from trusted local config, so the
        # f-string interpolation into SQL is acceptable here.
        await session.execute(text(f"SET search_path TO {schema}"))

        # Query for CSV imports joined to their customers.
        result = await session.execute(
            select(Reservation, Customer)
            .join(Customer, Reservation.customer_id == Customer.id)
            .where(Reservation.unique_id.like("csv_%"))
        )
        rows = result.all()
        print(f"\nFound {len(rows)} CSV-imported reservations to update")

        updated = 0
        failed = 0
        for reservation, customer in rows:
            email = customer.email_address
            if email and email.lower() in csv_dates:
                new_date_str = csv_dates[email.lower()]["received_date"]
                try:
                    # Dates were exported in ISO 8601 format.
                    new_date = datetime.fromisoformat(new_date_str)
                    old_date = reservation.created_at
                    print(f" Updating: {customer.given_name} ({email})")
                    print(f" Old: {old_date}")
                    print(f" New: {new_date}")
                    reservation.created_at = new_date
                    updated += 1
                except ValueError as e:
                    print(f" FAILED to parse date for {email}: {e}")
                    failed += 1
            elif email:
                print(f" WARNING: No CSV date found for {customer.given_name} ({email})")

        print(f"\nSummary: {updated} updated, {failed} failed")

        if updated > 0:
            await session.commit()
            print("Changes committed to database")
        else:
            print("No changes made")

    await engine.dispose()


def _load_csv_dates(path):
    """Read *path* and return {email.lower(): {name, lastname, received_date}},
    or None if the file does not exist.

    Rows missing an email or a received_date are skipped; later rows with
    the same email overwrite earlier ones (email is treated as unique).
    """
    csv_dates = {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                email = row.get("mail", "").strip()
                received_date = row.get("received_date", "").strip()
                if email and received_date:
                    # Use email as primary key since it's unique.
                    csv_dates[email.lower()] = {
                        "name": row.get("name", "").strip(),
                        "lastname": row.get("lastname", "").strip(),
                        "received_date": received_date,
                    }
    except FileNotFoundError:
        return None
    return csv_dates
# Script entry point: run the async backfill end-to-end.
if __name__ == "__main__":
    asyncio.run(main())