diff --git a/src/meta_api_grabber/discover_accounts.py b/src/meta_api_grabber/discover_accounts.py new file mode 100644 index 0000000..bcfe2c1 --- /dev/null +++ b/src/meta_api_grabber/discover_accounts.py @@ -0,0 +1,140 @@ +""" +Discover all ad accounts accessible via different methods: +1. User ad accounts (personal access) +2. Business Manager ad accounts (app-level access) +""" + +import asyncio +import os +from dotenv import load_dotenv +from facebook_business.adobjects.user import User +from facebook_business.adobjects.business import Business +from facebook_business.api import FacebookAdsApi + + +def discover_accounts(): + """Discover ad accounts through multiple methods.""" + load_dotenv() + + access_token = os.getenv("META_ACCESS_TOKEN") + app_secret = os.getenv("META_APP_SECRET") + app_id = os.getenv("META_APP_ID") + + if not all([access_token, app_secret, app_id]): + print("❌ Missing required environment variables") + return 1 + + # Initialize Facebook Ads API + FacebookAdsApi.init( + app_id=app_id, + app_secret=app_secret, + access_token=access_token, + ) + + print("="*70) + print("AD ACCOUNT DISCOVERY") + print("="*70) + print() + + # Method 1: User ad accounts (what we currently use) + print("METHOD 1: User Ad Accounts (Personal Access)") + print("-"*70) + try: + me = User(fbid='me') + account_fields = ['name', 'account_id'] + user_accounts = me.get_ad_accounts(fields=account_fields) + user_account_list = list(user_accounts) + + print(f"Found {len(user_account_list)} user ad account(s):") + for acc in user_account_list: + print(f" - {acc.get('name')} ({acc.get('id')})") + print() + except Exception as e: + print(f"❌ Error getting user accounts: {e}\n") + user_account_list = [] + + # Method 2: Business Manager accounts + print("METHOD 2: Business Manager Ad Accounts (App-Level Access)") + print("-"*70) + try: + me = User(fbid='me') + # Get businesses this user/app has access to + businesses = me.get_businesses(fields=['id', 'name']) + business_list = list(businesses) + + if not business_list: + print("No Business Managers found.") + print() + print("ℹ️ To access ad accounts at the app level, you need to:") + print(" 1. Have a Meta Business Manager") + print(" 2. Add your app to the Business Manager") + print(" 3. Grant the app access to ad accounts") + print() + else: + print(f"Found {len(business_list)} Business Manager(s):") + all_business_accounts = [] + + for biz in business_list: + biz_id = biz.get('id') + biz_name = biz.get('name') + print(f"\n Business: {biz_name} ({biz_id})") + + try: + business = Business(fbid=biz_id) + # Get all ad accounts owned/managed by this business + biz_accounts = business.get_owned_ad_accounts( + fields=['id', 'name', 'account_status'] + ) + biz_account_list = list(biz_accounts) + + print(f" Ad Accounts: {len(biz_account_list)}") + for acc in biz_account_list: + print(f" - {acc.get('name')} ({acc.get('id')})") + all_business_accounts.append(acc) + + except Exception as e: + print(f" ❌ Error accessing business accounts: {e}") + + print() + print(f"Total Business Manager ad accounts: {len(all_business_accounts)}") + + except Exception as e: + print(f"❌ Error getting businesses: {e}") + print() + + # Method 3: App-level access token (for reference) + print() + print("METHOD 3: App Access Token (NOT recommended for ad accounts)") + print("-"*70) + print("App access tokens can be generated with:") + print(f" curl 'https://graph.facebook.com/oauth/access_token") + print(f" ?client_id={app_id}") + print(f" &client_secret=YOUR_SECRET") + print(f" &grant_type=client_credentials'") + print() + print("⚠️ However, app access tokens CANNOT access ad accounts directly.") + print(" The Marketing API requires user-level permissions for privacy/security.") + print() + + # Summary + print("="*70) + print("SUMMARY") + print("="*70) + print(f"User Ad Accounts: {len(user_account_list)}") + print() + print("💡 RECOMMENDATION:") + print(" The current implementation using User.get_ad_accounts() is correct.") + print(" To access MORE ad accounts, you have two options:") + print() + print(" Option 1: Use a System User token from Business Manager") + print(" (grants access to all Business Manager ad accounts)") + print() + print(" Option 2: Have other users authorize your app") + print(" (each user's token will see their ad accounts)") + print() + + return 0 + + +if __name__ == "__main__": + exit(discover_accounts()) diff --git a/src/meta_api_grabber/scheduled_grabber.py b/src/meta_api_grabber/scheduled_grabber.py index d031894..ce82909 100644 --- a/src/meta_api_grabber/scheduled_grabber.py +++ b/src/meta_api_grabber/scheduled_grabber.py @@ -5,12 +5,13 @@ Runs periodically to build time-series data for dashboards. import asyncio import os -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Optional from dotenv import load_dotenv from facebook_business.adobjects.adaccount import AdAccount from facebook_business.adobjects.adsinsights import AdsInsights +from facebook_business.adobjects.user import User from facebook_business.api import FacebookAdsApi from .database import TimescaleDBClient @@ -31,13 +32,14 @@ class ScheduledInsightsGrabber: - Auto-throttles when approaching limits """ - def __init__(self, access_token: Optional[str] = None, auto_refresh_token: bool = True): + def __init__(self, access_token: Optional[str] = None, auto_refresh_token: bool = True, max_accounts: Optional[int] = None): """ Initialize the scheduled grabber. Args: access_token: Optional access token. If not provided, loads from env. auto_refresh_token: If True, automatically refreshes tokens before expiry + max_accounts: Maximum number of ad accounts to process. None = all accounts, or set to a limit (e.g., 3) """ load_dotenv() @@ -54,7 +56,7 @@ class ScheduledInsightsGrabber: self.app_secret = os.getenv("META_APP_SECRET") self.app_id = os.getenv("META_APP_ID") - self.ad_account_id = os.getenv("META_AD_ACCOUNT_ID") + self.max_accounts = max_accounts if not self.access_token: raise ValueError( @@ -62,15 +64,16 @@ class ScheduledInsightsGrabber: "run 'uv run python src/meta_api_grabber/auth.py'" ) - if not all([self.app_secret, self.app_id, self.ad_account_id]): + if not all([self.app_secret, self.app_id]): raise ValueError( - "Missing required environment variables (META_APP_SECRET, META_APP_ID, META_AD_ACCOUNT_ID)" + "Missing required environment variables (META_APP_SECRET, META_APP_ID)" ) # Initialize Facebook Ads API self._init_api() - self.ad_account = AdAccount(self.ad_account_id) + # Will store list of ad account IDs + self.ad_account_ids = [] # Database client self.db: Optional[TimescaleDBClient] = None @@ -102,10 +105,37 @@ class ScheduledInsightsGrabber: print("🔄 Token was refreshed, reinitializing API...") self.access_token = new_token self._init_api() - self.ad_account = AdAccount(self.ad_account_id) except Exception as e: print(f"⚠️ Token refresh check failed: {e}") + async def load_ad_accounts(self): + """ + Load all accessible ad accounts from Meta API. + Respects max_accounts limit if set. + """ + print("Loading accessible ad accounts...") + me = User(fbid='me') + account_fields = ['name'] + + ad_accounts = await self._rate_limited_request( + me.get_ad_accounts, + fields=account_fields + ) + + # Convert to list and apply limit + all_accounts = list(ad_accounts) + if self.max_accounts: + all_accounts = all_accounts[:self.max_accounts] + print(f" Limiting to first {self.max_accounts} account(s)") + + self.ad_account_ids = [acc['id'] for acc in all_accounts] + + print(f" Loaded {len(self.ad_account_ids)} ad account(s):") + for acc_id in self.ad_account_ids: + print(f" - {acc_id}") + + return self.ad_account_ids + async def _rate_limited_request(self, func, *args, **kwargs): """ Execute a request with intelligent rate limiting and backoff. @@ -118,45 +148,47 @@ class ScheduledInsightsGrabber: """ return await self.rate_limiter.execute_with_retry(func, *args, **kwargs) - async def cache_account_metadata(self): + async def cache_account_metadata(self, account_id: str): """ Cache ad account metadata to database. + Args: + account_id: Ad account ID to cache + This reduces API calls by storing rarely-changing account info. """ - print("Caching account metadata...") - # Get account details + ad_account = AdAccount(account_id) account_fields = ['name', 'currency', 'timezone_name'] account_data = await self._rate_limited_request( - self.ad_account.api_get, + ad_account.api_get, fields=account_fields ) # Store in database await self.db.upsert_ad_account( - account_id=self.ad_account_id, + account_id=account_id, account_name=account_data.get('name'), currency=account_data.get('currency'), timezone_name=account_data.get('timezone_name'), ) - print(f" Account cached: {account_data.get('name')}") + print(f" Account cached: {account_data.get('name')} ({account_id})") - async def cache_campaigns_metadata(self, limit: int = 100): + async def cache_campaigns_metadata(self, account_id: str, limit: int = 100): """ Cache campaign metadata to database. Args: + account_id: Ad account ID limit: Maximum number of campaigns to cache """ - print("Caching campaign metadata...") - # Get campaigns from facebook_business.adobjects.campaign import Campaign + ad_account = AdAccount(account_id) campaigns = await self._rate_limited_request( - self.ad_account.get_campaigns, + ad_account.get_campaigns, fields=[ Campaign.Field.name, Campaign.Field.status, @@ -169,29 +201,29 @@ class ScheduledInsightsGrabber: for campaign in campaigns: await self.db.upsert_campaign( campaign_id=campaign['id'], - account_id=self.ad_account_id, + account_id=account_id, campaign_name=campaign.get('name', 'Unknown'), status=campaign.get('status'), objective=campaign.get('objective'), ) count += 1 - print(f" {count} campaigns cached") + print(f" {count} campaigns cached for {account_id}") - async def cache_adsets_metadata(self, limit: int = 100): + async def cache_adsets_metadata(self, account_id: str, limit: int = 100): """ Cache ad set metadata to database. Args: + account_id: Ad account ID limit: Maximum number of ad sets to cache """ - print("Caching ad set metadata...") - # Get ad sets from facebook_business.adobjects.adset import AdSet + ad_account = AdAccount(account_id) adsets = await self._rate_limited_request( - self.ad_account.get_ad_sets, + ad_account.get_ad_sets, fields=[ AdSet.Field.name, AdSet.Field.campaign_id, @@ -210,17 +242,16 @@ class ScheduledInsightsGrabber: ) count += 1 - print(f" {count} ad sets cached") + print(f" {count} ad sets cached for {account_id}") - async def grab_account_insights(self, date_preset: str = "today"): + async def grab_account_insights(self, account_id: str, date_preset: str = "today"): """ Grab and store account-level insights. Args: + account_id: Ad account ID date_preset: Meta date preset (default: 'today') """ - print(f"Grabbing account insights ({date_preset})...") - fields = [ AdsInsights.Field.impressions, AdsInsights.Field.clicks, @@ -240,34 +271,36 @@ class ScheduledInsightsGrabber: "level": "account", } + ad_account = AdAccount(account_id) insights = await self._rate_limited_request( - self.ad_account.get_insights, + ad_account.get_insights, fields=fields, params=params, ) # Store insights timestamp = datetime.now(timezone.utc) + count = 0 for insight in insights: await self.db.insert_account_insights( time=timestamp, - account_id=self.ad_account_id, + account_id=account_id, data=dict(insight), date_preset=date_preset, ) + count += 1 - print(f" Account insights stored ({len(list(insights))} records)") + print(f" Account insights stored for {account_id} ({count} records)") - async def grab_campaign_insights(self, date_preset: str = "today", limit: int = 50): + async def grab_campaign_insights(self, account_id: str, date_preset: str = "today", limit: int = 50): """ Grab and store campaign-level insights. Args: + account_id: Ad account ID date_preset: Meta date preset limit: Maximum number of campaigns """ - print(f"Grabbing campaign insights ({date_preset}, limit={limit})...") - fields = [ AdsInsights.Field.campaign_id, AdsInsights.Field.campaign_name, @@ -287,8 +320,9 @@ class ScheduledInsightsGrabber: "limit": limit, } + ad_account = AdAccount(account_id) insights = await self._rate_limited_request( - self.ad_account.get_insights, + ad_account.get_insights, fields=fields, params=params, ) @@ -302,24 +336,23 @@ class ScheduledInsightsGrabber: await self.db.insert_campaign_insights( time=timestamp, campaign_id=campaign_id, - account_id=self.ad_account_id, + account_id=account_id, data=dict(insight), date_preset=date_preset, ) count += 1 - print(f" Campaign insights stored ({count} records)") + print(f" Campaign insights stored for {account_id} ({count} records)") - async def grab_adset_insights(self, date_preset: str = "today", limit: int = 50): + async def grab_adset_insights(self, account_id: str, date_preset: str = "today", limit: int = 50): """ Grab and store ad set level insights. Args: + account_id: Ad account ID date_preset: Meta date preset limit: Maximum number of ad sets """ - print(f"Grabbing ad set insights ({date_preset}, limit={limit})...") - fields = [ AdsInsights.Field.adset_id, AdsInsights.Field.adset_name, @@ -340,8 +373,9 @@ class ScheduledInsightsGrabber: "limit": limit, } + ad_account = AdAccount(account_id) insights = await self._rate_limited_request( - self.ad_account.get_insights, + ad_account.get_insights, fields=fields, params=params, ) @@ -357,17 +391,17 @@ class ScheduledInsightsGrabber: time=timestamp, adset_id=adset_id, campaign_id=campaign_id, - account_id=self.ad_account_id, + account_id=account_id, data=dict(insight), date_preset=date_preset, ) count += 1 - print(f" Ad set insights stored ({count} records)") + print(f" Ad set insights stored for {account_id} ({count} records)") async def run_collection_cycle(self, cache_metadata: bool = True): """ - Run a single collection cycle. + Run a single collection cycle for all ad accounts. Args: cache_metadata: Whether to refresh metadata cache @@ -375,19 +409,38 @@ class ScheduledInsightsGrabber: print("\n" + "="*60) print(f"COLLECTION CYCLE - {datetime.now().isoformat()}") print("="*60) + print(f"Processing {len(self.ad_account_ids)} ad account(s)") + print("="*60 + "\n") - # Refresh metadata cache if requested (do this less frequently) - if cache_metadata: - await self.cache_account_metadata() - await self.cache_campaigns_metadata(limit=100) - await self.cache_adsets_metadata(limit=100) + # Loop through all ad accounts + for i, account_id in enumerate(self.ad_account_ids, 1): + print(f"\n[{i}/{len(self.ad_account_ids)}] Processing account: {account_id}") + print("-" * 60) - # Grab insights (always use 'today' for scheduled collection) - await self.grab_account_insights(date_preset="today") - await self.grab_campaign_insights(date_preset="today", limit=50) - await self.grab_adset_insights(date_preset="today", limit=50) + try: + # Refresh metadata cache if requested (do this less frequently) + if cache_metadata: + print("Caching metadata...") + await self.cache_account_metadata(account_id) + await self.cache_campaigns_metadata(account_id, limit=100) + await self.cache_adsets_metadata(account_id, limit=100) + + # Grab insights (always use 'today' for scheduled collection) + print("Grabbing insights...") + await self.grab_account_insights(account_id, date_preset="today") + await self.grab_campaign_insights(account_id, date_preset="today", limit=50) + await self.grab_adset_insights(account_id, date_preset="today", limit=50) + + print(f"✓ Completed {account_id}") + + except Exception as e: + print(f"❌ Error processing {account_id}: {e}") + import traceback + traceback.print_exc() + # Continue with next account # Print rate limiter statistics + print("\n" + "-" * 60) self.rate_limiter.print_stats() print("\n" + "="*60) @@ -400,7 +453,7 @@ class ScheduledInsightsGrabber: refresh_metadata_every_n_cycles: int = 12, ): """ - Run scheduled data collection. + Run scheduled data collection for all accessible ad accounts. Args: interval_hours: Hours between collection cycles (default: 2) @@ -409,10 +462,6 @@ class ScheduledInsightsGrabber: print("\n" + "="*60) print("SCHEDULED INSIGHTS GRABBER STARTED") print("="*60) - print(f"Account: {self.ad_account_id}") - print(f"Collection interval: {interval_hours} hours") - print(f"Metadata refresh: every {refresh_metadata_every_n_cycles} cycles") - print("="*60 + "\n") # Connect to database self.db = TimescaleDBClient() @@ -421,6 +470,19 @@ class ScheduledInsightsGrabber: # Initialize database schema (idempotent - safe to run multiple times) await self.db.initialize_schema() + # Load all accessible ad accounts + await self.load_ad_accounts() + + if not self.ad_account_ids: + print("❌ No ad accounts found. Exiting.") + return 1 + + print(f"\nCollection interval: {interval_hours} hours") + print(f"Metadata refresh: every {refresh_metadata_every_n_cycles} cycles") + if self.max_accounts: + print(f"Max accounts: {self.max_accounts}") + print("="*60 + "\n") + cycle_count = 0 try: @@ -452,7 +514,9 @@ class ScheduledInsightsGrabber: async def async_main(): """Async main entry point for scheduled grabber.""" try: - grabber = ScheduledInsightsGrabber() + # Initialize with max_accounts=3 for conservative start + # Set max_accounts=None to process all accessible accounts + grabber = ScheduledInsightsGrabber(max_accounts=3) # Run scheduled collection (every 2 hours) await grabber.run_scheduled( diff --git a/src/meta_api_grabber/test_ad_accounts.py b/src/meta_api_grabber/test_ad_accounts.py index 07fd5ff..e11be62 100644 --- a/src/meta_api_grabber/test_ad_accounts.py +++ b/src/meta_api_grabber/test_ad_accounts.py @@ -1,12 +1,14 @@ """ Simple test script to initialize database and grab ad_accounts metadata. This is useful for testing the database setup and verifying ad account access. +Grabs ALL ad accounts accessible to the token. """ import asyncio import os from dotenv import load_dotenv from facebook_business.adobjects.adaccount import AdAccount +from facebook_business.adobjects.user import User from facebook_business.api import FacebookAdsApi from meta_api_grabber.database import TimescaleDBClient @@ -20,18 +22,16 @@ async def test_ad_accounts(): access_token = os.getenv("META_ACCESS_TOKEN") app_secret = os.getenv("META_APP_SECRET") app_id = os.getenv("META_APP_ID") - ad_account_id = os.getenv("META_AD_ACCOUNT_ID") - if not all([access_token, app_secret, app_id, ad_account_id]): + if not all([access_token, app_secret, app_id]): print("❌ Missing required environment variables") - print(" Please ensure META_ACCESS_TOKEN, META_APP_SECRET, META_APP_ID,") - print(" and META_AD_ACCOUNT_ID are set in .env") + print(" Please ensure META_ACCESS_TOKEN, META_APP_SECRET, and META_APP_ID") + print(" are set in .env") return 1 print("="*60) - print("AD ACCOUNT TEST") + print("AD ACCOUNTS TEST - GRABBING ALL ACCESSIBLE ACCOUNTS") print("="*60) - print(f"Account ID: {ad_account_id}") print() # Initialize Facebook Ads API @@ -51,56 +51,58 @@ async def test_ad_accounts(): print("\nInitializing database schema...") await db.initialize_schema() - # Get ad account details from Meta API - print(f"\nFetching ad account details from Meta API...") - ad_account = AdAccount(ad_account_id) + # Get all ad accounts accessible to this token + print("\nFetching all ad accounts accessible to this token...") + me = User(fbid='me') account_fields = ['name', 'currency', 'timezone_name', 'account_status'] - account_data = ad_account.api_get(fields=account_fields) + ad_accounts = me.get_ad_accounts(fields=account_fields) - print("\nAd Account Details:") - print(f" ID: {ad_account_id}") - print(f" Name: {account_data.get('name', 'N/A')}") - print(f" Currency: {account_data.get('currency', 'N/A')}") - print(f" Timezone: {account_data.get('timezone_name', 'N/A')}") - print(f" Status: {account_data.get('account_status', 'N/A')}") + print(f"Found {len(ad_accounts)} ad account(s)\n") - # Store in database - print("\nStoring ad account in database...") - await db.upsert_ad_account( - account_id=ad_account_id, - account_name=account_data.get('name'), - currency=account_data.get('currency'), - timezone_name=account_data.get('timezone_name'), - ) + stored_count = 0 + for account in ad_accounts: + account_id = account['id'] + print(f"Ad Account {stored_count + 1}:") + print(f" ID: {account_id}") + print(f" Name: {account.get('name', 'N/A')}") + print(f" Currency: {account.get('currency', 'N/A')}") + print(f" Timezone: {account.get('timezone_name', 'N/A')}") + print(f" Status: {account.get('account_status', 'N/A')}") - print("✓ Ad account successfully stored in database") + # Store in database + await db.upsert_ad_account( + account_id=account_id, + account_name=account.get('name'), + currency=account.get('currency'), + timezone_name=account.get('timezone_name'), + ) + print(f" ✓ Stored in database\n") + stored_count += 1 # Verify by querying the database - print("\nVerifying database storage...") + print("="*60) + print(f"Verifying database storage ({stored_count} account(s))...") + print("="*60) async with db.pool.acquire() as conn: - row = await conn.fetchrow( - "SELECT * FROM ad_accounts WHERE account_id = $1", - ad_account_id - ) - if row: - print("✓ Ad account found in database:") - print(f" Account ID: {row['account_id']}") - print(f" Account Name: {row['account_name']}") - print(f" Currency: {row['currency']}") - print(f" Timezone: {row['timezone_name']}") - print(f" Created At: {row['created_at']}") - print(f" Updated At: {row['updated_at']}") + rows = await conn.fetch("SELECT * FROM ad_accounts ORDER BY account_name") + if rows: + print(f"\n✓ {len(rows)} ad account(s) found in database:\n") + for i, row in enumerate(rows, 1): + print(f"{i}. {row['account_name']} ({row['account_id']})") + print(f" Currency: {row['currency']} | Timezone: {row['timezone_name']}") + print(f" Updated: {row['updated_at']}\n") else: - print("❌ Ad account not found in database") + print("❌ No ad accounts found in database") - print("\n" + "="*60) + print("="*60) print("TEST COMPLETED SUCCESSFULLY") print("="*60) + print(f"\n✓ Successfully grabbed and stored {stored_count} ad account(s)") print("\nNext steps:") - print("1. Check your database with: docker exec -it meta_api_grabber-timescaledb-1 psql -U meta_user -d meta_insights") - print("2. Query ad accounts: SELECT * FROM ad_accounts;") - print("3. Run full scheduled grabber: uv run python src/meta_api_grabber/scheduled_grabber.py") + print("1. Check your database: docker exec meta_timescaledb psql -U meta_user -d meta_insights -c 'SELECT * FROM ad_accounts;'") + print("2. Run scheduled grabber for all accounts: meta-scheduled") + print("3. The scheduled grabber will now process all these accounts!") except Exception as e: print(f"\n❌ Error: {e}")