diff --git a/src/meta_api_grabber/database.py b/src/meta_api_grabber/database.py index 512cb4f..d8efb87 100644 --- a/src/meta_api_grabber/database.py +++ b/src/meta_api_grabber/database.py @@ -85,16 +85,36 @@ class TimescaleDBClient: # Execute schema statement by statement for better error handling async with self.pool.acquire() as conn: - statements = [s.strip() for s in schema_sql.split(';') if s.strip()] + # Set a reasonable timeout for schema operations (5 minutes per statement) + await conn.execute("SET statement_timeout = '300s'") + + # Split statements and track their line numbers in the original file + statements = [] + current_line = 1 + for stmt in schema_sql.split(';'): + stmt_stripped = stmt.strip() + if stmt_stripped: + # Count newlines before this statement to get starting line + statements.append((stmt_stripped, current_line)) + # Update line counter (count newlines in the original chunk including ';') + current_line += stmt.count('\n') + 1 + errors = [] compression_warnings = [] - for i, stmt in enumerate(statements, 1): + for i, (stmt, line_num) in enumerate(statements, 1): if not stmt: continue + # Show progress for potentially slow operations + stmt_lower = stmt.lower() + if any(keyword in stmt_lower for keyword in ['refresh materialized view', 'drop schema', 'create index']): + print(f" Executing statement {i} (line {line_num})...") + try: await conn.execute(stmt) + except asyncio.TimeoutError: + errors.append((i, line_num, "Statement execution timed out (>5 minutes)")) except Exception as stmt_error: error_msg = str(stmt_error).lower() @@ -104,19 +124,22 @@ class TimescaleDBClient: continue elif "columnstore not enabled" in error_msg: # Track compression warnings separately - compression_warnings.append(i) + compression_warnings.append((i, line_num)) elif "'nonetype' object has no attribute 'decode'" in error_msg: # Silently ignore decode errors (usually comments/extensions) continue + elif "canceling statement due to statement timeout" in 
error_msg: + # Handle PostgreSQL timeout errors + errors.append((i, line_num, "Statement execution timed out (>5 minutes)")) else: # Real errors - errors.append((i, stmt_error)) + errors.append((i, line_num, stmt_error)) # Report results if errors: print(f"⚠️ {len(errors)} error(s) during schema initialization:") - for stmt_num, error in errors: - print(f" Statement {stmt_num}: {error}") + for stmt_num, line_num, error in errors: + print(f" Statement {stmt_num} (line {line_num}): {error}") if compression_warnings: print("ℹ️ Note: Data compression not available (TimescaleDB columnstore not enabled)") diff --git a/src/meta_api_grabber/db_schema.sql b/src/meta_api_grabber/db_schema.sql index 7028292..f44410e 100644 --- a/src/meta_api_grabber/db_schema.sql +++ b/src/meta_api_grabber/db_schema.sql @@ -11,6 +11,14 @@ CREATE SCHEMA public; -- Set ownership to meta_user ALTER SCHEMA public OWNER TO meta_user; +-- ============================================================================ +-- EXTENSIONS +-- ============================================================================ +-- Create TimescaleDB extension if it doesn't exist +-- This provides time_bucket() and other time-series functions + +CREATE EXTENSION IF NOT EXISTS timescaledb; + -- ============================================================================ -- METADATA TABLE -- ============================================================================ @@ -34,6 +42,17 @@ CREATE TABLE IF NOT EXISTS public.account_metadata ( ) ); +-- Permission grants for Grafana user + +GRANT USAGE ON SCHEMA public TO grafana; + +-- Grant SELECT on all existing tables and views in the schema +GRANT SELECT ON ALL TABLES IN SCHEMA public TO grafana; + +-- Grant SELECT on all future tables and views in the schema +ALTER DEFAULT PRIVILEGES IN SCHEMA public +GRANT SELECT ON TABLES TO grafana; + -- ============================================================================ @@ -52,6 +71,8 @@ CREATE VIEW campaign_insights AS 
SELECT date_start AS "time", account_id AS account_id, campaign_id, + campaign_name, + objective, impressions, clicks, spend, @@ -61,7 +82,6 @@ CREATE VIEW campaign_insights AS cpm, cpp, frequency, - objective, ( SELECT (jsonb_array_elements.value ->> 'value'::text)::numeric AS "numeric" FROM jsonb_array_elements(customcampaign_insights.actions) jsonb_array_elements(value) WHERE (jsonb_array_elements.value ->> 'action_type'::text) = 'link_click'::text) AS link_click, @@ -232,6 +252,23 @@ FROM campaign_insights_by_gender GROUP BY time, account_id, gender; +DROP VIEW IF EXISTS account_insights_by_device CASCADE; + +CREATE VIEW account_insights_by_device AS +SELECT + time, + account_id, + device_platform, + SUM(impressions) AS impressions, + SUM(clicks) AS clicks, + SUM(spend) AS spend, + SUM(link_click) AS link_click, + SUM(landing_page_view) AS landing_page_view, + SUM(lead) AS lead +FROM campaign_insights_by_device_flattened +GROUP BY time, account_id, device_platform; + + DROP VIEW IF EXISTS account_insights_by_age CASCADE; CREATE VIEW account_insights_by_age AS @@ -265,7 +302,7 @@ SELECT FROM campaign_insights_by_gender_and_age GROUP BY time, account_id, age, gender; - +DROP VIEW IF EXISTS account_insights; CREATE VIEW account_insights AS SELECT time, @@ -276,6 +313,7 @@ SELECT SUM(link_click) AS link_click, SUM(landing_page_view) AS landing_page_view, SUM(lead) AS lead, + SUM(reach) as reach, AVG(frequency) as frequency, avg(cpc) as cpc, avg(cpm) as cpm, @@ -285,17 +323,125 @@ SELECT FROM campaign_insights group by time, account_id; +DROP MATERIALIZED VIEW IF EXISTS ads_insights CASCADE; +-- ads view +CREATE MATERIALIZED VIEW ads_insights AS +SELECT + date_start AS time, + name as ad_name, + ins.account_id, + ins.ad_id, + ins.adset_id, + ins.campaign_id, + impressions, + reach, + clicks, + (SELECT SUM((elem.value ->> 'value')::numeric) + FROM jsonb_array_elements(ins.actions) AS elem + WHERE (elem.value ->> 'action_type') = 'link_click') AS link_click, + (SELECT 
SUM((elem.value ->> 'value')::numeric) + FROM jsonb_array_elements(ins.actions) AS elem + WHERE (elem.value ->> 'action_type') = 'lead') AS lead, + spend, + frequency, + cpc, + cpm, + ctr, + cpp +FROM meta.ads_insights ins + join meta.ads as a on a.id = ins.ad_id; + +-- Create indexes for efficient querying +CREATE INDEX idx_ads_insights + ON ads_insights(time, ad_id); + +CREATE UNIQUE INDEX ads_insights_unique + ON ads_insights(time, account_id, ad_id); + +REFRESH MATERIALIZED VIEW CONCURRENTLY ads_insights; + +DROP VIEW IF EXISTS adset_insights CASCADE; + +CREATE VIEW adset_insights AS +SELECT + time, + account_id, + adset_id, + campaign_id, + SUM(impressions) AS impressions, + SUM(clicks) AS clicks, + sum(link_click) as link_click, + sum(lead) as lead, + sum(landing_page_view) as landing_page_view, + SUM(spend) AS spend, + sum(reach) as reach, + AVG(frequency) as frequency, + avg(cpc) as cpc, + avg(cpm) as cpm, + avg(cpp) as cpp, + avg(ctr) as ctr + +FROM ads_insights +group by time, account_id, adset_id, campaign_id; + + +DROP VIEW IF EXISTS g_account_insights CASCADE; +CREATE VIEW g_account_insights AS +SELECT + time, + account_id, + clicks, + impressions, + interactions, + cost_micros, + cost_micros / 1000000.0 as cost, + leads, + engagements, + customer_currency_code, + account_name, + + -- CTR (Click-Through Rate) + (clicks::numeric / impressions_nz) * 100 as ctr, + + -- CPM (Cost Per Mille) in micros and standard units + (cost_micros::numeric / impressions_nz) * 1000 as cpm_micros, + (cost_micros::numeric / impressions_nz) * 1000 / 1000000.0 as cpm, + + -- CPC (Cost Per Click) in micros and standard units + cost_micros::numeric / clicks_nz as cpc_micros, + cost_micros::numeric / clicks_nz / 1000000.0 as cpc, + + -- CPL (Cost Per Lead) in micros and standard units + cost_micros::numeric / leads_nz as cpl_micros, + cost_micros::numeric / leads_nz / 1000000.0 as cpl, + + -- Conversion Rate + (leads::numeric / clicks_nz) * 100 as conversion_rate, + + -- Engagement Rate 
+ (engagements::numeric / impressions_nz) * 100 as engagement_rate + +FROM ( + SELECT + segments_date as time, + customer_id as account_id, + sum(metrics_clicks) as clicks, + sum(metrics_impressions) as impressions, + sum(metrics_interactions) as interactions, + sum(metrics_cost_micros) as cost_micros, + sum(metrics_conversions) as leads, + sum(metrics_engagements) as engagements, + customer_currency_code, + customer_descriptive_name as account_name, + -- Null-safe denominators + NULLIF(sum(metrics_clicks), 0) as clicks_nz, + NULLIF(sum(metrics_impressions), 0) as impressions_nz, + NULLIF(sum(metrics_conversions), 0) as leads_nz + FROM google.account_performance_report + GROUP BY account_id, time, customer_currency_code, account_name +) base; --- Permission grants for Grafana user -GRANT USAGE ON SCHEMA public TO grafana; - --- Grant SELECT on all existing tables and views in the schema -GRANT SELECT ON ALL TABLES IN SCHEMA public TO grafana; - --- Grant SELECT on all future tables and views in the schema -ALTER DEFAULT PRIVILEGES IN SCHEMA public -GRANT SELECT ON TABLES TO grafana; \ No newline at end of file diff --git a/src/meta_api_grabber/views/adset_insights.sql b/src/meta_api_grabber/views/adset_insights.sql index 708feb0..27ad31b 100644 --- a/src/meta_api_grabber/views/adset_insights.sql +++ b/src/meta_api_grabber/views/adset_insights.sql @@ -35,4 +35,22 @@ CREATE INDEX idx_adset_insights_flat_date ON adset_insights_flattened(date_start CREATE UNIQUE INDEX idx_adset_insights_flat_unique ON adset_insights_flattened(time, adset_id); -REFRESH MATERIALIZED VIEW CONCURRENTLY adset_insights_flattened; \ No newline at end of file +REFRESH MATERIALIZED VIEW CONCURRENTLY adset_insights_flattened; + + +SELECT + date_start as time, + account_id, + adset_id, + campaign_id, + SUM(impressions) AS impressions, + SUM(clicks) AS clicks, + SUM(spend) AS spend, + AVG(frequency) as frequency, + avg(cpc) as cpc, + avg(cpm) as cpm, + avg(cpp) as cpp, + avg(ctr) as ctr + +FROM 
meta.ads_insights +group by time, account_id, adset_id, campaign_id \ No newline at end of file