Added country level statistics on the campaign level

This commit is contained in:
Jonas Linter
2025-11-10 10:26:49 +01:00
parent e577945e75
commit 3ae4ce0d83
4 changed files with 229 additions and 0 deletions

View File

@@ -465,6 +465,74 @@ class TimescaleDBClient:
ctr, cpc, cpm, actions, date_preset, date_start, date_stop
)
async def insert_campaign_insights_by_country(
self,
time: datetime,
campaign_id: str,
account_id: str,
country: str,
data: Dict[str, Any],
date_preset: str = "today",
):
"""
Insert campaign-level insights data broken down by country.
Args:
time: Timestamp for the data point
campaign_id: Campaign ID
account_id: Ad account ID
country: ISO 2-letter country code
data: Insights data dictionary from Meta API
date_preset: Date preset used
"""
query = """
INSERT INTO campaign_insights_by_country (
time, campaign_id, account_id, country, impressions, clicks, spend, reach,
ctr, cpc, cpm, actions, date_preset, date_start, date_stop, fetched_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW())
ON CONFLICT (time, campaign_id, country)
DO UPDATE SET
impressions = EXCLUDED.impressions,
clicks = EXCLUDED.clicks,
spend = EXCLUDED.spend,
reach = EXCLUDED.reach,
ctr = EXCLUDED.ctr,
cpc = EXCLUDED.cpc,
cpm = EXCLUDED.cpm,
actions = EXCLUDED.actions,
date_preset = EXCLUDED.date_preset,
date_start = EXCLUDED.date_start,
date_stop = EXCLUDED.date_stop,
fetched_at = NOW()
"""
impressions = int(data.get("impressions", 0)) if data.get("impressions") else None
clicks = int(data.get("clicks", 0)) if data.get("clicks") else None
spend = float(data.get("spend", 0)) if data.get("spend") else None
reach = int(data.get("reach", 0)) if data.get("reach") else None
ctr = float(data.get("ctr", 0)) if data.get("ctr") else None
cpc = float(data.get("cpc", 0)) if data.get("cpc") else None
cpm = float(data.get("cpm", 0)) if data.get("cpm") else None
# Extract date range from Meta API response and convert to date objects
from datetime import date as Date
date_start = None
date_stop = None
if data.get("date_start"):
date_start = Date.fromisoformat(data["date_start"])
if data.get("date_stop"):
date_stop = Date.fromisoformat(data["date_stop"])
import json
actions = json.dumps(data.get("actions", [])) if data.get("actions") else None
async with self.pool.acquire() as conn:
await conn.execute(
query,
time, campaign_id, account_id, country, impressions, clicks, spend, reach,
ctr, cpc, cpm, actions, date_preset, date_start, date_stop
)
# ========================================================================
# QUERY HELPERS
# ========================================================================

View File

@@ -189,6 +189,54 @@ CREATE INDEX IF NOT EXISTS idx_adset_insights_account_time
ON adset_insights (account_id, time DESC);
-- Campaign-level insights by country (time-series data)
CREATE TABLE IF NOT EXISTS campaign_insights_by_country (
time TIMESTAMPTZ NOT NULL,
campaign_id VARCHAR(50) NOT NULL REFERENCES campaigns(campaign_id),
account_id VARCHAR(50) NOT NULL REFERENCES ad_accounts(account_id),
country VARCHAR(2) NOT NULL, -- ISO 2-letter country code
-- Core metrics
impressions BIGINT,
clicks BIGINT,
spend NUMERIC(12, 2),
reach BIGINT,
-- Calculated metrics
ctr NUMERIC(10, 6),
cpc NUMERIC(10, 4),
cpm NUMERIC(10, 4),
-- Actions
actions JSONB,
-- Metadata
date_preset VARCHAR(50),
date_start DATE, -- Actual start date of the data range from Meta API
date_stop DATE, -- Actual end date of the data range from Meta API
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (time, campaign_id, country)
);
-- Convert to hypertable
SELECT create_hypertable('campaign_insights_by_country', 'time',
if_not_exists => TRUE,
chunk_time_interval => INTERVAL '1 day'
);
CREATE INDEX IF NOT EXISTS idx_campaign_insights_by_country_campaign_time
ON campaign_insights_by_country (campaign_id, time DESC);
CREATE INDEX IF NOT EXISTS idx_campaign_insights_by_country_account_time
ON campaign_insights_by_country (account_id, time DESC);
CREATE INDEX IF NOT EXISTS idx_campaign_insights_by_country_country
ON campaign_insights_by_country (country, time DESC);
-- Compression policy for campaign_insights_by_country
SELECT add_compression_policy('campaign_insights_by_country', INTERVAL '7 days', if_not_exists => TRUE);
-- ============================================================================
-- CONTINUOUS AGGREGATES (Pre-computed rollups for dashboards)
-- ============================================================================

