From b8f11281085d7e7d9307c1e9e7c857f5a01f4018 Mon Sep 17 00:00:00 2001
From: Jonas Linter
Date: Wed, 22 Oct 2025 17:56:59 +0200
Subject: [PATCH] Backfill missing historical insights on startup

---
 src/meta_api_grabber/database.py          |  31 +-
 src/meta_api_grabber/scheduled_grabber.py | 420 +++++++++++++++++++++-
 2 files changed, 448 insertions(+), 3 deletions(-)

diff --git a/src/meta_api_grabber/database.py b/src/meta_api_grabber/database.py
index cb54720..d923a72 100644
--- a/src/meta_api_grabber/database.py
+++ b/src/meta_api_grabber/database.py
@@ -5,7 +5,7 @@ Handles time-series data with metadata caching.
 
 import asyncio
 import os
-from datetime import datetime
+from datetime import datetime, date
 from typing import Any, Dict, List, Optional
 
 import asyncpg
@@ -506,3 +506,32 @@ class TimescaleDBClient:
         async with self.pool.acquire() as conn:
             rows = await conn.fetch(query, account_id, start_time, end_time)
             return [dict(row) for row in rows]
+
+    async def get_existing_dates_for_account(
+        self,
+        account_id: str,
+        start_date: date,
+        end_date: date,
+    ) -> List[date]:
+        """
+        Get all dates that already have data for an account.
+
+        Args:
+            account_id: Ad account ID
+            start_date: Start date (inclusive)
+            end_date: End date (inclusive)
+
+        Returns:
+            List of dates that have existing data
+        """
+        query = """
+            SELECT DISTINCT date_start
+            FROM account_insights
+            WHERE account_id = $1
+              AND date_start IS NOT NULL
+              AND date_start BETWEEN $2 AND $3
+            ORDER BY date_start
+        """
+        async with self.pool.acquire() as conn:
+            rows = await conn.fetch(query, account_id, start_date, end_date)
+            return [row['date_start'] for row in rows]
diff --git a/src/meta_api_grabber/scheduled_grabber.py b/src/meta_api_grabber/scheduled_grabber.py
index 54b791c..6bdaafa 100644
--- a/src/meta_api_grabber/scheduled_grabber.py
+++ b/src/meta_api_grabber/scheduled_grabber.py
@@ -600,7 +600,375 @@ class ScheduledInsightsGrabber:
 
         print(f"    Ad set insights stored for {account_id} ({count} records)")
 
-    async def run_collection_cycle(self, cache_metadata: bool = True):
+    async def grab_account_insights_for_date_range(
+        self,
+        account_id: str,
+        start_date: date,
+        end_date: date,
+    ) -> int:
+        """
+        Grab and store account-level insights for a specific date range.
+
+        Args:
+            account_id: Ad account ID
+            start_date: Start date (inclusive)
+            end_date: End date (inclusive)
+
+        Returns:
+            Number of records stored
+        """
+        fields = [
+            AdsInsights.Field.impressions,
+            AdsInsights.Field.clicks,
+            AdsInsights.Field.spend,
+            AdsInsights.Field.cpc,
+            AdsInsights.Field.cpm,
+            AdsInsights.Field.ctr,
+            AdsInsights.Field.cpp,
+            AdsInsights.Field.reach,
+            AdsInsights.Field.frequency,
+            AdsInsights.Field.actions,
+            AdsInsights.Field.cost_per_action_type,
+            AdsInsights.Field.date_start,
+            AdsInsights.Field.date_stop,
+        ]
+
+        # Use time_range instead of date_preset for custom date ranges
+        params = {
+            "time_range": {
+                "since": start_date.isoformat(),
+                "until": end_date.isoformat(),
+            },
+            "level": "account",
+            "time_increment": 1,  # Daily breakdown
+        }
+
+        ad_account = AdAccount(account_id)
+        try:
+            insights = await self._rate_limited_request(
+                ad_account.get_insights,
+                fields=fields,
+                params=params,
+            )
+        except FacebookRequestError as e:
+            error_code = e.api_error_code()
+            if error_code in [190, 102]:
+                raise ValueError(f"Access token is invalid (error {error_code}): {e.api_error_message()}")
+            raise
+
+        # Get account timezone from database
+        account_timezone = await self._get_account_timezone(account_id)
+
+        # Store insights
+        count = 0
+        for insight in insights:
+            insight_dict = dict(insight)
+
+            # Compute appropriate timestamp based on date_start and account timezone
+            date_start_str = insight_dict.get("date_start")
+            timestamp = self._compute_timestamp(date_start_str, account_timezone)
+
+            await self.db.insert_account_insights(
+                time=timestamp,
+                account_id=account_id,
+                data=insight_dict,
+                date_preset="custom",  # Indicate this was a custom date range
+            )
+            count += 1
+
+        return count
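For reviewers unfamiliar with the Insights API: `time_range` plus `time_increment=1` is what turns one call into per-day rows. A minimal synchronous sketch of the call shape, outside the grabber's rate limiter (the account ID is a placeholder, and a FacebookAdsApi session is assumed to be initialized already):

```python
# Minimal sketch of the time_range/time_increment call shape used above.
# Assumes FacebookAdsApi has been initialized with a valid access token.
from datetime import date

from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.adsinsights import AdsInsights

params = {
    "time_range": {
        "since": date(2025, 10, 6).isoformat(),   # "2025-10-06"
        "until": date(2025, 10, 12).isoformat(),  # "2025-10-12"
    },
    "level": "account",
    "time_increment": 1,  # one row per calendar day in the range
}

# With time_increment=1, a 7-day range yields up to 7 insight rows,
# each carrying its own date_start/date_stop pair.
for insight in AdAccount("act_<ACCOUNT_ID>").get_insights(
    fields=[AdsInsights.Field.spend, AdsInsights.Field.date_start],
    params=params,
):
    print(insight["date_start"], insight["spend"])
```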
+
+    async def grab_campaign_insights_for_date_range(
+        self,
+        account_id: str,
+        start_date: date,
+        end_date: date,
+        limit: int = 50,
+    ) -> int:
+        """
+        Grab and store campaign-level insights for a specific date range.
+
+        Args:
+            account_id: Ad account ID
+            start_date: Start date (inclusive)
+            end_date: End date (inclusive)
+            limit: Maximum number of campaigns
+
+        Returns:
+            Number of records stored
+        """
+        fields = [
+            AdsInsights.Field.campaign_id,
+            AdsInsights.Field.campaign_name,
+            AdsInsights.Field.impressions,
+            AdsInsights.Field.clicks,
+            AdsInsights.Field.spend,
+            AdsInsights.Field.ctr,
+            AdsInsights.Field.cpc,
+            AdsInsights.Field.cpm,
+            AdsInsights.Field.reach,
+            AdsInsights.Field.actions,
+            AdsInsights.Field.date_start,
+            AdsInsights.Field.date_stop,
+        ]
+
+        params = {
+            "time_range": {
+                "since": start_date.isoformat(),
+                "until": end_date.isoformat(),
+            },
+            "level": "campaign",
+            "time_increment": 1,  # Daily breakdown
+            "limit": limit,
+        }
+
+        ad_account = AdAccount(account_id)
+        try:
+            insights = await self._rate_limited_request(
+                ad_account.get_insights,
+                fields=fields,
+                params=params,
+            )
+        except FacebookRequestError as e:
+            error_code = e.api_error_code()
+            if error_code in [190, 102]:
+                raise ValueError(f"Access token is invalid (error {error_code}): {e.api_error_message()}")
+            raise
+
+        # Get account timezone from database
+        account_timezone = await self._get_account_timezone(account_id)
+
+        # Store insights
+        count = 0
+        for insight in insights:
+            campaign_id = insight.get('campaign_id')
+            if campaign_id:
+                insight_dict = dict(insight)
+
+                date_start_str = insight_dict.get("date_start")
+                timestamp = self._compute_timestamp(date_start_str, account_timezone)
+
+                await self.db.insert_campaign_insights(
+                    time=timestamp,
+                    campaign_id=campaign_id,
+                    account_id=account_id,
+                    data=insight_dict,
+                    date_preset="custom",
+                    cache_metadata=True,
+                )
+                count += 1
+
+        return count
+
+    async def grab_adset_insights_for_date_range(
+        self,
+        account_id: str,
+        start_date: date,
+        end_date: date,
+        limit: int = 50,
+    ) -> int:
+        """
+        Grab and store ad set level insights for a specific date range.
+
+        Args:
+            account_id: Ad account ID
+            start_date: Start date (inclusive)
+            end_date: End date (inclusive)
+            limit: Maximum number of ad sets
+
+        Returns:
+            Number of records stored
+        """
+        fields = [
+            AdsInsights.Field.adset_id,
+            AdsInsights.Field.adset_name,
+            AdsInsights.Field.campaign_id,
+            AdsInsights.Field.impressions,
+            AdsInsights.Field.clicks,
+            AdsInsights.Field.spend,
+            AdsInsights.Field.ctr,
+            AdsInsights.Field.cpc,
+            AdsInsights.Field.cpm,
+            AdsInsights.Field.reach,
+            AdsInsights.Field.actions,
+            AdsInsights.Field.date_start,
+            AdsInsights.Field.date_stop,
+        ]
+
+        params = {
+            "time_range": {
+                "since": start_date.isoformat(),
+                "until": end_date.isoformat(),
+            },
+            "level": "adset",
+            "time_increment": 1,  # Daily breakdown
+            "limit": limit,
+        }
+
+        ad_account = AdAccount(account_id)
+        try:
+            insights = await self._rate_limited_request(
+                ad_account.get_insights,
+                fields=fields,
+                params=params,
+            )
+        except FacebookRequestError as e:
+            error_code = e.api_error_code()
+            if error_code in [190, 102]:
+                raise ValueError(f"Access token is invalid (error {error_code}): {e.api_error_message()}")
+            raise
+
+        # Get account timezone from database
+        account_timezone = await self._get_account_timezone(account_id)
+
+        # Store insights
+        count = 0
+        for insight in insights:
+            adset_id = insight.get('adset_id')
+            campaign_id = insight.get('campaign_id')
+            if adset_id and campaign_id:
+                insight_dict = dict(insight)
+
+                date_start_str = insight_dict.get("date_start")
+                timestamp = self._compute_timestamp(date_start_str, account_timezone)
+
+                await self.db.insert_adset_insights(
+                    time=timestamp,
+                    adset_id=adset_id,
+                    campaign_id=campaign_id,
+                    account_id=account_id,
+                    data=insight_dict,
+                    date_preset="custom",
+                    cache_metadata=True,
+                )
+                count += 1
+
+        return count
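All three date-range methods hand `date_start` plus the account timezone to the pre-existing `_compute_timestamp` helper, which is not part of this diff. Presumably it anchors each daily row at local midnight in the account's timezone and normalizes to UTC for storage, roughly like this sketch (an assumption about the helper, not its actual code):

```python
# Hedged sketch of what _compute_timestamp presumably does: anchor a daily
# insights row ("2025-10-06") at local midnight in the account timezone,
# normalized to UTC for storage. The real helper is not in this diff.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def compute_timestamp(date_start_str: str, account_timezone: str) -> datetime:
    local_midnight = datetime.fromisoformat(date_start_str).replace(
        tzinfo=ZoneInfo(account_timezone)
    )
    return local_midnight.astimezone(timezone.utc)

# "2025-10-06" in America/Los_Angeles -> 2025-10-06 07:00:00+00:00
print(compute_timestamp("2025-10-06", "America/Los_Angeles"))
```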
+
+    async def backfill_missing_data(
+        self,
+        account_id: str,
+        lookback_days: int = 14,
+        batch_size: int = 7,
+    ) -> Dict[str, int]:
+        """
+        Backfill missing data for the last N days.
+
+        This method:
+        1. Determines the date range based on the account timezone
+        2. Checks which dates already exist in the database
+        3. Batches missing dates into groups for efficient API calls
+        4. Fetches and stores missing data
+
+        Args:
+            account_id: Ad account ID
+            lookback_days: Number of days to look back (default: 14)
+            batch_size: Number of days to fetch per API call (default: 7)
+
+        Returns:
+            Dictionary with counts of records stored per level
+        """
+        # Get account timezone
+        account_timezone = await self._get_account_timezone(account_id)
+        account_tz = ZoneInfo(account_timezone)
+
+        # Calculate today in account timezone
+        now_local = datetime.now(account_tz)
+        today_local = now_local.date()
+
+        # Calculate date range
+        end_date = today_local - timedelta(days=1)  # Yesterday
+        start_date = end_date - timedelta(days=lookback_days - 1)
+
+        print(f"  Checking for missing data from {start_date} to {end_date} (account timezone: {account_timezone})")
+
+        # Get existing dates from database
+        existing_dates = await self.db.get_existing_dates_for_account(
+            account_id=account_id,
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+        # Calculate missing dates
+        all_dates = {start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)}
+        existing_dates_set = set(existing_dates)
+        missing_dates = sorted(all_dates - existing_dates_set)
+
+        if not missing_dates:
+            print("  No missing dates found - all data present")
+            return {"account": 0, "campaign": 0, "adset": 0}
+
+        print(f"  Found {len(missing_dates)} missing dates: {missing_dates[0]} to {missing_dates[-1]}")
+
+        # Batch missing dates into groups
+        total_counts = {"account": 0, "campaign": 0, "adset": 0}
+        batches = []
+        current_batch_start = None
+        previous_date = None
+
+        for missing_date in missing_dates:
+            if current_batch_start is None:
+                # Start new batch
+                current_batch_start = missing_date
+                previous_date = missing_date
+            elif (missing_date - current_batch_start).days >= batch_size or (missing_date - previous_date).days > 1:
+                # Batch is full or dates are not consecutive - save batch and start new one
+                batches.append((current_batch_start, previous_date))
+                current_batch_start = missing_date
+                previous_date = missing_date
+            else:
+                # Continue current batch
+                previous_date = missing_date
+
+        # Add final batch
+        if current_batch_start is not None:
+            batches.append((current_batch_start, previous_date))
+
+        print(f"  Batched into {len(batches)} API calls")
+
+        # Fetch data for each batch
+        for i, (batch_start, batch_end) in enumerate(batches, 1):
+            days_in_batch = (batch_end - batch_start).days + 1
+            print(f"  Batch {i}/{len(batches)}: {batch_start} to {batch_end} ({days_in_batch} days)")
+
+            try:
+                # Fetch account insights
+                account_count = await self.grab_account_insights_for_date_range(
+                    account_id=account_id,
+                    start_date=batch_start,
+                    end_date=batch_end,
+                )
+                total_counts["account"] += account_count
+
+                # Fetch campaign insights
+                campaign_count = await self.grab_campaign_insights_for_date_range(
+                    account_id=account_id,
+                    start_date=batch_start,
+                    end_date=batch_end,
+                    limit=50,
+                )
+                total_counts["campaign"] += campaign_count
+
+                # Fetch adset insights
+                adset_count = await self.grab_adset_insights_for_date_range(
+                    account_id=account_id,
+                    start_date=batch_start,
+                    end_date=batch_end,
+                    limit=50,
+                )
+                total_counts["adset"] += adset_count
+
+                print(f"    Stored: {account_count} account, {campaign_count} campaign, {adset_count} adset records")
+
+            except Exception as e:
+                print(f"    Error fetching batch: {e}")
+                import traceback
+                traceback.print_exc()
+
+        print(f"  Backfill complete: {total_counts['account']} account, {total_counts['campaign']} campaign, {total_counts['adset']} adset records")
+        return total_counts
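The batching loop above is easiest to verify in isolation. This standalone sketch mirrors its rules with hypothetical dates: consecutive missing dates are grouped, a gap starts a new batch, and no batch spans more than `batch_size` days.

```python
# Standalone illustration of the batching rules in backfill_missing_data.
from datetime import date

def batch_missing_dates(missing, batch_size=7):
    batches, start, prev = [], None, None
    for d in sorted(missing):
        if start is None:
            start, prev = d, d
        elif (d - start).days >= batch_size or (d - prev).days > 1:
            batches.append((start, prev))  # close batch on size cap or gap
            start, prev = d, d
        else:
            prev = d
    if start is not None:
        batches.append((start, prev))
    return batches

# Hypothetical gap pattern: Oct 1-3 missing, Oct 4 present, Oct 5-14 missing.
missing = [date(2025, 10, n) for n in (1, 2, 3)] + [
    date(2025, 10, n) for n in range(5, 15)
]
print(batch_missing_dates(missing))
# -> three batches: Oct 1-3 (gap closes it), Oct 5-11 (size cap), Oct 12-14
```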
+
+    async def run_collection_cycle(self, cache_metadata: bool = True, run_backfill: bool = False):
         """
         Run a single collection cycle for all ad accounts.
 
@@ -608,6 +976,7 @@ class ScheduledInsightsGrabber:
             cache_metadata: Whether to fetch and cache full metadata (status, objective,
                 etc.) from separate API calls. Campaign/adset names are always cached
                 automatically from insights data to prevent foreign key violations.
+            run_backfill: Whether to run backfill for missing historical data (typically only at startup)
         """
         print("\n" + "="*60)
         print(f"COLLECTION CYCLE - {datetime.now().isoformat()}")
@@ -619,6 +988,9 @@ class ScheduledInsightsGrabber:
         new_day_detected = False
         today_date_start = None
 
+        # Track accounts that had data today (for backfill eligibility)
+        accounts_with_data = []
+
         # Loop through all ad accounts
         for i, account_id in enumerate(self.ad_account_ids, 1):
             print(f"\n[{i}/{len(self.ad_account_ids)}] Processing account: {account_id}")
@@ -642,6 +1014,10 @@ class ScheduledInsightsGrabber:
                 if today_date_start is None and date_start is not None:
                     today_date_start = date_start
 
+                # Track accounts with data for backfill
+                if date_start is not None:
+                    accounts_with_data.append(account_id)
+
                 print(f"✓ Completed today's data for {account_id}")
 
             except ValueError as e:
@@ -700,6 +1076,37 @@ class ScheduledInsightsGrabber:
             self.yesterday_last_fetched = datetime.now(timezone.utc)
             print(f"\n✓ Yesterday data fetch completed at {self.yesterday_last_fetched.isoformat()}")
 
+        # Run backfill if requested and we have accounts with data
+        if run_backfill and accounts_with_data:
+            print("\n" + "="*60)
+            print("BACKFILLING MISSING HISTORICAL DATA")
+            print("="*60 + "\n")
+
+            for i, account_id in enumerate(accounts_with_data, 1):
+                print(f"\n[{i}/{len(accounts_with_data)}] Backfilling for: {account_id}")
+                print("-" * 60)
+
+                try:
+                    # Backfill last 14 days
+                    await self.backfill_missing_data(
+                        account_id=account_id,
+                        lookback_days=14,
+                        batch_size=7,
+                    )
+                    print(f"✓ Backfill completed for {account_id}")
+
+                except ValueError as e:
+                    print(f"❌ Fatal error - Token validation failed: {e}")
+                    raise
+                except Exception as e:
+                    print(f"❌ Error during backfill for {account_id}: {e}")
+                    import traceback
+                    traceback.print_exc()
+
+            print("\n" + "="*60)
+            print("BACKFILL COMPLETE")
+            print("="*60 + "\n")
+
         # Print rate limiter statistics
         print("\n" + "-" * 60)
         self.rate_limiter.print_stats()
@@ -712,6 +1119,7 @@ class ScheduledInsightsGrabber:
         self,
         interval_hours: float = 2.0,
         refresh_metadata_every_n_cycles: int = 12,
+        backfill_on_startup: bool = True,
     ):
         """
         Run scheduled data collection for all accessible ad accounts.
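For a one-off backfill of a single account outside the scheduler, something like the following should work. The constructor and connection calls are assumptions, since the grabber's setup code lives outside this diff:

```python
# Hypothetical one-off backfill for a single account, bypassing the scheduler.
# Constructor and connection calls are assumptions; they are not in this diff.
import asyncio

async def main():
    grabber = ScheduledInsightsGrabber()  # assumed default construction
    await grabber.db.connect()            # assumed pool setup on the DB client
    counts = await grabber.backfill_missing_data(
        account_id="act_<ACCOUNT_ID>",    # placeholder account ID
        lookback_days=14,
        batch_size=7,
    )
    print(counts)  # {"account": ..., "campaign": ..., "adset": ...}

asyncio.run(main())
```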
@@ -719,6 +1127,7 @@ class ScheduledInsightsGrabber: Args: interval_hours: Hours between collection cycles (default: 2) refresh_metadata_every_n_cycles: Refresh metadata every N cycles (default: 12 = once per day if interval=2h) + backfill_on_startup: Whether to backfill missing historical data on startup (default: True) """ print("\n" + "="*60) print("SCHEDULED INSIGHTS GRABBER STARTED") @@ -740,6 +1149,7 @@ class ScheduledInsightsGrabber: print(f"\nCollection interval: {interval_hours} hours") print(f"Metadata refresh: every {refresh_metadata_every_n_cycles} cycles") + print(f"Backfill on startup: {backfill_on_startup}") if self.max_accounts: print(f"Max accounts: {self.max_accounts}") print("="*60 + "\n") @@ -756,8 +1166,14 @@ class ScheduledInsightsGrabber: # Refresh metadata periodically (e.g., once per day) cache_metadata = (cycle_count % refresh_metadata_every_n_cycles == 1) + # Run backfill only on first cycle if requested + run_backfill = (cycle_count == 1 and backfill_on_startup) + # Run collection - await self.run_collection_cycle(cache_metadata=cache_metadata) + await self.run_collection_cycle( + cache_metadata=cache_metadata, + run_backfill=run_backfill, + ) # Wait for next cycle wait_seconds = interval_hours * 3600