Jonas Linter
2025-10-22 17:56:59 +02:00
parent cb26ac9132
commit b8f1128108
2 changed files with 448 additions and 3 deletions

View File

@@ -5,7 +5,7 @@ Handles time-series data with metadata caching.
import asyncio
import os
from datetime import datetime
from datetime import datetime, date
from typing import Any, Dict, List, Optional
import asyncpg
@@ -506,3 +506,32 @@ class TimescaleDBClient:
async with self.pool.acquire() as conn:
rows = await conn.fetch(query, account_id, start_time, end_time)
return [dict(row) for row in rows]
async def get_existing_dates_for_account(
self,
account_id: str,
start_date: date,
end_date: date,
) -> List[date]:
"""
Get all dates that already have data for an account.
Args:
account_id: Ad account ID
start_date: Start date (inclusive)
end_date: End date (inclusive)
Returns:
List of dates that have existing data
"""
query = """
SELECT DISTINCT date_start
FROM account_insights
WHERE account_id = $1
AND date_start IS NOT NULL
AND date_start BETWEEN $2 AND $3
ORDER BY date_start
"""
async with self.pool.acquire() as conn:
rows = await conn.fetch(query, account_id, start_date, end_date)
return [row['date_start'] for row in rows]

View File

@@ -600,7 +600,375 @@ class ScheduledInsightsGrabber:
print(f" Ad set insights stored for {account_id} ({count} records)")
async def run_collection_cycle(self, cache_metadata: bool = True):
async def grab_account_insights_for_date_range(
self,
account_id: str,
start_date: date,
end_date: date,
) -> int:
"""
Grab and store account-level insights for a specific date range.
Args:
account_id: Ad account ID
start_date: Start date (inclusive)
end_date: End date (inclusive)
Returns:
Number of records stored
"""
fields = [
AdsInsights.Field.impressions,
AdsInsights.Field.clicks,
AdsInsights.Field.spend,
AdsInsights.Field.cpc,
AdsInsights.Field.cpm,
AdsInsights.Field.ctr,
AdsInsights.Field.cpp,
AdsInsights.Field.reach,
AdsInsights.Field.frequency,
AdsInsights.Field.actions,
AdsInsights.Field.cost_per_action_type,
AdsInsights.Field.date_start,
AdsInsights.Field.date_stop,
]
# Use time_range instead of date_preset for custom date ranges
params = {
"time_range": {
"since": start_date.isoformat(),
"until": end_date.isoformat(),
},
"level": "account",
"time_increment": 1, # Daily breakdown
}
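# Illustrative values: start_date=date(2025, 1, 1) and end_date=date(2025, 1, 7)
# yield time_range {"since": "2025-01-01", "until": "2025-01-07"}; with
# time_increment=1 the API should return one insight row per day in that range.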
ad_account = AdAccount(account_id)
try:
insights = await self._rate_limited_request(
ad_account.get_insights,
fields=fields,
params=params,
)
except FacebookRequestError as e:
error_code = e.api_error_code()
if error_code in [190, 102]:
raise ValueError(f"Access token is invalid (error {error_code}): {e.api_error_message()}")
raise
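# Codes 190 (invalid/expired OAuth access token) and 102 (API session
# failure) indicate auth problems, so retrying with the same token is pointless.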
# Get account timezone from database
account_timezone = await self._get_account_timezone(account_id)
# Store insights
count = 0
for insight in insights:
insight_dict = dict(insight)
# Compute appropriate timestamp based on date_start and account timezone
date_start_str = insight_dict.get("date_start")
timestamp = self._compute_timestamp(date_start_str, account_timezone)
await self.db.insert_account_insights(
time=timestamp,
account_id=account_id,
data=insight_dict,
date_preset="custom", # Indicate this was a custom date range
)
count += 1
return count
async def grab_campaign_insights_for_date_range(
self,
account_id: str,
start_date: date,
end_date: date,
limit: int = 50,
) -> int:
"""
Grab and store campaign-level insights for a specific date range.
Args:
account_id: Ad account ID
start_date: Start date (inclusive)
end_date: End date (inclusive)
limit: Maximum number of campaigns
Returns:
Number of records stored
"""
fields = [
AdsInsights.Field.campaign_id,
AdsInsights.Field.campaign_name,
AdsInsights.Field.impressions,
AdsInsights.Field.clicks,
AdsInsights.Field.spend,
AdsInsights.Field.ctr,
AdsInsights.Field.cpc,
AdsInsights.Field.cpm,
AdsInsights.Field.reach,
AdsInsights.Field.actions,
AdsInsights.Field.date_start,
AdsInsights.Field.date_stop,
]
params = {
"time_range": {
"since": start_date.isoformat(),
"until": end_date.isoformat(),
},
"level": "campaign",
"time_increment": 1, # Daily breakdown
"limit": limit,
}
ad_account = AdAccount(account_id)
try:
insights = await self._rate_limited_request(
ad_account.get_insights,
fields=fields,
params=params,
)
except FacebookRequestError as e:
error_code = e.api_error_code()
if error_code in [190, 102]:
raise ValueError(f"Access token is invalid (error {error_code}): {e.api_error_message()}")
raise
# Get account timezone from database
account_timezone = await self._get_account_timezone(account_id)
# Store insights
count = 0
for insight in insights:
campaign_id = insight.get('campaign_id')
if campaign_id:
insight_dict = dict(insight)
date_start_str = insight_dict.get("date_start")
timestamp = self._compute_timestamp(date_start_str, account_timezone)
await self.db.insert_campaign_insights(
time=timestamp,
campaign_id=campaign_id,
account_id=account_id,
data=insight_dict,
date_preset="custom",
cache_metadata=True,
)
count += 1
return count
async def grab_adset_insights_for_date_range(
self,
account_id: str,
start_date: date,
end_date: date,
limit: int = 50,
) -> int:
"""
Grab and store ad set-level insights for a specific date range.
Args:
account_id: Ad account ID
start_date: Start date (inclusive)
end_date: End date (inclusive)
limit: Maximum number of ad sets
Returns:
Number of records stored
"""
fields = [
AdsInsights.Field.adset_id,
AdsInsights.Field.adset_name,
AdsInsights.Field.campaign_id,
AdsInsights.Field.impressions,
AdsInsights.Field.clicks,
AdsInsights.Field.spend,
AdsInsights.Field.ctr,
AdsInsights.Field.cpc,
AdsInsights.Field.cpm,
AdsInsights.Field.reach,
AdsInsights.Field.actions,
AdsInsights.Field.date_start,
AdsInsights.Field.date_stop,
]
params = {
"time_range": {
"since": start_date.isoformat(),
"until": end_date.isoformat(),
},
"level": "adset",
"time_increment": 1, # Daily breakdown
"limit": limit,
}
ad_account = AdAccount(account_id)
try:
insights = await self._rate_limited_request(
ad_account.get_insights,
fields=fields,
params=params,
)
except FacebookRequestError as e:
error_code = e.api_error_code()
if error_code in [190, 102]:
raise ValueError(f"Access token is invalid (error {error_code}): {e.api_error_message()}")
raise
# Get account timezone from database
account_timezone = await self._get_account_timezone(account_id)
# Store insights
count = 0
for insight in insights:
adset_id = insight.get('adset_id')
campaign_id = insight.get('campaign_id')
if adset_id and campaign_id:
insight_dict = dict(insight)
date_start_str = insight_dict.get("date_start")
timestamp = self._compute_timestamp(date_start_str, account_timezone)
await self.db.insert_adset_insights(
time=timestamp,
adset_id=adset_id,
campaign_id=campaign_id,
account_id=account_id,
data=insight_dict,
date_preset="custom",
cache_metadata=True,
)
count += 1
return count
async def backfill_missing_data(
self,
account_id: str,
lookback_days: int = 14,
batch_size: int = 7,
) -> Dict[str, int]:
"""
Backfill missing data for the last N days.
This method:
1. Determines the date range based on account timezone
2. Checks which dates already exist in the database
3. Batches missing dates into groups for efficient API calls
4. Fetches and stores missing data
Args:
account_id: Ad account ID
lookback_days: Number of days to look back (default: 14)
batch_size: Number of days to fetch per API call (default: 7)
Returns:
Dictionary with counts of records stored per level
"""
# Get account timezone
account_timezone = await self._get_account_timezone(account_id)
account_tz = ZoneInfo(account_timezone)
# Calculate today in account timezone
now_local = datetime.now(account_tz)
today_local = now_local.date()
# Calculate date range
end_date = today_local - timedelta(days=1) # Yesterday
start_date = end_date - timedelta(days=lookback_days - 1)
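# Worked example (hypothetical date): if today_local is 2025-10-22 and
# lookback_days=14, this scans 2025-10-08 through 2025-10-21 inclusive.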
print(f" Checking for missing data from {start_date} to {end_date} (account timezone: {account_timezone})")
# Get existing dates from database
existing_dates = await self.db.get_existing_dates_for_account(
account_id=account_id,
start_date=start_date,
end_date=end_date,
)
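# Note: presence is judged from account_insights only; a date that has
# account rows but lacks campaign/adset rows will not be re-fetched.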
# Calculate missing dates
all_dates = {start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)}
existing_dates_set = set(existing_dates)
missing_dates = sorted(all_dates - existing_dates_set)
if not missing_dates:
print(f" No missing dates found - all data present")
return {"account": 0, "campaign": 0, "adset": 0}
print(f" Found {len(missing_dates)} missing dates: {missing_dates[0]} to {missing_dates[-1]}")
# Batch missing dates into groups
total_counts = {"account": 0, "campaign": 0, "adset": 0}
batches = []
current_batch_start = None
previous_date = None
for missing_date in missing_dates:
if current_batch_start is None:
# Start new batch
current_batch_start = missing_date
previous_date = missing_date
elif (missing_date - current_batch_start).days >= batch_size or (missing_date - previous_date).days > 1:
# Batch is full or dates are not consecutive - save batch and start new one
batches.append((current_batch_start, previous_date))
current_batch_start = missing_date
previous_date = missing_date
else:
# Continue current batch
previous_date = missing_date
# Add final batch
if current_batch_start is not None:
batches.append((current_batch_start, previous_date))
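# Worked example: missing dates [Oct 1, Oct 2, Oct 3, Oct 5] with batch_size=2
# become batches [(Oct 1, Oct 2), (Oct 3, Oct 3), (Oct 5, Oct 5)]: runs are
# capped at batch_size days and split at any gap between dates.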
print(f" Batched into {len(batches)} API calls")
# Fetch data for each batch
for i, (batch_start, batch_end) in enumerate(batches, 1):
days_in_batch = (batch_end - batch_start).days + 1
print(f" Batch {i}/{len(batches)}: {batch_start} to {batch_end} ({days_in_batch} days)")
try:
# Fetch account insights
account_count = await self.grab_account_insights_for_date_range(
account_id=account_id,
start_date=batch_start,
end_date=batch_end,
)
total_counts["account"] += account_count
# Fetch campaign insights
campaign_count = await self.grab_campaign_insights_for_date_range(
account_id=account_id,
start_date=batch_start,
end_date=batch_end,
limit=50,
)
total_counts["campaign"] += campaign_count
# Fetch adset insights
adset_count = await self.grab_adset_insights_for_date_range(
account_id=account_id,
start_date=batch_start,
end_date=batch_end,
limit=50,
)
total_counts["adset"] += adset_count
print(f" Stored: {account_count} account, {campaign_count} campaign, {adset_count} adset records")
except Exception as e:
print(f" Error fetching batch: {e}")
import traceback
traceback.print_exc()
print(f" Backfill complete: {total_counts['account']} account, {total_counts['campaign']} campaign, {total_counts['adset']} adset records")
return total_counts
async def run_collection_cycle(self, cache_metadata: bool = True, run_backfill: bool = False):
"""
Run a single collection cycle for all ad accounts.
@@ -608,6 +976,7 @@ class ScheduledInsightsGrabber:
cache_metadata: Whether to fetch and cache full metadata (status, objective, etc.)
from separate API calls. Campaign/adset names are always cached
automatically from insights data to prevent foreign key violations.
run_backfill: Whether to run backfill for missing historical data (typically only at startup)
"""
print("\n" + "="*60)
print(f"COLLECTION CYCLE - {datetime.now().isoformat()}")
@@ -619,6 +988,9 @@ class ScheduledInsightsGrabber:
new_day_detected = False
today_date_start = None
# Track accounts that had data today (for backfill eligibility)
accounts_with_data = []
# Loop through all ad accounts
for i, account_id in enumerate(self.ad_account_ids, 1):
print(f"\n[{i}/{len(self.ad_account_ids)}] Processing account: {account_id}")
@@ -642,6 +1014,10 @@ class ScheduledInsightsGrabber:
if today_date_start is None and date_start is not None:
today_date_start = date_start
# Track accounts with data for backfill
if date_start is not None:
accounts_with_data.append(account_id)
print(f"✓ Completed today's data for {account_id}")
except ValueError as e:
@@ -700,6 +1076,37 @@ class ScheduledInsightsGrabber:
self.yesterday_last_fetched = datetime.now(timezone.utc)
print(f"\n✓ Yesterday data fetch completed at {self.yesterday_last_fetched.isoformat()}")
# Run backfill if requested and we have accounts with data
if run_backfill and accounts_with_data:
print("\n" + "="*60)
print("BACKFILLING MISSING HISTORICAL DATA")
print("="*60 + "\n")
for i, account_id in enumerate(accounts_with_data, 1):
print(f"\n[{i}/{len(accounts_with_data)}] Backfilling for: {account_id}")
print("-" * 60)
try:
# Backfill last 14 days
await self.backfill_missing_data(
account_id=account_id,
lookback_days=14,
batch_size=7,
)
print(f"✓ Backfill completed for {account_id}")
except ValueError as e:
print(f"❌ Fatal error - Token validation failed: {e}")
raise
except Exception as e:
print(f"❌ Error during backfill for {account_id}: {e}")
import traceback
traceback.print_exc()
print("\n" + "="*60)
print("BACKFILL COMPLETE")
print("="*60 + "\n")
# Print rate limiter statistics
print("\n" + "-" * 60)
self.rate_limiter.print_stats()
@@ -712,6 +1119,7 @@ class ScheduledInsightsGrabber:
self,
interval_hours: float = 2.0,
refresh_metadata_every_n_cycles: int = 12,
backfill_on_startup: bool = True,
):
"""
Run scheduled data collection for all accessible ad accounts.
@@ -719,6 +1127,7 @@ class ScheduledInsightsGrabber:
Args:
interval_hours: Hours between collection cycles (default: 2)
refresh_metadata_every_n_cycles: Refresh metadata every N cycles (default: 12 = once per day if interval=2h)
backfill_on_startup: Whether to backfill missing historical data on startup (default: True)
"""
print("\n" + "="*60)
print("SCHEDULED INSIGHTS GRABBER STARTED")
@@ -740,6 +1149,7 @@ class ScheduledInsightsGrabber:
print(f"\nCollection interval: {interval_hours} hours")
print(f"Metadata refresh: every {refresh_metadata_every_n_cycles} cycles")
print(f"Backfill on startup: {backfill_on_startup}")
if self.max_accounts:
print(f"Max accounts: {self.max_accounts}")
print("="*60 + "\n")
@@ -756,8 +1166,14 @@ class ScheduledInsightsGrabber:
# Refresh metadata periodically (e.g., once per day)
cache_metadata = (cycle_count % refresh_metadata_every_n_cycles == 1)
# Run backfill only on first cycle if requested
run_backfill = (cycle_count == 1 and backfill_on_startup)
# Run collection
await self.run_collection_cycle(cache_metadata=cache_metadata)
await self.run_collection_cycle(
cache_metadata=cache_metadata,
run_backfill=run_backfill,
)
# Wait for next cycle
wait_seconds = interval_hours * 3600