works but need to do more research on how the token should be exactly handled

This commit is contained in:
Jonas Linter
2025-10-21 14:00:14 +02:00
parent b6585b280f
commit 3fe923f5a7
3 changed files with 307 additions and 101 deletions

View File

@@ -0,0 +1,140 @@
"""
Discover all ad accounts accessible via different methods:
1. User ad accounts (personal access)
2. Business Manager ad accounts (app-level access)
"""
import asyncio
import os
from dotenv import load_dotenv
from facebook_business.adobjects.user import User
from facebook_business.adobjects.business import Business
from facebook_business.api import FacebookAdsApi
def discover_accounts():
"""Discover ad accounts through multiple methods."""
load_dotenv()
access_token = os.getenv("META_ACCESS_TOKEN")
app_secret = os.getenv("META_APP_SECRET")
app_id = os.getenv("META_APP_ID")
if not all([access_token, app_secret, app_id]):
print("❌ Missing required environment variables")
return 1
# Initialize Facebook Ads API
FacebookAdsApi.init(
app_id=app_id,
app_secret=app_secret,
access_token=access_token,
)
print("="*70)
print("AD ACCOUNT DISCOVERY")
print("="*70)
print()
# Method 1: User ad accounts (what we currently use)
print("METHOD 1: User Ad Accounts (Personal Access)")
print("-"*70)
try:
me = User(fbid='me')
account_fields = ['name', 'account_id']
user_accounts = me.get_ad_accounts(fields=account_fields)
user_account_list = list(user_accounts)
print(f"Found {len(user_account_list)} user ad account(s):")
for acc in user_account_list:
print(f" - {acc.get('name')} ({acc.get('id')})")
print()
except Exception as e:
print(f"❌ Error getting user accounts: {e}\n")
user_account_list = []
# Method 2: Business Manager accounts
print("METHOD 2: Business Manager Ad Accounts (App-Level Access)")
print("-"*70)
try:
me = User(fbid='me')
# Get businesses this user/app has access to
businesses = me.get_businesses(fields=['id', 'name'])
business_list = list(businesses)
if not business_list:
print("No Business Managers found.")
print()
print(" To access ad accounts at the app level, you need to:")
print(" 1. Have a Meta Business Manager")
print(" 2. Add your app to the Business Manager")
print(" 3. Grant the app access to ad accounts")
print()
else:
print(f"Found {len(business_list)} Business Manager(s):")
all_business_accounts = []
for biz in business_list:
biz_id = biz.get('id')
biz_name = biz.get('name')
print(f"\n Business: {biz_name} ({biz_id})")
try:
business = Business(fbid=biz_id)
# Get all ad accounts owned/managed by this business
biz_accounts = business.get_owned_ad_accounts(
fields=['id', 'name', 'account_status']
)
biz_account_list = list(biz_accounts)
print(f" Ad Accounts: {len(biz_account_list)}")
for acc in biz_account_list:
print(f" - {acc.get('name')} ({acc.get('id')})")
all_business_accounts.append(acc)
except Exception as e:
print(f" ❌ Error accessing business accounts: {e}")
print()
print(f"Total Business Manager ad accounts: {len(all_business_accounts)}")
except Exception as e:
print(f"❌ Error getting businesses: {e}")
print()
# Method 3: App-level access token (for reference)
print()
print("METHOD 3: App Access Token (NOT recommended for ad accounts)")
print("-"*70)
print("App access tokens can be generated with:")
print(f" curl 'https://graph.facebook.com/oauth/access_token")
print(f" ?client_id={app_id}")
print(f" &client_secret=YOUR_SECRET")
print(f" &grant_type=client_credentials'")
print()
print("⚠️ However, app access tokens CANNOT access ad accounts directly.")
print(" The Marketing API requires user-level permissions for privacy/security.")
print()
# Summary
print("="*70)
print("SUMMARY")
print("="*70)
print(f"User Ad Accounts: {len(user_account_list)}")
print()
print("💡 RECOMMENDATION:")
print(" The current implementation using User.get_ad_accounts() is correct.")
print(" To access MORE ad accounts, you have two options:")
print()
print(" Option 1: Use a System User token from Business Manager")
print(" (grants access to all Business Manager ad accounts)")
print()
print(" Option 2: Have other users authorize your app")
print(" (each user's token will see their ad accounts)")
print()
return 0
if __name__ == "__main__":
exit(discover_accounts())

View File