View File

@@ -628,6 +628,79 @@ class ScheduledInsightsGrabber:
print(f" Ad set insights stored for {account_id} ({count} records)")
async def grab_campaign_insights_by_country(self, account_id: str, date_preset: str = "today", limit: int = 50):
"""
Grab and store campaign-level insights broken down by country.
Args:
account_id: Ad account ID
date_preset: Meta date preset
limit: Maximum number of campaigns
"""
fields = [
AdsInsights.Field.campaign_id,
AdsInsights.Field.campaign_name,
AdsInsights.Field.country,
AdsInsights.Field.impressions,
AdsInsights.Field.clicks,
AdsInsights.Field.spend,
AdsInsights.Field.ctr,
AdsInsights.Field.cpc,
AdsInsights.Field.cpm,
AdsInsights.Field.reach,
AdsInsights.Field.actions,
AdsInsights.Field.date_start,
AdsInsights.Field.date_stop,
]
params = {
"date_preset": date_preset,
"level": "campaign",
"breakdown": "country",
"limit": limit,
}
ad_account = AdAccount(account_id)
try:
insights = await self._rate_limited_request(
ad_account.get_insights,
fields=fields,
params=params,
)
except FacebookRequestError as e:
error_code = e.api_error_code()
if error_code in [190, 102]:
raise ValueError(f"Access token is invalid (error {error_code}): {e.api_error_message()}")
raise
# Get account timezone from database
account_timezone = await self._get_account_timezone(account_id)
# Store insights (metadata is automatically cached from insights data)
count = 0
for insight in insights:
campaign_id = insight.get('campaign_id')
country = insight.get('country')
if campaign_id and country:
insight_dict = dict(insight)
# Compute appropriate timestamp based on date_preset and account timezone
date_start_str = insight_dict.get("date_start")
timestamp = self._compute_timestamp(date_start_str, account_timezone)
# Insert insights - metadata is automatically cached from the insights data
await self.db.insert_campaign_insights_by_country(
time=timestamp,
campaign_id=campaign_id,
account_id=account_id,
country=country,
data=insight_dict,
date_preset=date_preset,
)
count += 1
print(f" Campaign insights by country stored for {account_id} ({count} records)")
async def grab_account_insights_for_date_range(
self,
account_id: str,
@@ -1036,6 +1109,7 @@ class ScheduledInsightsGrabber:
print("Grabbing today's insights...")
date_start = await self.grab_account_insights(account_id, date_preset="today")
await self.grab_campaign_insights(account_id, date_preset="today", limit=50)
await self.grab_campaign_insights_by_country(account_id, date_preset="today", limit=50)
await self.grab_adset_insights(account_id, date_preset="today", limit=50)
# Track today's date from first account
@@ -1088,6 +1162,7 @@ class ScheduledInsightsGrabber:
print("Grabbing yesterday's insights...")
await self.grab_account_insights(account_id, date_preset="yesterday")
await self.grab_campaign_insights(account_id, date_preset="yesterday", limit=50)
await self.grab_campaign_insights_by_country(account_id, date_preset="yesterday", limit=50)
await self.grab_adset_insights(account_id, date_preset="yesterday", limit=50)
print(f"✓ Completed yesterday's data for {account_id}")

View File

@@ -0,0 +1,38 @@
--- campaign insights by country
DROP MATERIALIZED VIEW IF EXISTS campaign_insights_by_country_flattened CASCADE;
CREATE MATERIALIZED VIEW campaign_insights_by_country_flattened AS
SELECT
time,
account_id,
campaign_id,
country,
impressions,
clicks,
spend,
reach,
ctr,
cpc,
cpm,
date_preset,
date_start,
date_stop,
fetched_at,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'link_click') AS link_click,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'lead') AS lead
FROM campaign_insights_by_country;
CREATE INDEX idx_campaign_insights_by_country_flat_date ON campaign_insights_by_country_flattened(date_start, date_stop);
CREATE UNIQUE INDEX idx_campaign_insights_by_country_flat_unique ON campaign_insights_by_country_flattened(time, campaign_id, country);
REFRESH MATERIALIZED VIEW CONCURRENTLY campaign_insights_by_country_flattened;