From 3ae4ce0d83e0b19d7e87f79f5ea8f10f1b3a8e41 Mon Sep 17 00:00:00 2001 From: Jonas Linter Date: Mon, 10 Nov 2025 10:26:49 +0100 Subject: [PATCH] Added country level statistics on the campaign level --- src/meta_api_grabber/database.py | 68 +++++++++++++++++ src/meta_api_grabber/db_schema.sql | 48 ++++++++++++ src/meta_api_grabber/scheduled_grabber.py | 75 +++++++++++++++++++ .../views/campaign_insights_by_country.sql | 38 ++++++++++ 4 files changed, 229 insertions(+) create mode 100644 src/meta_api_grabber/views/campaign_insights_by_country.sql diff --git a/src/meta_api_grabber/database.py b/src/meta_api_grabber/database.py index cd1e25a..6ea5eda 100644 --- a/src/meta_api_grabber/database.py +++ b/src/meta_api_grabber/database.py @@ -465,6 +465,74 @@ class TimescaleDBClient: ctr, cpc, cpm, actions, date_preset, date_start, date_stop ) + async def insert_campaign_insights_by_country( + self, + time: datetime, + campaign_id: str, + account_id: str, + country: str, + data: Dict[str, Any], + date_preset: str = "today", + ): + """ + Insert campaign-level insights data broken down by country. + + Args: + time: Timestamp for the data point + campaign_id: Campaign ID + account_id: Ad account ID + country: ISO 2-letter country code + data: Insights data dictionary from Meta API + date_preset: Date preset used + """ + query = """ + INSERT INTO campaign_insights_by_country ( + time, campaign_id, account_id, country, impressions, clicks, spend, reach, + ctr, cpc, cpm, actions, date_preset, date_start, date_stop, fetched_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW()) + ON CONFLICT (time, campaign_id, country) + DO UPDATE SET + impressions = EXCLUDED.impressions, + clicks = EXCLUDED.clicks, + spend = EXCLUDED.spend, + reach = EXCLUDED.reach, + ctr = EXCLUDED.ctr, + cpc = EXCLUDED.cpc, + cpm = EXCLUDED.cpm, + actions = EXCLUDED.actions, + date_preset = EXCLUDED.date_preset, + date_start = EXCLUDED.date_start, + date_stop = EXCLUDED.date_stop, + fetched_at = NOW() + """ + + impressions = int(data.get("impressions", 0)) if data.get("impressions") else None + clicks = int(data.get("clicks", 0)) if data.get("clicks") else None + spend = float(data.get("spend", 0)) if data.get("spend") else None + reach = int(data.get("reach", 0)) if data.get("reach") else None + ctr = float(data.get("ctr", 0)) if data.get("ctr") else None + cpc = float(data.get("cpc", 0)) if data.get("cpc") else None + cpm = float(data.get("cpm", 0)) if data.get("cpm") else None + + # Extract date range from Meta API response and convert to date objects + from datetime import date as Date + date_start = None + date_stop = None + if data.get("date_start"): + date_start = Date.fromisoformat(data["date_start"]) + if data.get("date_stop"): + date_stop = Date.fromisoformat(data["date_stop"]) + + import json + actions = json.dumps(data.get("actions", [])) if data.get("actions") else None + + async with self.pool.acquire() as conn: + await conn.execute( + query, + time, campaign_id, account_id, country, impressions, clicks, spend, reach, + ctr, cpc, cpm, actions, date_preset, date_start, date_stop + ) + # ======================================================================== # QUERY HELPERS # ======================================================================== diff --git a/src/meta_api_grabber/db_schema.sql b/src/meta_api_grabber/db_schema.sql index 5d14cb2..14277b9 100644 --- a/src/meta_api_grabber/db_schema.sql +++ b/src/meta_api_grabber/db_schema.sql @@ -189,6 +189,54 @@ CREATE INDEX IF NOT EXISTS idx_adset_insights_account_time ON adset_insights (account_id, time DESC); +-- Campaign-level insights by country (time-series data) +CREATE TABLE IF NOT EXISTS campaign_insights_by_country ( + time TIMESTAMPTZ NOT NULL, + campaign_id VARCHAR(50) NOT NULL REFERENCES campaigns(campaign_id), + account_id VARCHAR(50) NOT NULL REFERENCES ad_accounts(account_id), + country VARCHAR(2) NOT NULL, -- ISO 2-letter country code + + -- Core metrics + impressions BIGINT, + clicks BIGINT, + spend NUMERIC(12, 2), + reach BIGINT, + + -- Calculated metrics + ctr NUMERIC(10, 6), + cpc NUMERIC(10, 4), + cpm NUMERIC(10, 4), + + -- Actions + actions JSONB, + + -- Metadata + date_preset VARCHAR(50), + date_start DATE, -- Actual start date of the data range from Meta API + date_stop DATE, -- Actual end date of the data range from Meta API + fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + PRIMARY KEY (time, campaign_id, country) +); + +-- Convert to hypertable +SELECT create_hypertable('campaign_insights_by_country', 'time', + if_not_exists => TRUE, + chunk_time_interval => INTERVAL '1 day' +); + +CREATE INDEX IF NOT EXISTS idx_campaign_insights_by_country_campaign_time + ON campaign_insights_by_country (campaign_id, time DESC); +CREATE INDEX IF NOT EXISTS idx_campaign_insights_by_country_account_time + ON campaign_insights_by_country (account_id, time DESC); +CREATE INDEX IF NOT EXISTS idx_campaign_insights_by_country_country + ON campaign_insights_by_country (country, time DESC); + + +-- Compression policy for campaign_insights_by_country +SELECT add_compression_policy('campaign_insights_by_country', INTERVAL '7 days', if_not_exists => TRUE); + + -- ============================================================================ -- CONTINUOUS AGGREGATES (Pre-computed rollups for dashboards) -- ============================================================================ diff --git a/src/meta_api_grabber/scheduled_grabber.py b/src/meta_api_grabber/scheduled_grabber.py index 0ffaff1..b54de8e 100644 --- a/src/meta_api_grabber/scheduled_grabber.py +++ b/src/meta_api_grabber/scheduled_grabber.py @@ -628,6 +628,79 @@ class ScheduledInsightsGrabber: print(f" Ad set insights stored for {account_id} ({count} records)") + async def grab_campaign_insights_by_country(self, account_id: str, date_preset: str = "today", limit: int = 50): + """ + Grab and store campaign-level insights broken down by country. + + Args: + account_id: Ad account ID + date_preset: Meta date preset + limit: Maximum number of campaigns + """ + fields = [ + AdsInsights.Field.campaign_id, + AdsInsights.Field.campaign_name, + AdsInsights.Field.country, + AdsInsights.Field.impressions, + AdsInsights.Field.clicks, + AdsInsights.Field.spend, + AdsInsights.Field.ctr, + AdsInsights.Field.cpc, + AdsInsights.Field.cpm, + AdsInsights.Field.reach, + AdsInsights.Field.actions, + AdsInsights.Field.date_start, + AdsInsights.Field.date_stop, + ] + + params = { + "date_preset": date_preset, + "level": "campaign", + "breakdown": "country", + "limit": limit, + } + + ad_account = AdAccount(account_id) + try: + insights = await self._rate_limited_request( + ad_account.get_insights, + fields=fields, + params=params, + ) + except FacebookRequestError as e: + error_code = e.api_error_code() + if error_code in [190, 102]: + raise ValueError(f"Access token is invalid (error {error_code}): {e.api_error_message()}") + raise + + # Get account timezone from database + account_timezone = await self._get_account_timezone(account_id) + + # Store insights (metadata is automatically cached from insights data) + count = 0 + for insight in insights: + campaign_id = insight.get('campaign_id') + country = insight.get('country') + if campaign_id and country: + insight_dict = dict(insight) + + # Compute appropriate timestamp based on date_preset and account timezone + date_start_str = insight_dict.get("date_start") + timestamp = self._compute_timestamp(date_start_str, account_timezone) + + # Insert insights - metadata is automatically cached from the insights data + await self.db.insert_campaign_insights_by_country( + time=timestamp, + campaign_id=campaign_id, + account_id=account_id, + country=country, + data=insight_dict, + date_preset=date_preset, + ) + count += 1 + + print(f" Campaign insights by country stored for {account_id} ({count} records)") + async def grab_account_insights_for_date_range( self, account_id: str, @@ -1036,6 +1109,7 @@ class ScheduledInsightsGrabber: print("Grabbing today's insights...") date_start = await self.grab_account_insights(account_id, date_preset="today") await self.grab_campaign_insights(account_id, date_preset="today", limit=50) + await self.grab_campaign_insights_by_country(account_id, date_preset="today", limit=50) await self.grab_adset_insights(account_id, date_preset="today", limit=50) # Track today's date from first account @@ -1088,6 +1162,7 @@ class ScheduledInsightsGrabber: print("Grabbing yesterday's insights...") await self.grab_account_insights(account_id, date_preset="yesterday") await self.grab_campaign_insights(account_id, date_preset="yesterday", limit=50) + await self.grab_campaign_insights_by_country(account_id, date_preset="yesterday", limit=50) await self.grab_adset_insights(account_id, date_preset="yesterday", limit=50) print(f"✓ Completed yesterday's data for {account_id}") diff --git a/src/meta_api_grabber/views/campaign_insights_by_country.sql b/src/meta_api_grabber/views/campaign_insights_by_country.sql new file mode 100644 index 0000000..d8d6b67 --- /dev/null +++ b/src/meta_api_grabber/views/campaign_insights_by_country.sql @@ -0,0 +1,38 @@ +--- campaign insights by country + +DROP MATERIALIZED VIEW IF EXISTS campaign_insights_by_country_flattened CASCADE; + +CREATE MATERIALIZED VIEW campaign_insights_by_country_flattened AS +SELECT + time, + account_id, + campaign_id, + country, + impressions, + clicks, + spend, + reach, + ctr, + cpc, + cpm, + date_preset, + date_start, + date_stop, + fetched_at, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'link_click') AS link_click, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'lead') AS lead + +FROM campaign_insights_by_country; + +CREATE INDEX idx_campaign_insights_by_country_flat_date ON campaign_insights_by_country_flattened(date_start, date_stop); + +CREATE UNIQUE INDEX idx_campaign_insights_by_country_flat_unique ON campaign_insights_by_country_flattened(time, campaign_id, country); + +REFRESH MATERIALIZED VIEW CONCURRENTLY campaign_insights_by_country_flattened;