@@ -5,12 +5,13 @@ Runs periodically to build time-series data for dashboards.
import asyncio
import os
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from typing import Optional
from dotenv import load_dotenv
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.adobjects.user import User
from facebook_business.api import FacebookAdsApi
from .database import TimescaleDBClient
@@ -31,13 +32,14 @@ class ScheduledInsightsGrabber:
- Auto-throttles when approaching limits
"""
def __init__(self, access_token: Optional[str] = None, auto_refresh_token: bool = True):
def __init__(self, access_token: Optional[str] = None, auto_refresh_token: bool = True, max_accounts: Optional[int] = None):
"""
Initialize the scheduled grabber.
Args:
access_token: Optional access token. If not provided, loads from env.
auto_refresh_token: If True, automatically refreshes tokens before expiry
max_accounts: Maximum number of ad accounts to process. None = all accounts, or set to a limit (e.g., 3)
"""
load_dotenv()
@@ -54,7 +56,7 @@ class ScheduledInsightsGrabber:
self.app_secret = os.getenv("META_APP_SECRET")
self.app_id = os.getenv("META_APP_ID")
self.ad_account_id = os.getenv("META_AD_ACCOUNT_ID")
self.max_accounts = max_accounts
if not self.access_token:
raise ValueError(
@@ -62,15 +64,16 @@ class ScheduledInsightsGrabber:
"run 'uv run python src/meta_api_grabber/auth.py'"
)
if not all([self.app_secret, self.app_id, self.ad_account_id]):
if not all([self.app_secret, self.app_id]):
raise ValueError(
"Missing required environment variables (META_APP_SECRET, META_APP_ID, META_AD_ACCOUNT_ID)"
"Missing required environment variables (META_APP_SECRET, META_APP_ID)"
)
# Initialize Facebook Ads API
self._init_api()
self.ad_account = AdAccount(self.ad_account_id)
# Will store list of ad account IDs
self.ad_account_ids = []
# Database client
self.db: Optional[TimescaleDBClient] = None
@@ -102,10 +105,37 @@ class ScheduledInsightsGrabber:
print("🔄 Token was refreshed, reinitializing API...")
self.access_token = new_token
self._init_api()
self.ad_account = AdAccount(self.ad_account_id)
except Exception as e:
print(f"⚠️ Token refresh check failed: {e}")
async def load_ad_accounts(self):
"""
Load all accessible ad accounts from Meta API.
Respects max_accounts limit if set.
"""
print("Loading accessible ad accounts...")
me = User(fbid='me')
account_fields = ['name']
ad_accounts = await self._rate_limited_request(
me.get_ad_accounts,
fields=account_fields
)
# Convert to list and apply limit
all_accounts = list(ad_accounts)
if self.max_accounts:
all_accounts = all_accounts[:self.max_accounts]
print(f" Limiting to first {self.max_accounts} account(s)")
self.ad_account_ids = [acc['id'] for acc in all_accounts]
print(f" Loaded {len(self.ad_account_ids)} ad account(s):")
for acc_id in self.ad_account_ids:
print(f" - {acc_id}")
return self.ad_account_ids
async def _rate_limited_request(self, func, *args, **kwargs):
"""
Execute a request with intelligent rate limiting and backoff.
@@ -118,45 +148,47 @@ class ScheduledInsightsGrabber:
"""
return await self.rate_limiter.execute_with_retry(func, *args, **kwargs)
async def cache_account_metadata(self):
async def cache_account_metadata(self, account_id: str):
"""
Cache ad account metadata to database.
Args:
account_id: Ad account ID to cache
This reduces API calls by storing rarely-changing account info.
"""
print("Caching account metadata...")
# Get account details
ad_account = AdAccount(account_id)
account_fields = ['name', 'currency', 'timezone_name']
account_data = await self._rate_limited_request(
self.ad_account.api_get,
ad_account.api_get,
fields=account_fields
)
# Store in database
await self.db.upsert_ad_account(
account_id=self.ad_account_id,
account_id=account_id,
account_name=account_data.get('name'),
currency=account_data.get('currency'),
timezone_name=account_data.get('timezone_name'),
)
print(f" Account cached: {account_data.get('name')}")
print(f" Account cached: {account_data.get('name')} ({account_id})")
async def cache_campaigns_metadata(self, limit: int = 100):
async def cache_campaigns_metadata(self, account_id: str, limit: int = 100):
"""
Cache campaign metadata to database.
Args:
account_id: Ad account ID
limit: Maximum number of campaigns to cache
"""
print("Caching campaign metadata...")
# Get campaigns
from facebook_business.adobjects.campaign import Campaign
ad_account = AdAccount(account_id)
campaigns = await self._rate_limited_request(
self.ad_account.get_campaigns,
ad_account.get_campaigns,
fields=[
Campaign.Field.name,
Campaign.Field.status,
@@ -169,29 +201,29 @@ class ScheduledInsightsGrabber:
for campaign in campaigns:
await self.db.upsert_campaign(
campaign_id=campaign['id'],
account_id=self.ad_account_id,
account_id=account_id,
campaign_name=campaign.get('name', 'Unknown'),
status=campaign.get('status'),
objective=campaign.get('objective'),
)
count += 1
print(f" {count} campaigns cached")
print(f" {count} campaigns cached for {account_id}")
async def cache_adsets_metadata(self, limit: int = 100):
async def cache_adsets_metadata(self, account_id: str, limit: int = 100):
"""
Cache ad set metadata to database.
Args:
account_id: Ad account ID
limit: Maximum number of ad sets to cache
"""
print("Caching ad set metadata...")
# Get ad sets
from facebook_business.adobjects.adset import AdSet
ad_account = AdAccount(account_id)
adsets = await self._rate_limited_request(
self.ad_account.get_ad_sets,
ad_account.get_ad_sets,
fields=[
AdSet.Field.name,
AdSet.Field.campaign_id,
@@ -210,17 +242,16 @@ class ScheduledInsightsGrabber:
)
count += 1
print(f" {count} ad sets cached")
print(f" {count} ad sets cached for {account_id}")
async def grab_account_insights(self, date_preset: str = "today"):
async def grab_account_insights(self, account_id: str, date_preset: str = "today"):
"""
Grab and store account-level insights.
Args:
account_id: Ad account ID
date_preset: Meta date preset (default: 'today')
"""
print(f"Grabbing account insights ({date_preset})...")
fields = [
AdsInsights.Field.impressions,
AdsInsights.Field.clicks,
@@ -240,34 +271,36 @@ class ScheduledInsightsGrabber:
"level": "account",
}
ad_account = AdAccount(account_id)
insights = await self._rate_limited_request(
self.ad_account.get_insights,
ad_account.get_insights,
fields=fields,
params=params,
)
# Store insights
timestamp = datetime.now(timezone.utc)
count = 0
for insight in insights:
await self.db.insert_account_insights(
time=timestamp,
account_id=self.ad_account_id,
account_id=account_id,
data=dict(insight),
date_preset=date_preset,
)
count += 1
print(f" Account insights stored ({len(list(insights))} records)")
print(f" Account insights stored for {account_id} ({count} records)")
async def grab_campaign_insights(self, date_preset: str = "today", limit: int = 50):
async def grab_campaign_insights(self, account_id: str, date_preset: str = "today", limit: int = 50):
"""
Grab and store campaign-level insights.
Args:
account_id: Ad account ID
date_preset: Meta date preset
limit: Maximum number of campaigns
"""
print(f"Grabbing campaign insights ({date_preset}, limit={limit})...")
fields = [
AdsInsights.Field.campaign_id,
AdsInsights.Field.campaign_name,
@@ -287,8 +320,9 @@ class ScheduledInsightsGrabber:
"limit": limit,
}
ad_account = AdAccount(account_id)
insights = await self._rate_limited_request(
self.ad_account.get_insights,
ad_account.get_insights,
fields=fields,
params=params,
)
@@ -302,24 +336,23 @@ class ScheduledInsightsGrabber:
await self.db.insert_campaign_insights(
time=timestamp,
campaign_id=campaign_id,
account_id=self.ad_account_id,
account_id=account_id,
data=dict(insight),
date_preset=date_preset,
)
count += 1
print(f" Campaign insights stored ({count} records)")
print(f" Campaign insights stored for {account_id} ({count} records)")
async def grab_adset_insights(self, date_preset: str = "today", limit: int = 50):
async def grab_adset_insights(self, account_id: str, date_preset: str = "today", limit: int = 50):
"""
Grab and store ad set level insights.
Args:
account_id: Ad account ID
date_preset: Meta date preset
limit: Maximum number of ad sets
"""
print(f"Grabbing ad set insights ({date_preset}, limit={limit})...")
fields = [
AdsInsights.Field.adset_id,
AdsInsights.Field.adset_name,
@@ -340,8 +373,9 @@ class ScheduledInsightsGrabber:
"limit": limit,
}
ad_account = AdAccount(account_id)
insights = await self._rate_limited_request(
self.ad_account.get_insights,
ad_account.get_insights,
fields=fields,
params=params,
)
@@ -357,17 +391,17 @@ class ScheduledInsightsGrabber:
time=timestamp,
adset_id=adset_id,
campaign_id=campaign_id,
account_id=self.ad_account_id,
account_id=account_id,
data=dict(insight),
date_preset=date_preset,
)
count += 1
print(f" Ad set insights stored ({count} records)")
print(f" Ad set insights stored for {account_id} ({count} records)")
async def run_collection_cycle(self, cache_metadata: bool = True):
"""
Run a single collection cycle.
Run a single collection cycle for all ad accounts.
Args:
cache_metadata: Whether to refresh metadata cache
@@ -375,19 +409,38 @@ class ScheduledInsightsGrabber:
print("\n" + "="*60)
print(f"COLLECTION CYCLE - {datetime.now().isoformat()}")
print("="*60)
print(f"Processing {len(self.ad_account_ids)} ad account(s)")
print("="*60 + "\n")
# Loop through all ad accounts
for i, account_id in enumerate(self.ad_account_ids, 1):
print(f"\n[{i}/{len(self.ad_account_ids)}] Processing account: {account_id}")
print("-" * 60)
try:
# Refresh metadata cache if requested (do this less frequently)
if cache_metadata:
await self.cache_account_metadata()
await self.cache_campaigns_metadata(limit=100)
await self.cache_adsets_metadata(limit=100)
print("Caching metadata...")
await self.cache_account_metadata(account_id)
await self.cache_campaigns_metadata(account_id, limit=100)
await self.cache_adsets_metadata(account_id, limit=100)
# Grab insights (always use 'today' for scheduled collection)
await self.grab_account_insights(date_preset="today")
await self.grab_campaign_insights(date_preset="today", limit=50)
await self.grab_adset_insights(date_preset="today", limit=50)
print("Grabbing insights...")
await self.grab_account_insights(account_id, date_preset="today")
await self.grab_campaign_insights(account_id, date_preset="today", limit=50)
await self.grab_adset_insights(account_id, date_preset="today", limit=50)
print(f"✓ Completed {account_id}")
except Exception as e:
print(f"❌ Error processing {account_id}: {e}")
import traceback
traceback.print_exc()
# Continue with next account
# Print rate limiter statistics
print("\n" + "-" * 60)
self.rate_limiter.print_stats()
print("\n" + "="*60)
@@ -400,7 +453,7 @@ class ScheduledInsightsGrabber:
refresh_metadata_every_n_cycles: int = 12,
):
"""
Run scheduled data collection.
Run scheduled data collection for all accessible ad accounts.
Args:
interval_hours: Hours between collection cycles (default: 2)
@@ -409,10 +462,6 @@ class ScheduledInsightsGrabber:
print("\n" + "="*60)
print("SCHEDULED INSIGHTS GRABBER STARTED")
print("="*60)
print(f"Account: {self.ad_account_id}")
print(f"Collection interval: {interval_hours} hours")
print(f"Metadata refresh: every {refresh_metadata_every_n_cycles} cycles")
print("="*60 + "\n")
# Connect to database
self.db = TimescaleDBClient()
@@ -421,6 +470,19 @@ class ScheduledInsightsGrabber:
# Initialize database schema (idempotent - safe to run multiple times)
await self.db.initialize_schema()
# Load all accessible ad accounts
await self.load_ad_accounts()
if not self.ad_account_ids:
print("❌ No ad accounts found. Exiting.")
return 1
print(f"\nCollection interval: {interval_hours} hours")
print(f"Metadata refresh: every {refresh_metadata_every_n_cycles} cycles")
if self.max_accounts:
print(f"Max accounts: {self.max_accounts}")
print("="*60 + "\n")
cycle_count = 0
try:
@@ -452,7 +514,9 @@ class ScheduledInsightsGrabber:
async def async_main():
"""Async main entry point for scheduled grabber."""
try:
grabber = ScheduledInsightsGrabber()
# Initialize with max_accounts=3 for conservative start
# Set max_accounts=None to process all accessible accounts
grabber = ScheduledInsightsGrabber(max_accounts=3)
# Run scheduled collection (every 2 hours)
await grabber.run_scheduled(

View File

@@ -1,12 +1,14 @@
"""
Simple test script to initialize database and grab ad_accounts metadata.
This is useful for testing the database setup and verifying ad account access.
Grabs ALL ad accounts accessible to the token.
"""
import asyncio
import os
from dotenv import load_dotenv
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.user import User
from facebook_business.api import FacebookAdsApi
from meta_api_grabber.database import TimescaleDBClient
@@ -20,18 +22,16 @@ async def test_ad_accounts():
access_token = os.getenv("META_ACCESS_TOKEN")
app_secret = os.getenv("META_APP_SECRET")
app_id = os.getenv("META_APP_ID")
ad_account_id = os.getenv("META_AD_ACCOUNT_ID")
if not all([access_token, app_secret, app_id, ad_account_id]):
if not all([access_token, app_secret, app_id]):
print("❌ Missing required environment variables")
print(" Please ensure META_ACCESS_TOKEN, META_APP_SECRET, META_APP_ID,")
print(" and META_AD_ACCOUNT_ID are set in .env")
print(" Please ensure META_ACCESS_TOKEN, META_APP_SECRET, and META_APP_ID")
print(" are set in .env")
return 1
print("="*60)
print("AD ACCOUNT TEST")
print("AD ACCOUNTS TEST - GRABBING ALL ACCESSIBLE ACCOUNTS")
print("="*60)
print(f"Account ID: {ad_account_id}")
print()
# Initialize Facebook Ads API
@@ -51,56 +51,58 @@ async def test_ad_accounts():
print("\nInitializing database schema...")
await db.initialize_schema()
# Get ad account details from Meta API
print(f"\nFetching ad account details from Meta API...")
ad_account = AdAccount(ad_account_id)
# Get all ad accounts accessible to this token
print("\nFetching all ad accounts accessible to this token...")
me = User(fbid='me')
account_fields = ['name', 'currency', 'timezone_name', 'account_status']
account_data = ad_account.api_get(fields=account_fields)
ad_accounts = me.get_ad_accounts(fields=account_fields)
print("\nAd Account Details:")
print(f" ID: {ad_account_id}")
print(f" Name: {account_data.get('name', 'N/A')}")
print(f" Currency: {account_data.get('currency', 'N/A')}")
print(f" Timezone: {account_data.get('timezone_name', 'N/A')}")
print(f" Status: {account_data.get('account_status', 'N/A')}")
print(f"Found {len(ad_accounts)} ad account(s)\n")
stored_count = 0
for account in ad_accounts:
account_id = account['id']
print(f"Ad Account {stored_count + 1}:")
print(f" ID: {account_id}")
print(f" Name: {account.get('name', 'N/A')}")
print(f" Currency: {account.get('currency', 'N/A')}")
print(f" Timezone: {account.get('timezone_name', 'N/A')}")
print(f" Status: {account.get('account_status', 'N/A')}")
# Store in database
print("\nStoring ad account in database...")
await db.upsert_ad_account(
account_id=ad_account_id,
account_name=account_data.get('name'),
currency=account_data.get('currency'),
timezone_name=account_data.get('timezone_name'),
account_id=account_id,
account_name=account.get('name'),
currency=account.get('currency'),
timezone_name=account.get('timezone_name'),
)
print("✓ Ad account successfully stored in database")
print(f" ✓ Stored in database\n")
stored_count += 1
# Verify by querying the database
print("\nVerifying database storage...")
print("="*60)
print(f"Verifying database storage ({stored_count} account(s))...")
print("="*60)
async with db.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM ad_accounts WHERE account_id = $1",
ad_account_id
)
if row:
print("✓ Ad account found in database:")
print(f" Account ID: {row['account_id']}")
print(f" Account Name: {row['account_name']}")
print(f" Currency: {row['currency']}")
print(f" Timezone: {row['timezone_name']}")
print(f" Created At: {row['created_at']}")
print(f" Updated At: {row['updated_at']}")
rows = await conn.fetch("SELECT * FROM ad_accounts ORDER BY account_name")
if rows:
print(f"\n{len(rows)} ad account(s) found in database:\n")
for i, row in enumerate(rows, 1):
print(f"{i}. {row['account_name']} ({row['account_id']})")
print(f" Currency: {row['currency']} | Timezone: {row['timezone_name']}")
print(f" Updated: {row['updated_at']}\n")
else:
print("Ad account not found in database")
print("No ad accounts found in database")
print("\n" + "="*60)
print("="*60)
print("TEST COMPLETED SUCCESSFULLY")
print("="*60)
print(f"\n✓ Successfully grabbed and stored {stored_count} ad account(s)")
print("\nNext steps:")
print("1. Check your database with: docker exec -it meta_api_grabber-timescaledb-1 psql -U meta_user -d meta_insights")
print("2. Query ad accounts: SELECT * FROM ad_accounts;")
print("3. Run full scheduled grabber: uv run python src/meta_api_grabber/scheduled_grabber.py")
print("1. Check your database: docker exec meta_timescaledb psql -U meta_user -d meta_insights -c 'SELECT * FROM ad_accounts;'")
print("2. Run scheduled grabber for all accounts: meta-scheduled")
print("3. The scheduled grabber will now process all these accounts!")
except Exception as e:
print(f"\n❌ Error: {e}")