From 3704390448ea3644fa6d3bcc95355e987717da51 Mon Sep 17 00:00:00 2001 From: Jonas Linter Date: Tue, 21 Oct 2025 13:28:29 +0200 Subject: [PATCH] Ratelimiting and timescaledb changes --- .gitignore | 23 ++ README.md | 96 +++++- ...-compose.yml => docker-compose.yml.example | 0 src/meta_api_grabber/auth.py | 136 +++++++- src/meta_api_grabber/insights_grabber.py | 35 +- src/meta_api_grabber/rate_limiter.py | 273 ++++++++++++++++ src/meta_api_grabber/scheduled_grabber.py | 79 ++++- src/meta_api_grabber/token_manager.py | 307 ++++++++++++++++++ 8 files changed, 897 insertions(+), 52 deletions(-) rename docker-compose.yml => docker-compose.yml.example (100%) create mode 100644 src/meta_api_grabber/rate_limiter.py create mode 100644 src/meta_api_grabber/token_manager.py diff --git a/.gitignore b/.gitignore index 4c49bd7..73e9a6e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,24 @@ +# Environment and secrets .env +.meta_token.json + +# Data output +data/ + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python + +# Virtual environments +venv/ +env/ +ENV/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo diff --git a/README.md b/README.md index ce275bc..df5e5d7 100644 --- a/README.md +++ b/README.md @@ -41,17 +41,24 @@ Edit `.env` and add: - **META_AD_ACCOUNT_ID** from Meta Ads Manager (format: `act_1234567890`) - **DATABASE_URL** is pre-configured for local Docker setup -### 4. Get Access Token +### 4. Get Long-Lived Access Token -**Option A: OAuth2 Flow (Recommended)** +**OAuth2 Flow (Recommended - Gets 60-day token)** ```bash uv run python src/meta_api_grabber/auth.py ``` -Follow the prompts to authorize and save your token. -**Option B: Manual Token** +This will: +1. Open OAuth2 authorization in your browser +2. Exchange the code for a short-lived token +3. **Automatically exchange for a long-lived token (60 days)** +4. Save token to `.env` +5. 
Save token metadata to `.meta_token.json` (for auto-refresh) + +**Manual Token (Not Recommended)** - Get a token from [Graph API Explorer](https://developers.facebook.com/tools/explorer/) - Add it to `.env` as `META_ACCESS_TOKEN` +- Note: Manual tokens won't have auto-refresh capability ### 5. Start Scheduled Collection ```bash @@ -59,6 +66,7 @@ uv run python src/meta_api_grabber/scheduled_grabber.py ``` This will: +- **Automatically refresh tokens** before they expire (checks every cycle) - Collect data every 2 hours using the `today` date preset (recommended by Meta) - Cache metadata (accounts, campaigns, ad sets) twice daily - Store time-series data in TimescaleDB @@ -87,8 +95,15 @@ uv run python src/meta_api_grabber/insights_grabber.py ```bash uv run python src/meta_api_grabber/auth.py ``` -- Interactive flow to get access token -- Saves token to `.env` automatically +- Interactive flow to get long-lived token (60 days) +- Saves token to `.env` and metadata to `.meta_token.json` + +### 4. 
Check Token Status +```bash +uv run python src/meta_api_grabber/token_manager.py +``` +- Shows token expiry and validity +- Manually refresh if needed ## Data Collected @@ -164,12 +179,69 @@ WHERE account_id = 'act_your_id' ORDER BY bucket; ``` -## Rate Limiting +## Rate Limiting & Backoff -The system is configured to be very conservative: -- **2 seconds delay** between API requests -- **Only 1 concurrent request** at a time +The system implements Meta's best practices for rate limiting: + +### Intelligent Rate Limiting +- **Monitors `x-fb-ads-insights-throttle` header** from every API response +- Tracks both app-level and account-level usage percentages +- **Auto-throttles** when usage exceeds 75% +- **Progressive delays** based on usage (75%: 2x, 85%: 3x, 90%: 5x, 95%: 10x) + +### Exponential Backoff +- **Automatic retries** on rate limit errors (up to 5 attempts) +- **Exponential backoff**: 2s → 4s → 8s → 16s → 32s +- Max backoff: 5 minutes +- Recognizes Meta error codes 17 and 80004 + +### Conservative Defaults +- **2 seconds base delay** between API requests +- **1 concurrent request** at a time - **Top 50 campaigns/adsets** per collection -- **2 hour intervals** between collections +- **2 hour intervals** between scheduled collections -This ensures you stay well within Meta's API rate limits. +### Best Practices Applied +Based on [Meta's official recommendations](https://developers.facebook.com/docs/marketing-api/insights/best-practices/): +- ✅ Monitor rate limit headers +- ✅ Pace queries with wait times +- ✅ Implement backoff when approaching limits +- ✅ Use date presets (e.g., 'today') instead of custom ranges +- ✅ Limit query scope and metrics + +## Token Management + +### Automatic Token Refresh + +The system automatically manages token lifecycle: + +**Token Types:** +- **Short-lived tokens**: Valid for 1-2 hours (obtained from OAuth) +- **Long-lived tokens**: Valid for 60 days (automatically exchanged) + +**Auto-Refresh Logic:** +1. 
OAuth flow automatically exchanges for 60-day token +2. Token metadata saved to `.meta_token.json` (includes expiry) +3. Scheduled grabber checks token before each cycle +4. Auto-refreshes when < 7 days remaining +5. New token saved and API reinitialized seamlessly + +**Files Created:** +- `.env` - Contains `META_ACCESS_TOKEN` (updated on refresh) +- `.meta_token.json` - Token metadata (expiry, issued_at, etc.) +- Both files are gitignored for security + +**Manual Token Operations:** + +Check token status: +```bash +uv run python src/meta_api_grabber/token_manager.py +``` + +Re-authenticate (if token expires): +```bash +uv run python src/meta_api_grabber/auth.py +``` + +**Long-Running Collection:** +The scheduled grabber runs indefinitely without manual intervention. Token refresh happens automatically every ~53 days (7 days before the 60-day expiry). diff --git a/docker-compose.yml b/docker-compose.yml.example similarity index 100% rename from docker-compose.yml rename to docker-compose.yml.example diff --git a/src/meta_api_grabber/auth.py b/src/meta_api_grabber/auth.py index 61ac89f..0e8c3d5 100644 --- a/src/meta_api_grabber/auth.py +++ b/src/meta_api_grabber/auth.py @@ -1,11 +1,21 @@ """ OAuth2 authentication module for Meta/Facebook API. + +Handles: +- Initial OAuth2 flow for short-lived tokens +- Exchange short-lived for long-lived tokens (60 days) +- Automatic token refresh +- Token persistence """ +import json import os +import time +from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Optional +import requests from dotenv import load_dotenv from requests_oauthlib import OAuth2Session from requests_oauthlib.compliance_fixes import facebook_compliance_fix @@ -91,12 +101,65 @@ class MetaOAuth2: return token - def interactive_auth(self) -> str: + def exchange_for_long_lived_token(self, short_lived_token: str) -> Dict[str, any]: + """ + Exchange short-lived token for long-lived token (60 days). 
+ + Args: + short_lived_token: The short-lived access token from OAuth flow + + Returns: + Dictionary with access_token and expires_in + """ + url = "https://graph.facebook.com/oauth/access_token" + params = { + "grant_type": "fb_exchange_token", + "client_id": self.app_id, + "client_secret": self.app_secret, + "fb_exchange_token": short_lived_token, + } + + response = requests.get(url, params=params) + response.raise_for_status() + + data = response.json() + return { + "access_token": data["access_token"], + "expires_in": data.get("expires_in", 5184000), # Default 60 days + "token_type": data.get("token_type", "bearer"), + } + + def get_token_info(self, access_token: str) -> Dict[str, any]: + """ + Get information about an access token including expiry. + + Args: + access_token: Access token to inspect + + Returns: + Dictionary with token info including expires_at, is_valid, etc. + """ + url = "https://graph.facebook.com/debug_token" + params = { + "input_token": access_token, + "access_token": f"{self.app_id}|{self.app_secret}", + } + + response = requests.get(url, params=params) + response.raise_for_status() + + data = response.json() + return data.get("data", {}) + + def interactive_auth(self, exchange_for_long_lived: bool = True) -> str: """ Run interactive OAuth2 flow to get access token. 
+ Args: + exchange_for_long_lived: If True, exchanges short-lived for long-lived token (60 days) + Returns: - Access token string + Access token string (long-lived if exchange_for_long_lived=True) """ print("\n" + "="*60) print("META/FACEBOOK OAUTH2 AUTHENTICATION") @@ -115,34 +178,55 @@ class MetaOAuth2: # Wait for user to paste the redirect URL redirect_response = input("\nPaste the full redirect URL here: ").strip() - # Fetch the token - print("\nFetching access token...") + # Fetch the short-lived token + print("\nFetching short-lived access token...") token = self.fetch_token(redirect_response) + short_lived_token = token["access_token"] + + # Exchange for long-lived token + if exchange_for_long_lived: + print("Exchanging for long-lived token (60 days)...") + long_lived_data = self.exchange_for_long_lived_token(short_lived_token) + access_token = long_lived_data["access_token"] + expires_in = long_lived_data["expires_in"] + + print(f"\n✅ Long-lived token obtained!") + print(f" Valid for: {expires_in / 86400:.0f} days (~{expires_in / 3600:.0f} hours)") + else: + access_token = short_lived_token + print("\n✅ Short-lived token obtained (valid for ~1-2 hours)") - access_token = token["access_token"] - print("\n" + "="*60) - print("SUCCESS! Access token obtained.") print("="*60) - # Optionally save to .env - self._offer_to_save_token(access_token) + # Save token with metadata + self._offer_to_save_token_with_metadata(access_token) return access_token - def _offer_to_save_token(self, access_token: str): - """Offer to save the access token to .env file.""" - save = input("\nWould you like to save this token to .env? (y/n): ").strip().lower() + def _offer_to_save_token_with_metadata(self, access_token: str): + """Offer to save the access token with metadata to both .env and JSON file.""" + save = input("\nWould you like to save this token? 
(y/n): ").strip().lower() if save == "y": - env_path = Path(".env") + # Get token info + try: + token_info = self.get_token_info(access_token) + expires_at = token_info.get("expires_at", 0) + is_valid = token_info.get("is_valid", False) + issued_at = token_info.get("issued_at", int(time.time())) + except Exception as e: + print(f"Warning: Could not get token info: {e}") + expires_at = int(time.time()) + 5184000 # Assume 60 days + is_valid = True + issued_at = int(time.time()) - # Read existing .env or create new + # Save to .env + env_path = Path(".env") if env_path.exists(): env_content = env_path.read_text() else: env_content = "" - # Update or add META_ACCESS_TOKEN lines = env_content.split("\n") token_line = f"META_ACCESS_TOKEN={access_token}" updated = False @@ -157,7 +241,27 @@ class MetaOAuth2: lines.append(token_line) env_path.write_text("\n".join(lines)) - print(f"\nToken saved to {env_path}") + print(f"\n✅ Token saved to {env_path}") + + # Save metadata to JSON for token refresh logic + token_metadata = { + "access_token": access_token, + "expires_at": expires_at, + "issued_at": issued_at, + "is_valid": is_valid, + "updated_at": int(time.time()), + } + + token_file = Path(".meta_token.json") + token_file.write_text(json.dumps(token_metadata, indent=2)) + print(f"✅ Token metadata saved to {token_file}") + + # Print expiry info + if expires_at: + expires_dt = datetime.fromtimestamp(expires_at) + days_until_expiry = (expires_dt - datetime.now()).days + print(f"\n📅 Token expires: {expires_dt.strftime('%Y-%m-%d %H:%M:%S')}") + print(f" ({days_until_expiry} days from now)") else: print(f"\nAccess token: {access_token}") print("You can manually add it to .env as META_ACCESS_TOKEN") diff --git a/src/meta_api_grabber/insights_grabber.py b/src/meta_api_grabber/insights_grabber.py index a91e5f0..485e57a 100644 --- a/src/meta_api_grabber/insights_grabber.py +++ b/src/meta_api_grabber/insights_grabber.py @@ -15,9 +15,19 @@ from facebook_business.adobjects.adaccount 
import AdAccount from facebook_business.adobjects.adsinsights import AdsInsights from facebook_business.api import FacebookAdsApi +from .rate_limiter import MetaRateLimiter + class MetaInsightsGrabber: - """Async grabber for Meta ad insights with conservative rate limiting.""" + """ + Async grabber for Meta ad insights with intelligent rate limiting. + + Features: + - Monitors x-fb-ads-insights-throttle header + - Auto-throttles when approaching rate limits (>75%) + - Exponential backoff on rate limit errors + - Automatic retries with progressive delays + """ def __init__(self, access_token: str = None): """ @@ -56,17 +66,22 @@ class MetaInsightsGrabber: self.ad_account = AdAccount(self.ad_account_id) - # Conservative rate limiting settings - self.request_delay = 2.0 # 2 seconds between requests - self.max_concurrent_requests = 1 # Only 1 request at a time + # Rate limiter with backoff (Meta best practices) + self.rate_limiter = MetaRateLimiter( + base_delay=2.0, + throttle_threshold=75.0, + max_retry_delay=300.0, + max_retries=5, + ) async def _rate_limited_request(self, func, *args, **kwargs): - """Execute a request with rate limiting.""" - await asyncio.sleep(self.request_delay) - # Run the blocking SDK call in a thread pool - # Note: run_in_executor doesn't accept kwargs, so we use a lambda - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) + """ + Execute a request with intelligent rate limiting and backoff. + + Monitors x-fb-ads-insights-throttle header and auto-throttles + when usage exceeds 75%. Implements exponential backoff on errors. 
+ """ + return await self.rate_limiter.execute_with_retry(func, *args, **kwargs) async def get_account_insights( self, diff --git a/src/meta_api_grabber/rate_limiter.py b/src/meta_api_grabber/rate_limiter.py new file mode 100644 index 0000000..1063624 --- /dev/null +++ b/src/meta_api_grabber/rate_limiter.py @@ -0,0 +1,273 @@ +""" +Rate limiting and backoff mechanism for Meta Marketing API. + +Based on Meta's best practices: +https://developers.facebook.com/docs/marketing-api/insights/best-practices/ +""" + +import asyncio +import time +from typing import Any, Callable, Dict, Optional + +from facebook_business.api import FacebookAdsApi + + +class MetaRateLimiter: + """ + Rate limiter with exponential backoff for Meta Marketing API. + + Features: + - Monitors x-fb-ads-insights-throttle header + - Automatic throttling when usage > 75% + - Exponential backoff on rate limit errors + - Configurable thresholds + """ + + def __init__( + self, + base_delay: float = 2.0, + throttle_threshold: float = 75.0, + max_retry_delay: float = 300.0, # 5 minutes + max_retries: int = 5, + ): + """ + Initialize rate limiter. + + Args: + base_delay: Base delay between requests in seconds + throttle_threshold: Throttle when usage exceeds this % (0-100) + max_retry_delay: Maximum delay for exponential backoff + max_retries: Maximum number of retries on rate limit errors + """ + self.base_delay = base_delay + self.throttle_threshold = throttle_threshold + self.max_retry_delay = max_retry_delay + self.max_retries = max_retries + + # Track current usage percentages + self.app_usage_pct: float = 0.0 + self.account_usage_pct: float = 0.0 + self.last_check_time: float = time.time() + + # Stats + self.total_requests: int = 0 + self.throttled_requests: int = 0 + self.rate_limit_errors: int = 0 + + def parse_throttle_header(self, response: Any) -> Dict[str, float]: + """ + Parse x-fb-ads-insights-throttle header from response. 
+ + Header format: {"app_id_util_pct": 25.5, "acc_id_util_pct": 10.0} + + Args: + response: API response object + + Returns: + Dictionary with app_id_util_pct and acc_id_util_pct + """ + try: + # Try to get the header from different response types + headers = None + + # Facebook SDK response object + if hasattr(response, '_headers'): + headers = response._headers + elif hasattr(response, 'headers'): + headers = response.headers + elif hasattr(response, '_api_response'): + headers = getattr(response._api_response, 'headers', None) + + if headers: + throttle_header = headers.get('x-fb-ads-insights-throttle', '') + if throttle_header: + import json + throttle_data = json.loads(throttle_header) + return { + 'app_id_util_pct': float(throttle_data.get('app_id_util_pct', 0)), + 'acc_id_util_pct': float(throttle_data.get('acc_id_util_pct', 0)), + } + except Exception as e: + # Silently fail - we'll use conservative defaults + pass + + return {'app_id_util_pct': 0.0, 'acc_id_util_pct': 0.0} + + def update_usage(self, response: Any): + """ + Update usage statistics from API response. + + Args: + response: API response object + """ + throttle_info = self.parse_throttle_header(response) + self.app_usage_pct = throttle_info['app_id_util_pct'] + self.account_usage_pct = throttle_info['acc_id_util_pct'] + self.last_check_time = time.time() + + # Log if we're approaching limits + max_usage = max(self.app_usage_pct, self.account_usage_pct) + if max_usage > self.throttle_threshold: + print(f"\n⚠️ Rate limit warning: {max_usage:.1f}% usage") + print(f" App: {self.app_usage_pct:.1f}%, Account: {self.account_usage_pct:.1f}%") + + def should_throttle(self) -> bool: + """ + Check if we should throttle based on current usage. + + Returns: + True if usage exceeds threshold + """ + max_usage = max(self.app_usage_pct, self.account_usage_pct) + return max_usage > self.throttle_threshold + + def get_throttle_delay(self) -> float: + """ + Calculate delay based on current usage. 
+ + Returns: + Delay in seconds + """ + max_usage = max(self.app_usage_pct, self.account_usage_pct) + + if max_usage < self.throttle_threshold: + return self.base_delay + + # Progressive delay based on usage + # below 75% = base_delay; 75-85% = 2x, 85-90% = 3x, 90-95% = 5x, >=95% = 10x + if max_usage >= 95: + multiplier = 10.0 + elif max_usage >= 90: + multiplier = 5.0 + elif max_usage >= 85: + multiplier = 3.0 + else: # 75-85% + multiplier = 2.0 + + delay = self.base_delay * multiplier + return min(delay, self.max_retry_delay) + + async def wait_with_backoff(self, retry_count: int = 0): + """ + Wait with exponential backoff. + + Args: + retry_count: Current retry attempt (0-indexed) + """ + if retry_count == 0: + # Normal delay based on throttle + delay = self.get_throttle_delay() + else: + # Exponential backoff: 2^retry * base_delay + delay = min( + (2 ** retry_count) * self.base_delay, + self.max_retry_delay + ) + + if delay > self.base_delay: + self.throttled_requests += 1 + print(f"⏸️ Throttling for {delay:.1f}s (usage: {max(self.app_usage_pct, self.account_usage_pct):.1f}%)") + + await asyncio.sleep(delay) + + async def execute_with_retry( + self, + func: Callable, + *args, + **kwargs + ) -> Any: + """ + Execute API call with automatic retry and backoff. 
+ + Args: + func: Function to execute (blocking, will be run in executor) + *args: Positional arguments for func + **kwargs: Keyword arguments for func + + Returns: + Result from func + + Raises: + Exception: If all retries exhausted + """ + self.total_requests += 1 + + for retry in range(self.max_retries): + try: + # Wait before request (with potential throttling) + await self.wait_with_backoff(retry_count=retry if retry > 0 else 0) + + # Execute request in thread pool + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(None, lambda: func(*args, **kwargs)) + + # Update usage from response + self.update_usage(result) + + return result + + except Exception as e: + error_message = str(e).lower() + + # Check if it's a rate limit error + is_rate_limit = ( + 'rate limit' in error_message or + 'too many requests' in error_message or + 'throttle' in error_message or + 'error code 17' in error_message or # Meta's rate limit error code + 'error code 80004' in error_message # Insights rate limit + ) + + if is_rate_limit: + self.rate_limit_errors += 1 + + if retry < self.max_retries - 1: + backoff_delay = min( + (2 ** (retry + 1)) * self.base_delay, + self.max_retry_delay + ) + print(f"\n🔄 Rate limit hit! Retrying in {backoff_delay:.1f}s (attempt {retry + 1}/{self.max_retries})") + print(f" Error: {e}") + await asyncio.sleep(backoff_delay) + continue + else: + print(f"\n❌ Rate limit error - max retries exhausted") + raise + + # Not a rate limit error, re-raise immediately + raise + + # Should never reach here + raise Exception("Max retries exhausted") + + def get_stats(self) -> Dict[str, Any]: + """ + Get current rate limiter statistics. 
+ + Returns: + Dictionary with stats + """ + return { + 'total_requests': self.total_requests, + 'throttled_requests': self.throttled_requests, + 'rate_limit_errors': self.rate_limit_errors, + 'app_usage_pct': self.app_usage_pct, + 'account_usage_pct': self.account_usage_pct, + 'max_usage_pct': max(self.app_usage_pct, self.account_usage_pct), + 'is_throttling': self.should_throttle(), + } + + def print_stats(self): + """Print current statistics.""" + stats = self.get_stats() + print("\n" + "="*60) + print("RATE LIMITER STATISTICS") + print("="*60) + print(f"Total Requests: {stats['total_requests']}") + print(f"Throttled Requests: {stats['throttled_requests']}") + print(f"Rate Limit Errors: {stats['rate_limit_errors']}") + print(f"App Usage: {stats['app_usage_pct']:.1f}%") + print(f"Account Usage: {stats['account_usage_pct']:.1f}%") + print(f"Max Usage: {stats['max_usage_pct']:.1f}%") + print(f"Currently Throttled: {stats['is_throttling']}") + print("="*60 + "\n") diff --git a/src/meta_api_grabber/scheduled_grabber.py b/src/meta_api_grabber/scheduled_grabber.py index b770965..5c6964f 100644 --- a/src/meta_api_grabber/scheduled_grabber.py +++ b/src/meta_api_grabber/scheduled_grabber.py @@ -14,6 +14,8 @@ from facebook_business.adobjects.adsinsights import AdsInsights from facebook_business.api import FacebookAdsApi from .database import TimescaleDBClient +from .rate_limiter import MetaRateLimiter +from .token_manager import MetaTokenManager class ScheduledInsightsGrabber: @@ -24,19 +26,32 @@ class ScheduledInsightsGrabber: - Uses 'today' date preset (recommended by Meta) - Caches metadata (accounts, campaigns, adsets) - Upserts time-series data - - Conservative rate limiting + - Intelligent rate limiting with exponential backoff + - Monitors x-fb-ads-insights-throttle header + - Auto-throttles when approaching limits """ - def __init__(self, access_token: Optional[str] = None): + def __init__(self, access_token: Optional[str] = None, auto_refresh_token: bool = True): 
""" Initialize the scheduled grabber. Args: access_token: Optional access token. If not provided, loads from env. + auto_refresh_token: If True, automatically refreshes tokens before expiry """ load_dotenv() - self.access_token = access_token or os.getenv("META_ACCESS_TOKEN") + # Token manager for automatic refresh + self.token_manager = MetaTokenManager() if auto_refresh_token else None + + # Get valid token (auto-refresh if needed) + if access_token: + self.access_token = access_token + elif self.token_manager: + self.access_token = self.token_manager.get_valid_token() + else: + self.access_token = os.getenv("META_ACCESS_TOKEN") + self.app_secret = os.getenv("META_APP_SECRET") self.app_id = os.getenv("META_APP_ID") self.ad_account_id = os.getenv("META_AD_ACCOUNT_ID") @@ -53,25 +68,55 @@ class ScheduledInsightsGrabber: ) # Initialize Facebook Ads API - FacebookAdsApi.init( - app_id=self.app_id, - app_secret=self.app_secret, - access_token=self.access_token, - ) + self._init_api() self.ad_account = AdAccount(self.ad_account_id) # Database client self.db: Optional[TimescaleDBClient] = None - # Conservative rate limiting - self.request_delay = 2.0 # 2 seconds between API requests + # Rate limiter with backoff (Meta best practices) + self.rate_limiter = MetaRateLimiter( + base_delay=2.0, # 2 seconds base delay + throttle_threshold=75.0, # Start throttling at 75% usage + max_retry_delay=300.0, # Max 5 minutes backoff + max_retries=5, # Retry up to 5 times + ) + + def _init_api(self): + """Initialize or reinitialize Facebook Ads API with current token.""" + FacebookAdsApi.init( + app_id=self.app_id, + app_secret=self.app_secret, + access_token=self.access_token, + ) + + def refresh_token_if_needed(self): + """Check and refresh token if needed (for long-running processes).""" + if not self.token_manager: + return + + try: + new_token = self.token_manager.get_valid_token() + if new_token != self.access_token: + print("🔄 Token was refreshed, reinitializing API...") + 
self.access_token = new_token + self._init_api() + self.ad_account = AdAccount(self.ad_account_id) + except Exception as e: + print(f"⚠️ Token refresh check failed: {e}") async def _rate_limited_request(self, func, *args, **kwargs): - """Execute a request with rate limiting.""" - await asyncio.sleep(self.request_delay) - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) + """ + Execute a request with intelligent rate limiting and backoff. + + This method: + - Monitors x-fb-ads-insights-throttle header + - Auto-throttles when usage > 75% + - Implements exponential backoff on rate limit errors + - Retries automatically with progressive delays + """ + return await self.rate_limiter.execute_with_retry(func, *args, **kwargs) async def cache_account_metadata(self): """ @@ -342,6 +387,9 @@ class ScheduledInsightsGrabber: await self.grab_campaign_insights(date_preset="today", limit=50) await self.grab_adset_insights(date_preset="today", limit=50) + # Print rate limiter statistics + self.rate_limiter.print_stats() + print("\n" + "="*60) print("COLLECTION CYCLE COMPLETE") print("="*60 + "\n") @@ -376,6 +424,9 @@ class ScheduledInsightsGrabber: while True: cycle_count += 1 + # Check and refresh token if needed (before each cycle) + self.refresh_token_if_needed() + # Refresh metadata periodically (e.g., once per day) cache_metadata = (cycle_count % refresh_metadata_every_n_cycles == 1) diff --git a/src/meta_api_grabber/token_manager.py b/src/meta_api_grabber/token_manager.py new file mode 100644 index 0000000..8b7c182 --- /dev/null +++ b/src/meta_api_grabber/token_manager.py @@ -0,0 +1,307 @@ +""" +Token manager for automatic refresh of Meta access tokens. 
+ +Handles: +- Loading token metadata +- Checking token validity and expiry +- Automatic refresh before expiry +- Persistence of new tokens +""" + +import json +import os +import time +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional + +from dotenv import load_dotenv + +from .auth import MetaOAuth2 + + +class MetaTokenManager: + """ + Manages Meta access tokens with automatic refresh. + + Features: + - Loads token from .env and metadata from .meta_token.json + - Checks if token is expired or about to expire + - Automatically refreshes tokens before expiry + - Persists refreshed tokens + """ + + def __init__( + self, + token_file: str = ".meta_token.json", + refresh_before_days: int = 7, + ): + """ + Initialize token manager. + + Args: + token_file: Path to token metadata JSON file + refresh_before_days: Refresh token this many days before expiry + """ + load_dotenv() + + self.token_file = Path(token_file) + self.refresh_before_days = refresh_before_days + self.oauth = MetaOAuth2() + + self._current_token: Optional[str] = None + self._token_metadata: Optional[dict] = None + + def load_token(self) -> Optional[str]: + """ + Load access token from environment. + + Returns: + Access token or None + """ + return os.getenv("META_ACCESS_TOKEN") + + def load_metadata(self) -> Optional[dict]: + """ + Load token metadata from JSON file. + + Returns: + Token metadata dict or None + """ + if not self.token_file.exists(): + return None + + try: + return json.loads(self.token_file.read_text()) + except Exception as e: + print(f"Warning: Could not load token metadata: {e}") + return None + + def is_token_expired(self, metadata: dict) -> bool: + """ + Check if token is expired. 
+ + Args: + metadata: Token metadata dictionary + + Returns: + True if expired + """ + expires_at = metadata.get("expires_at", 0) + if not expires_at: + return False + + return time.time() >= expires_at + + def should_refresh(self, metadata: dict) -> bool: + """ + Check if token should be refreshed. + + Args: + metadata: Token metadata dictionary + + Returns: + True if token should be refreshed + """ + expires_at = metadata.get("expires_at", 0) + if not expires_at: + return False + + # Refresh if expiring within refresh_before_days + threshold = time.time() + (self.refresh_before_days * 86400) + return expires_at <= threshold + + def refresh_token(self, current_token: str) -> str: + """ + Refresh token by exchanging for a new long-lived token. + + Args: + current_token: Current access token + + Returns: + New access token + """ + print("\n🔄 Refreshing access token...") + + # Exchange current token for new long-lived token + token_data = self.oauth.exchange_for_long_lived_token(current_token) + new_token = token_data["access_token"] + expires_in = token_data["expires_in"] + + print(f"✅ Token refreshed! Valid for {expires_in / 86400:.0f} days") + + # Get token info + try: + token_info = self.oauth.get_token_info(new_token) + expires_at = token_info.get("expires_at", int(time.time()) + expires_in) + is_valid = token_info.get("is_valid", True) + except Exception: + expires_at = int(time.time()) + expires_in + is_valid = True + + # Save new token + self._save_token(new_token, expires_at, is_valid) + + return new_token + + def _save_token(self, access_token: str, expires_at: int, is_valid: bool): + """ + Save token to .env and metadata to JSON. 
+ + Args: + access_token: New access token + expires_at: Expiry timestamp + is_valid: Whether token is valid + """ + # Update .env + env_path = Path(".env") + if env_path.exists(): + env_content = env_path.read_text() + lines = env_content.split("\n") + + updated = False + for i, line in enumerate(lines): + if line.startswith("META_ACCESS_TOKEN="): + lines[i] = f"META_ACCESS_TOKEN={access_token}" + updated = True + break + + if not updated: + lines.append(f"META_ACCESS_TOKEN={access_token}") + + env_path.write_text("\n".join(lines)) + + # Update metadata JSON + metadata = { + "access_token": access_token, + "expires_at": expires_at, + "issued_at": int(time.time()), + "is_valid": is_valid, + "updated_at": int(time.time()), + } + + self.token_file.write_text(json.dumps(metadata, indent=2)) + + # Reload environment + load_dotenv(override=True) + + def get_valid_token(self) -> str: + """ + Get a valid access token, refreshing if necessary. + + Returns: + Valid access token + + Raises: + ValueError: If no token available or refresh fails + """ + # Load token and metadata + token = self.load_token() + metadata = self.load_metadata() + + if not token: + raise ValueError( + "No access token found. Run 'uv run python src/meta_api_grabber/auth.py' to authenticate." + ) + + # If no metadata, assume token is valid (but warn) + if not metadata: + print("⚠️ Warning: No token metadata found. Cannot check expiry.") + print(" Run 'uv run python src/meta_api_grabber/auth.py' to re-authenticate and save metadata.") + return token + + # Check if expired + if self.is_token_expired(metadata): + print("❌ Token expired! 
Attempting to refresh...") + try: + return self.refresh_token(token) + except Exception as e: + raise ValueError( + f"Token expired and refresh failed: {e}\n" + "Please re-authenticate: uv run python src/meta_api_grabber/auth.py" + ) + + # Check if should refresh (within threshold) + if self.should_refresh(metadata): + expires_at = metadata.get("expires_at", 0) + expires_dt = datetime.fromtimestamp(expires_at) + days_left = (expires_dt - datetime.now()).days + + print(f"\n⚠️ Token expiring in {days_left} days ({expires_dt.strftime('%Y-%m-%d')})") + print(f" Refreshing token to extend validity...") + + try: + return self.refresh_token(token) + except Exception as e: + print(f"⚠️ Token refresh failed: {e}") + print(f" Continuing with current token ({days_left} days remaining)") + return token + + # Token is valid + expires_at = metadata.get("expires_at", 0) + if expires_at: + expires_dt = datetime.fromtimestamp(expires_at) + days_left = (expires_dt - datetime.now()).days + print(f"✅ Token valid ({days_left} days remaining)") + + return token + + def print_token_status(self): + """Print current token status.""" + token = self.load_token() + metadata = self.load_metadata() + + print("\n" + "="*60) + print("TOKEN STATUS") + print("="*60) + + if not token: + print("❌ No access token found in .env") + print("\nRun: uv run python src/meta_api_grabber/auth.py") + return + + print("✅ Access token found") + + if not metadata: + print("⚠️ No metadata file (.meta_token.json)") + print(" Cannot determine expiry. 
Re-authenticate to save metadata.") + return + + expires_at = metadata.get("expires_at", 0) + is_valid = metadata.get("is_valid", False) + + if expires_at: + expires_dt = datetime.fromtimestamp(expires_at) + days_left = (expires_dt - datetime.now()).days + + print(f"\nExpires: {expires_dt.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"Days remaining: {days_left}") + + if days_left < 0: + print("Status: ❌ EXPIRED") + elif days_left <= self.refresh_before_days: + print(f"Status: ⚠️ EXPIRING SOON (will auto-refresh)") + else: + print("Status: ✅ VALID") + + print(f"Is valid: {'✅' if is_valid else '❌'}") + print("="*60 + "\n") + + +def main(): + """Check token status and refresh if needed.""" + manager = MetaTokenManager() + manager.print_token_status() + + try: + token = manager.get_valid_token() + print(f"\n✅ Valid token ready for use") + except ValueError as e: + print(f"\n❌ Error: {e}") + return 1 + + return 0 + + +if __name__ == "__main__": + exit(main())