Rate limiting and TimescaleDB changes

This commit is contained in:
Jonas Linter
2025-10-21 13:28:29 +02:00
parent 6ba8a0dba2
commit 3704390448
8 changed files with 897 additions and 52 deletions

.gitignore vendored

@@ -1 +1,24 @@
# Environment and secrets
.env
.meta_token.json
# Data output
data/
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
# Virtual environments
venv/
env/
ENV/
# IDE
.vscode/
.idea/
*.swp
*.swo


@@ -41,17 +41,24 @@ Edit `.env` and add:
- **META_AD_ACCOUNT_ID** from Meta Ads Manager (format: `act_1234567890`)
- **DATABASE_URL** is pre-configured for local Docker setup
### 4. Get Long-Lived Access Token
**OAuth2 Flow (Recommended - Gets 60-day token)**
```bash
uv run python src/meta_api_grabber/auth.py
```
This will:
1. Open OAuth2 authorization in your browser
2. Exchange the code for a short-lived token
3. **Automatically exchange for a long-lived token (60 days)**
4. Save token to `.env`
5. Save token metadata to `.meta_token.json` (for auto-refresh)
**Manual Token (Not Recommended)**
- Get a token from [Graph API Explorer](https://developers.facebook.com/tools/explorer/)
- Add it to `.env` as `META_ACCESS_TOKEN`
- Note: Manual tokens won't have auto-refresh capability
### 5. Start Scheduled Collection
```bash
@@ -59,6 +66,7 @@ uv run python src/meta_api_grabber/scheduled_grabber.py
```
This will:
- **Automatically refresh tokens** before they expire (checks every cycle)
- Collect data every 2 hours using the `today` date preset (recommended by Meta)
- Cache metadata (accounts, campaigns, ad sets) twice daily
- Store time-series data in TimescaleDB
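The cadence above reduces to simple cycle arithmetic. A minimal sketch, assuming the twice-daily metadata refresh is driven by the same 2-hour cycle counter (the constant names here are illustrative, not taken from the codebase):

```python
# With 2-hour cycles there are 12 cycles per day, so refreshing
# metadata twice daily means refreshing every 6th cycle.
HOURS_PER_CYCLE = 2
METADATA_REFRESHES_PER_DAY = 2

cycles_per_day = 24 // HOURS_PER_CYCLE
refresh_every_n_cycles = cycles_per_day // METADATA_REFRESHES_PER_DAY

def should_cache_metadata(cycle_count: int) -> bool:
    """Mirror of the modulo check used by the scheduled grabber."""
    return cycle_count % refresh_every_n_cycles == 1

print(refresh_every_n_cycles)  # 6
print([c for c in range(1, 13) if should_cache_metadata(c)])  # [1, 7]
```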
@@ -87,8 +95,15 @@ uv run python src/meta_api_grabber/insights_grabber.py
```bash
uv run python src/meta_api_grabber/auth.py
```
- Interactive flow to get long-lived token (60 days)
- Saves token to `.env` and metadata to `.meta_token.json`
### 4. Check Token Status
```bash
uv run python src/meta_api_grabber/token_manager.py
```
- Shows token expiry and validity
- Manually refresh if needed
## Data Collected
@@ -164,12 +179,69 @@ WHERE account_id = 'act_your_id'
ORDER BY bucket;
```
## Rate Limiting & Backoff
The system implements Meta's best practices for rate limiting:
### Intelligent Rate Limiting
- **Monitors `x-fb-ads-insights-throttle` header** from every API response
- Tracks both app-level and account-level usage percentages
- **Auto-throttles** when usage exceeds 75%
- **Progressive delays** based on usage (75%: 2x, 85%: 3x, 90%: 5x, 95%: 10x)
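A minimal sketch of the header monitoring and delay tiers described above (the header's JSON shape follows Meta's documentation; the function names are illustrative):

```python
import json

def max_usage_pct(throttle_header: str) -> float:
    """Return the higher of app-level and account-level usage from
    an x-fb-ads-insights-throttle header value."""
    data = json.loads(throttle_header)
    return max(float(data.get("app_id_util_pct", 0)),
               float(data.get("acc_id_util_pct", 0)))

def delay_multiplier(usage_pct: float) -> float:
    """Progressive delay tiers: 75%: 2x, 85%: 3x, 90%: 5x, 95%: 10x."""
    if usage_pct >= 95:
        return 10.0
    if usage_pct >= 90:
        return 5.0
    if usage_pct >= 85:
        return 3.0
    if usage_pct >= 75:
        return 2.0
    return 1.0

header = '{"app_id_util_pct": 91.2, "acc_id_util_pct": 40.0}'
print(delay_multiplier(max_usage_pct(header)))  # 5.0
```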
### Exponential Backoff
- **Automatic retries** on rate limit errors (up to 5 attempts)
- **Exponential backoff**: 2s → 4s → 8s → 16s → 32s
- Max backoff: 5 minutes
- Recognizes Meta error codes 17 and 80004
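The backoff sequence above is a 2-second base delay doubled per attempt and capped at 5 minutes; a hedged sketch (attempt numbering is an assumption):

```python
def backoff_delay(attempt: int, base: float = 2.0, cap: float = 300.0) -> float:
    """Exponential backoff: base * 2**attempt, capped at 5 minutes."""
    return min(base * (2 ** attempt), cap)

print([backoff_delay(a) for a in range(5)])  # [2.0, 4.0, 8.0, 16.0, 32.0]
print(backoff_delay(10))                     # 300.0 (hits the cap)
```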
### Conservative Defaults
- **2 seconds base delay** between API requests
- **1 concurrent request** at a time
- **Top 50 campaigns/adsets** per collection
- **2 hour intervals** between scheduled collections
This ensures you stay well within Meta's API rate limits.
### Best Practices Applied
Based on [Meta's official recommendations](https://developers.facebook.com/docs/marketing-api/insights/best-practices/):
- ✅ Monitor rate limit headers
- ✅ Pace queries with wait times
- ✅ Implement backoff when approaching limits
- ✅ Use date presets (e.g., 'today') instead of custom ranges
- ✅ Limit query scope and metrics
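To illustrate the last two points, a `date_preset` query is cheaper for Meta to serve than an equivalent custom `time_range`; the parameter dicts below are schematic and not tied to a specific SDK call:

```python
# Preferred: date preset plus a narrow field list and bounded scope.
preset_params = {
    "date_preset": "today",                        # cheap, cacheable window
    "fields": ["spend", "impressions", "clicks"],  # only the metrics needed
    "limit": 50,                                   # bounded scope per the defaults above
}

# Heavier: an arbitrary custom range forces a bespoke aggregation.
custom_range_params = {
    "time_range": {"since": "2025-01-01", "until": "2025-10-21"},
    "fields": ["spend", "impressions", "clicks"],
}
```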
## Token Management
### Automatic Token Refresh
The system automatically manages token lifecycle:
**Token Types:**
- **Short-lived tokens**: Valid for 1-2 hours (obtained from OAuth)
- **Long-lived tokens**: Valid for 60 days (automatically exchanged)
**Auto-Refresh Logic:**
1. OAuth flow automatically exchanges for 60-day token
2. Token metadata saved to `.meta_token.json` (includes expiry)
3. Scheduled grabber checks token before each cycle
4. Auto-refreshes when < 7 days remaining
5. New token saved and API reinitialized seamlessly
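The refresh threshold in step 4 reduces to a timestamp comparison; a small sketch (names are illustrative):

```python
import time
from typing import Optional

REFRESH_BEFORE_DAYS = 7

def should_refresh(expires_at: int, now: Optional[float] = None) -> bool:
    """Refresh once fewer than REFRESH_BEFORE_DAYS remain before expiry."""
    now = time.time() if now is None else now
    return expires_at <= now + REFRESH_BEFORE_DAYS * 86400

now = 1_700_000_000
assert not should_refresh(now + 30 * 86400, now)  # 30 days left: keep token
assert should_refresh(now + 3 * 86400, now)       # 3 days left: refresh
```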
**Files Created:**
- `.env` - Contains `META_ACCESS_TOKEN` (updated on refresh)
- `.meta_token.json` - Token metadata (expiry, issued_at, etc.)
- Both files are gitignored for security
**Manual Token Operations:**
Check token status:
```bash
uv run python src/meta_api_grabber/token_manager.py
```
Re-authenticate (if token expires):
```bash
uv run python src/meta_api_grabber/auth.py
```
**Long-Running Collection:**
The scheduled grabber runs indefinitely without manual intervention. Token refresh happens automatically every ~53 days (7 days before the 60-day expiry).


@@ -1,11 +1,21 @@
"""
OAuth2 authentication module for Meta/Facebook API.
Handles:
- Initial OAuth2 flow for short-lived tokens
- Exchange short-lived for long-lived tokens (60 days)
- Automatic token refresh
- Token persistence
"""
import json
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional
import requests
from dotenv import load_dotenv
from requests_oauthlib import OAuth2Session
from requests_oauthlib.compliance_fixes import facebook_compliance_fix
@@ -91,12 +101,65 @@ class MetaOAuth2:
return token
def exchange_for_long_lived_token(self, short_lived_token: str) -> Dict[str, Any]:
"""
Exchange short-lived token for long-lived token (60 days).
Args:
short_lived_token: The short-lived access token from OAuth flow
Returns:
Dictionary with access_token and expires_in
"""
url = "https://graph.facebook.com/oauth/access_token"
params = {
"grant_type": "fb_exchange_token",
"client_id": self.app_id,
"client_secret": self.app_secret,
"fb_exchange_token": short_lived_token,
}
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
return {
"access_token": data["access_token"],
"expires_in": data.get("expires_in", 5184000), # Default 60 days
"token_type": data.get("token_type", "bearer"),
}
def get_token_info(self, access_token: str) -> Dict[str, Any]:
"""
Get information about an access token including expiry.
Args:
access_token: Access token to inspect
Returns:
Dictionary with token info including expires_at, is_valid, etc.
"""
url = "https://graph.facebook.com/debug_token"
params = {
"input_token": access_token,
"access_token": f"{self.app_id}|{self.app_secret}",
}
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
return data.get("data", {})
def interactive_auth(self, exchange_for_long_lived: bool = True) -> str:
"""
Run interactive OAuth2 flow to get access token.
Args:
exchange_for_long_lived: If True, exchanges short-lived for long-lived token (60 days)
Returns:
Access token string (long-lived if exchange_for_long_lived=True)
"""
print("\n" + "="*60)
print("META/FACEBOOK OAUTH2 AUTHENTICATION")
@@ -115,34 +178,55 @@ class MetaOAuth2:
# Wait for user to paste the redirect URL
redirect_response = input("\nPaste the full redirect URL here: ").strip()
# Fetch the short-lived token
print("\nFetching short-lived access token...")
token = self.fetch_token(redirect_response)
short_lived_token = token["access_token"]
# Exchange for long-lived token
if exchange_for_long_lived:
print("Exchanging for long-lived token (60 days)...")
long_lived_data = self.exchange_for_long_lived_token(short_lived_token)
access_token = long_lived_data["access_token"]
expires_in = long_lived_data["expires_in"]
print(f"\n✅ Long-lived token obtained!")
print(f" Valid for: {expires_in / 86400:.0f} days (~{expires_in / 3600:.0f} hours)")
else:
access_token = short_lived_token
print("\n✅ Short-lived token obtained (valid for ~1-2 hours)")
print("\n" + "="*60)
print("SUCCESS! Access token obtained.")
print("="*60)
# Save token with metadata
self._offer_to_save_token_with_metadata(access_token)
return access_token
def _offer_to_save_token_with_metadata(self, access_token: str):
"""Offer to save the access token with metadata to both .env and JSON file."""
save = input("\nWould you like to save this token? (y/n): ").strip().lower()
if save == "y":
env_path = Path(".env")
# Get token info
try:
token_info = self.get_token_info(access_token)
expires_at = token_info.get("expires_at", 0)
is_valid = token_info.get("is_valid", False)
issued_at = token_info.get("issued_at", int(time.time()))
except Exception as e:
print(f"Warning: Could not get token info: {e}")
expires_at = int(time.time()) + 5184000 # Assume 60 days
is_valid = True
issued_at = int(time.time())
# Read existing .env or create new
# Save to .env
env_path = Path(".env")
if env_path.exists():
env_content = env_path.read_text()
else:
env_content = ""
# Update or add META_ACCESS_TOKEN
lines = env_content.split("\n")
token_line = f"META_ACCESS_TOKEN={access_token}"
updated = False
@@ -157,7 +241,27 @@ class MetaOAuth2:
lines.append(token_line)
env_path.write_text("\n".join(lines))
print(f"\nToken saved to {env_path}")
# Save metadata to JSON for token refresh logic
token_metadata = {
"access_token": access_token,
"expires_at": expires_at,
"issued_at": issued_at,
"is_valid": is_valid,
"updated_at": int(time.time()),
}
token_file = Path(".meta_token.json")
token_file.write_text(json.dumps(token_metadata, indent=2))
print(f"✅ Token metadata saved to {token_file}")
# Print expiry info
if expires_at:
expires_dt = datetime.fromtimestamp(expires_at)
days_until_expiry = (expires_dt - datetime.now()).days
print(f"\n📅 Token expires: {expires_dt.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" ({days_until_expiry} days from now)")
else:
print(f"\nAccess token: {access_token}")
print("You can manually add it to .env as META_ACCESS_TOKEN")


@@ -15,9 +15,19 @@ from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.api import FacebookAdsApi
from .rate_limiter import MetaRateLimiter
class MetaInsightsGrabber:
"""
Async grabber for Meta ad insights with intelligent rate limiting.
Features:
- Monitors x-fb-ads-insights-throttle header
- Auto-throttles when approaching rate limits (>75%)
- Exponential backoff on rate limit errors
- Automatic retries with progressive delays
"""
def __init__(self, access_token: str = None):
"""
@@ -56,17 +66,22 @@ class MetaInsightsGrabber:
self.ad_account = AdAccount(self.ad_account_id)
# Rate limiter with backoff (Meta best practices)
self.rate_limiter = MetaRateLimiter(
base_delay=2.0,
throttle_threshold=75.0,
max_retry_delay=300.0,
max_retries=5,
)
async def _rate_limited_request(self, func, *args, **kwargs):
"""
Execute a request with intelligent rate limiting and backoff.
Monitors x-fb-ads-insights-throttle header and auto-throttles
when usage exceeds 75%. Implements exponential backoff on errors.
"""
return await self.rate_limiter.execute_with_retry(func, *args, **kwargs)
async def get_account_insights(
self,


@@ -0,0 +1,273 @@
"""
Rate limiting and backoff mechanism for Meta Marketing API.
Based on Meta's best practices:
https://developers.facebook.com/docs/marketing-api/insights/best-practices/
"""
import asyncio
import time
from typing import Any, Callable, Dict, Optional
from facebook_business.api import FacebookAdsApi
class MetaRateLimiter:
"""
Rate limiter with exponential backoff for Meta Marketing API.
Features:
- Monitors x-fb-ads-insights-throttle header
- Automatic throttling when usage > 75%
- Exponential backoff on rate limit errors
- Configurable thresholds
"""
def __init__(
self,
base_delay: float = 2.0,
throttle_threshold: float = 75.0,
max_retry_delay: float = 300.0, # 5 minutes
max_retries: int = 5,
):
"""
Initialize rate limiter.
Args:
base_delay: Base delay between requests in seconds
throttle_threshold: Throttle when usage exceeds this % (0-100)
max_retry_delay: Maximum delay for exponential backoff
max_retries: Maximum number of retries on rate limit errors
"""
self.base_delay = base_delay
self.throttle_threshold = throttle_threshold
self.max_retry_delay = max_retry_delay
self.max_retries = max_retries
# Track current usage percentages
self.app_usage_pct: float = 0.0
self.account_usage_pct: float = 0.0
self.last_check_time: float = time.time()
# Stats
self.total_requests: int = 0
self.throttled_requests: int = 0
self.rate_limit_errors: int = 0
def parse_throttle_header(self, response: Any) -> Dict[str, float]:
"""
Parse x-fb-ads-insights-throttle header from response.
Header format: {"app_id_util_pct": 25.5, "acc_id_util_pct": 10.0}
Args:
response: API response object
Returns:
Dictionary with app_id_util_pct and acc_id_util_pct
"""
try:
# Try to get the header from different response types
headers = None
# Facebook SDK response object
if hasattr(response, '_headers'):
headers = response._headers
elif hasattr(response, 'headers'):
headers = response.headers
elif hasattr(response, '_api_response'):
headers = getattr(response._api_response, 'headers', None)
if headers:
throttle_header = headers.get('x-fb-ads-insights-throttle', '')
if throttle_header:
import json
throttle_data = json.loads(throttle_header)
return {
'app_id_util_pct': float(throttle_data.get('app_id_util_pct', 0)),
'acc_id_util_pct': float(throttle_data.get('acc_id_util_pct', 0)),
}
except Exception:
# Silently fail - we'll use conservative defaults
pass
return {'app_id_util_pct': 0.0, 'acc_id_util_pct': 0.0}
def update_usage(self, response: Any):
"""
Update usage statistics from API response.
Args:
response: API response object
"""
throttle_info = self.parse_throttle_header(response)
self.app_usage_pct = throttle_info['app_id_util_pct']
self.account_usage_pct = throttle_info['acc_id_util_pct']
self.last_check_time = time.time()
# Log if we're approaching limits
max_usage = max(self.app_usage_pct, self.account_usage_pct)
if max_usage > self.throttle_threshold:
print(f"\n⚠️ Rate limit warning: {max_usage:.1f}% usage")
print(f" App: {self.app_usage_pct:.1f}%, Account: {self.account_usage_pct:.1f}%")
def should_throttle(self) -> bool:
"""
Check if we should throttle based on current usage.
Returns:
True if usage exceeds threshold
"""
max_usage = max(self.app_usage_pct, self.account_usage_pct)
return max_usage > self.throttle_threshold
def get_throttle_delay(self) -> float:
"""
Calculate delay based on current usage.
Returns:
Delay in seconds
"""
max_usage = max(self.app_usage_pct, self.account_usage_pct)
if max_usage < self.throttle_threshold:
return self.base_delay
# Progressive delay based on usage
# 75-85% = 2x, 85-90% = 3x, 90-95% = 5x, >=95% = 10x
if max_usage >= 95:
multiplier = 10.0
elif max_usage >= 90:
multiplier = 5.0
elif max_usage >= 85:
multiplier = 3.0
else: # 75-85%
multiplier = 2.0
delay = self.base_delay * multiplier
return min(delay, self.max_retry_delay)
async def wait_with_backoff(self, retry_count: int = 0):
"""
Wait with exponential backoff.
Args:
retry_count: Current retry attempt (0-indexed)
"""
if retry_count == 0:
# Normal delay based on throttle
delay = self.get_throttle_delay()
else:
# Exponential backoff: 2^retry * base_delay
delay = min(
(2 ** retry_count) * self.base_delay,
self.max_retry_delay
)
if delay > self.base_delay:
self.throttled_requests += 1
print(f"⏸️ Throttling for {delay:.1f}s (usage: {max(self.app_usage_pct, self.account_usage_pct):.1f}%)")
await asyncio.sleep(delay)
async def execute_with_retry(
self,
func: Callable,
*args,
**kwargs
) -> Any:
"""
Execute API call with automatic retry and backoff.
Args:
func: Function to execute (blocking, will be run in executor)
*args: Positional arguments for func
**kwargs: Keyword arguments for func
Returns:
Result from func
Raises:
Exception: If all retries exhausted
"""
self.total_requests += 1
for retry in range(self.max_retries):
try:
# Wait before request (with potential throttling)
await self.wait_with_backoff(retry_count=retry)
# Execute request in thread pool
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, lambda: func(*args, **kwargs))
# Update usage from response
self.update_usage(result)
return result
except Exception as e:
error_message = str(e).lower()
# Check if it's a rate limit error
is_rate_limit = (
'rate limit' in error_message or
'too many requests' in error_message or
'throttle' in error_message or
'error code 17' in error_message or # Meta's rate limit error code
'error code 80004' in error_message # Insights rate limit
)
if is_rate_limit:
self.rate_limit_errors += 1
if retry < self.max_retries - 1:
backoff_delay = min(
(2 ** (retry + 1)) * self.base_delay,
self.max_retry_delay
)
print(f"\n🔄 Rate limit hit! Retrying in {backoff_delay:.1f}s (attempt {retry + 1}/{self.max_retries})")
print(f" Error: {e}")
await asyncio.sleep(backoff_delay)
continue
else:
print(f"\n❌ Rate limit error - max retries exhausted")
raise
# Not a rate limit error, re-raise immediately
raise
# Should never reach here
raise Exception("Max retries exhausted")
def get_stats(self) -> Dict[str, Any]:
"""
Get current rate limiter statistics.
Returns:
Dictionary with stats
"""
return {
'total_requests': self.total_requests,
'throttled_requests': self.throttled_requests,
'rate_limit_errors': self.rate_limit_errors,
'app_usage_pct': self.app_usage_pct,
'account_usage_pct': self.account_usage_pct,
'max_usage_pct': max(self.app_usage_pct, self.account_usage_pct),
'is_throttling': self.should_throttle(),
}
def print_stats(self):
"""Print current statistics."""
stats = self.get_stats()
print("\n" + "="*60)
print("RATE LIMITER STATISTICS")
print("="*60)
print(f"Total Requests: {stats['total_requests']}")
print(f"Throttled Requests: {stats['throttled_requests']}")
print(f"Rate Limit Errors: {stats['rate_limit_errors']}")
print(f"App Usage: {stats['app_usage_pct']:.1f}%")
print(f"Account Usage: {stats['account_usage_pct']:.1f}%")
print(f"Max Usage: {stats['max_usage_pct']:.1f}%")
print(f"Currently Throttled: {stats['is_throttling']}")
print("="*60 + "\n")


@@ -14,6 +14,8 @@ from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.api import FacebookAdsApi
from .database import TimescaleDBClient
from .rate_limiter import MetaRateLimiter
from .token_manager import MetaTokenManager
class ScheduledInsightsGrabber:
@@ -24,19 +26,32 @@ class ScheduledInsightsGrabber:
- Uses 'today' date preset (recommended by Meta)
- Caches metadata (accounts, campaigns, adsets)
- Upserts time-series data
- Intelligent rate limiting with exponential backoff
- Monitors x-fb-ads-insights-throttle header
- Auto-throttles when approaching limits
"""
def __init__(self, access_token: Optional[str] = None, auto_refresh_token: bool = True):
"""
Initialize the scheduled grabber.
Args:
access_token: Optional access token. If not provided, loads from env.
auto_refresh_token: If True, automatically refreshes tokens before expiry
"""
load_dotenv()
# Token manager for automatic refresh
self.token_manager = MetaTokenManager() if auto_refresh_token else None
# Get valid token (auto-refresh if needed)
if access_token:
self.access_token = access_token
elif self.token_manager:
self.access_token = self.token_manager.get_valid_token()
else:
self.access_token = os.getenv("META_ACCESS_TOKEN")
self.app_secret = os.getenv("META_APP_SECRET")
self.app_id = os.getenv("META_APP_ID")
self.ad_account_id = os.getenv("META_AD_ACCOUNT_ID")
@@ -53,25 +68,55 @@ class ScheduledInsightsGrabber:
)
self._init_api()
self.ad_account = AdAccount(self.ad_account_id)
# Database client
self.db: Optional[TimescaleDBClient] = None
# Rate limiter with backoff (Meta best practices)
self.rate_limiter = MetaRateLimiter(
base_delay=2.0, # 2 seconds base delay
throttle_threshold=75.0, # Start throttling at 75% usage
max_retry_delay=300.0, # Max 5 minutes backoff
max_retries=5, # Retry up to 5 times
)
def _init_api(self):
"""Initialize or reinitialize Facebook Ads API with current token."""
FacebookAdsApi.init(
app_id=self.app_id,
app_secret=self.app_secret,
access_token=self.access_token,
)
def refresh_token_if_needed(self):
"""Check and refresh token if needed (for long-running processes)."""
if not self.token_manager:
return
try:
new_token = self.token_manager.get_valid_token()
if new_token != self.access_token:
print("🔄 Token was refreshed, reinitializing API...")
self.access_token = new_token
self._init_api()
self.ad_account = AdAccount(self.ad_account_id)
except Exception as e:
print(f"⚠️ Token refresh check failed: {e}")
async def _rate_limited_request(self, func, *args, **kwargs):
"""
Execute a request with intelligent rate limiting and backoff.
This method:
- Monitors x-fb-ads-insights-throttle header
- Auto-throttles when usage > 75%
- Implements exponential backoff on rate limit errors
- Retries automatically with progressive delays
"""
return await self.rate_limiter.execute_with_retry(func, *args, **kwargs)
async def cache_account_metadata(self):
"""
@@ -342,6 +387,9 @@ class ScheduledInsightsGrabber:
await self.grab_campaign_insights(date_preset="today", limit=50)
await self.grab_adset_insights(date_preset="today", limit=50)
# Print rate limiter statistics
self.rate_limiter.print_stats()
print("\n" + "="*60)
print("COLLECTION CYCLE COMPLETE")
print("="*60 + "\n")
@@ -376,6 +424,9 @@ class ScheduledInsightsGrabber:
while True:
cycle_count += 1
# Check and refresh token if needed (before each cycle)
self.refresh_token_if_needed()
# Refresh metadata periodically (e.g., once per day)
cache_metadata = (cycle_count % refresh_metadata_every_n_cycles == 1)


@@ -0,0 +1,307 @@
"""
Token manager for automatic refresh of Meta access tokens.
Handles:
- Loading token metadata
- Checking token validity and expiry
- Automatic refresh before expiry
- Persistence of new tokens
"""
import json
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
from .auth import MetaOAuth2
class MetaTokenManager:
"""
Manages Meta access tokens with automatic refresh.
Features:
- Loads token from .env and metadata from .meta_token.json
- Checks if token is expired or about to expire
- Automatically refreshes tokens before expiry
- Persists refreshed tokens
"""
def __init__(
self,
token_file: str = ".meta_token.json",
refresh_before_days: int = 7,
):
"""
Initialize token manager.
Args:
token_file: Path to token metadata JSON file
refresh_before_days: Refresh token this many days before expiry
"""
load_dotenv()
self.token_file = Path(token_file)
self.refresh_before_days = refresh_before_days
self.oauth = MetaOAuth2()
self._current_token: Optional[str] = None
self._token_metadata: Optional[dict] = None
def load_token(self) -> Optional[str]:
"""
Load access token from environment.
Returns:
Access token or None
"""
return os.getenv("META_ACCESS_TOKEN")
def load_metadata(self) -> Optional[dict]:
"""
Load token metadata from JSON file.
Returns:
Token metadata dict or None
"""
if not self.token_file.exists():
return None
try:
return json.loads(self.token_file.read_text())
except Exception as e:
print(f"Warning: Could not load token metadata: {e}")
return None
def is_token_expired(self, metadata: dict) -> bool:
"""
Check if token is expired.
Args:
metadata: Token metadata dictionary
Returns:
True if expired
"""
expires_at = metadata.get("expires_at", 0)
if not expires_at:
return False
return time.time() >= expires_at
def should_refresh(self, metadata: dict) -> bool:
"""
Check if token should be refreshed.
Args:
metadata: Token metadata dictionary
Returns:
True if token should be refreshed
"""
expires_at = metadata.get("expires_at", 0)
if not expires_at:
return False
# Refresh if expiring within refresh_before_days
threshold = time.time() + (self.refresh_before_days * 86400)
return expires_at <= threshold
def refresh_token(self, current_token: str) -> str:
"""
Refresh token by exchanging for a new long-lived token.
Args:
current_token: Current access token
Returns:
New access token
"""
print("\n🔄 Refreshing access token...")
# Exchange current token for new long-lived token
token_data = self.oauth.exchange_for_long_lived_token(current_token)
new_token = token_data["access_token"]
expires_in = token_data["expires_in"]
print(f"✅ Token refreshed! Valid for {expires_in / 86400:.0f} days")
# Get token info
try:
token_info = self.oauth.get_token_info(new_token)
expires_at = token_info.get("expires_at", int(time.time()) + expires_in)
is_valid = token_info.get("is_valid", True)
except Exception:
expires_at = int(time.time()) + expires_in
is_valid = True
# Save new token
self._save_token(new_token, expires_at, is_valid)
return new_token
def _save_token(self, access_token: str, expires_at: int, is_valid: bool):
"""
Save token to .env and metadata to JSON.
Args:
access_token: New access token
expires_at: Expiry timestamp
is_valid: Whether token is valid
"""
# Update .env
env_path = Path(".env")
if env_path.exists():
env_content = env_path.read_text()
lines = env_content.split("\n")
updated = False
for i, line in enumerate(lines):
if line.startswith("META_ACCESS_TOKEN="):
lines[i] = f"META_ACCESS_TOKEN={access_token}"
updated = True
break
if not updated:
lines.append(f"META_ACCESS_TOKEN={access_token}")
env_path.write_text("\n".join(lines))
# Update metadata JSON
metadata = {
"access_token": access_token,
"expires_at": expires_at,
"issued_at": int(time.time()),
"is_valid": is_valid,
"updated_at": int(time.time()),
}
self.token_file.write_text(json.dumps(metadata, indent=2))
# Reload environment
load_dotenv(override=True)
def get_valid_token(self) -> str:
"""
Get a valid access token, refreshing if necessary.
Returns:
Valid access token
Raises:
ValueError: If no token available or refresh fails
"""
# Load token and metadata
token = self.load_token()
metadata = self.load_metadata()
if not token:
raise ValueError(
"No access token found. Run 'uv run python src/meta_api_grabber/auth.py' to authenticate."
)
# If no metadata, assume token is valid (but warn)
if not metadata:
print("⚠️ Warning: No token metadata found. Cannot check expiry.")
print(" Run 'uv run python src/meta_api_grabber/auth.py' to re-authenticate and save metadata.")
return token
# Check if expired
if self.is_token_expired(metadata):
print("❌ Token expired! Attempting to refresh...")
try:
return self.refresh_token(token)
except Exception as e:
raise ValueError(
f"Token expired and refresh failed: {e}\n"
"Please re-authenticate: uv run python src/meta_api_grabber/auth.py"
)
# Check if should refresh (within threshold)
if self.should_refresh(metadata):
expires_at = metadata.get("expires_at", 0)
expires_dt = datetime.fromtimestamp(expires_at)
days_left = (expires_dt - datetime.now()).days
print(f"\n⚠️ Token expiring in {days_left} days ({expires_dt.strftime('%Y-%m-%d')})")
print(f" Refreshing token to extend validity...")
try:
return self.refresh_token(token)
except Exception as e:
print(f"⚠️ Token refresh failed: {e}")
print(f" Continuing with current token ({days_left} days remaining)")
return token
# Token is valid
expires_at = metadata.get("expires_at", 0)
if expires_at:
expires_dt = datetime.fromtimestamp(expires_at)
days_left = (expires_dt - datetime.now()).days
print(f"✅ Token valid ({days_left} days remaining)")
return token
def print_token_status(self):
"""Print current token status."""
token = self.load_token()
metadata = self.load_metadata()
print("\n" + "="*60)
print("TOKEN STATUS")
print("="*60)
if not token:
print("❌ No access token found in .env")
print("\nRun: uv run python src/meta_api_grabber/auth.py")
return
print("✅ Access token found")
if not metadata:
print("⚠️ No metadata file (.meta_token.json)")
print(" Cannot determine expiry. Re-authenticate to save metadata.")
return
expires_at = metadata.get("expires_at", 0)
is_valid = metadata.get("is_valid", False)
if expires_at:
expires_dt = datetime.fromtimestamp(expires_at)
days_left = (expires_dt - datetime.now()).days
print(f"\nExpires: {expires_dt.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Days remaining: {days_left}")
if days_left < 0:
print("Status: ❌ EXPIRED")
elif days_left <= self.refresh_before_days:
print(f"Status: ⚠️ EXPIRING SOON (will auto-refresh)")
else:
print("Status: ✅ VALID")
print(f"Is valid: {'✅' if is_valid else '❌'}")
print("="*60 + "\n")
def main():
"""Check token status and refresh if needed."""
manager = MetaTokenManager()
manager.print_token_status()
try:
token = manager.get_valid_token()
print(f"\n✅ Valid token ready for use")
except ValueError as e:
print(f"\n❌ Error: {e}")
return 1
return 0
if __name__ == "__main__":
exit(main())