More new views + timescaledb stuff

This commit is contained in:
2025-11-12 21:19:55 +00:00
parent 56e1edb0db
commit 4826c4a744
3 changed files with 205 additions and 18 deletions

View File

@@ -85,16 +85,36 @@ class TimescaleDBClient:
# Execute schema statement by statement for better error handling # Execute schema statement by statement for better error handling
async with self.pool.acquire() as conn: async with self.pool.acquire() as conn:
statements = [s.strip() for s in schema_sql.split(';') if s.strip()] # Set a reasonable timeout for schema operations (5 minutes per statement)
await conn.execute("SET statement_timeout = '300s'")
# Split statements and track their line numbers in the original file
statements = []
current_line = 1
for stmt in schema_sql.split(';'):
stmt_stripped = stmt.strip()
if stmt_stripped:
# Count newlines before this statement to get starting line
statements.append((stmt_stripped, current_line))
# Update line counter (count newlines in the original chunk including ';')
current_line += stmt.count('\n') + 1
errors = [] errors = []
compression_warnings = [] compression_warnings = []
for i, stmt in enumerate(statements, 1): for i, (stmt, line_num) in enumerate(statements, 1):
if not stmt: if not stmt:
continue continue
# Show progress for potentially slow operations
stmt_lower = stmt.lower()
if any(keyword in stmt_lower for keyword in ['refresh materialized view', 'drop schema', 'create index']):
print(f" Executing statement {i} (line {line_num})...")
try: try:
await conn.execute(stmt) await conn.execute(stmt)
except asyncio.TimeoutError:
errors.append((i, line_num, "Statement execution timed out (>5 minutes)"))
except Exception as stmt_error: except Exception as stmt_error:
error_msg = str(stmt_error).lower() error_msg = str(stmt_error).lower()
@@ -104,19 +124,22 @@ class TimescaleDBClient:
continue continue
elif "columnstore not enabled" in error_msg: elif "columnstore not enabled" in error_msg:
# Track compression warnings separately # Track compression warnings separately
compression_warnings.append(i) compression_warnings.append((i, line_num))
elif "'nonetype' object has no attribute 'decode'" in error_msg: elif "'nonetype' object has no attribute 'decode'" in error_msg:
# Silently ignore decode errors (usually comments/extensions) # Silently ignore decode errors (usually comments/extensions)
continue continue
elif "canceling statement due to statement timeout" in error_msg:
# Handle PostgreSQL timeout errors
errors.append((i, line_num, "Statement execution timed out (>5 minutes)"))
else: else:
# Real errors # Real errors
errors.append((i, stmt_error)) errors.append((i, line_num, stmt_error))
# Report results # Report results
if errors: if errors:
print(f"⚠️ {len(errors)} error(s) during schema initialization:") print(f"⚠️ {len(errors)} error(s) during schema initialization:")
for stmt_num, error in errors: for stmt_num, line_num, error in errors:
print(f" Statement {stmt_num}: {error}") print(f" Statement {stmt_num} (line {line_num}): {error}")
if compression_warnings: if compression_warnings:
print(" Note: Data compression not available (TimescaleDB columnstore not enabled)") print(" Note: Data compression not available (TimescaleDB columnstore not enabled)")

View File

@@ -11,6 +11,14 @@ CREATE SCHEMA public;
-- Set ownership to meta_user -- Set ownership to meta_user
ALTER SCHEMA public OWNER TO meta_user; ALTER SCHEMA public OWNER TO meta_user;
-- ============================================================================
-- EXTENSIONS
-- ============================================================================
-- Create TimescaleDB extension if it doesn't exist
-- This provides time_bucket() and other time-series functions
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- ============================================================================ -- ============================================================================
-- METADATA TABLE -- METADATA TABLE
-- ============================================================================ -- ============================================================================
@@ -34,6 +42,17 @@ CREATE TABLE IF NOT EXISTS public.account_metadata (
) )
); );
-- Permission grants for Grafana user
GRANT USAGE ON SCHEMA public TO grafana;
-- Grant SELECT on all existing tables and views in the schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO grafana;
-- Grant SELECT on all future tables and views in the schema
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO grafana;
-- ============================================================================ -- ============================================================================
@@ -52,6 +71,8 @@ CREATE VIEW campaign_insights AS
SELECT date_start AS "time", SELECT date_start AS "time",
account_id AS account_id, account_id AS account_id,
campaign_id, campaign_id,
campaign_name,
objective,
impressions, impressions,
clicks, clicks,
spend, spend,
@@ -61,7 +82,6 @@ CREATE VIEW campaign_insights AS
cpm, cpm,
cpp, cpp,
frequency, frequency,
objective,
( SELECT (jsonb_array_elements.value ->> 'value'::text)::numeric AS "numeric" ( SELECT (jsonb_array_elements.value ->> 'value'::text)::numeric AS "numeric"
FROM jsonb_array_elements(customcampaign_insights.actions) jsonb_array_elements(value) FROM jsonb_array_elements(customcampaign_insights.actions) jsonb_array_elements(value)
WHERE (jsonb_array_elements.value ->> 'action_type'::text) = 'link_click'::text) AS link_click, WHERE (jsonb_array_elements.value ->> 'action_type'::text) = 'link_click'::text) AS link_click,
@@ -232,6 +252,23 @@ FROM campaign_insights_by_gender
GROUP BY time, account_id, gender; GROUP BY time, account_id, gender;
DROP VIEW IF EXISTS account_insights_by_device CASCADE;
CREATE VIEW account_insights_by_device AS
SELECT
time,
account_id,
device_platform,
SUM(impressions) AS impressions,
SUM(clicks) AS clicks,
SUM(spend) AS spend,
SUM(link_click) AS link_click,
SUM(landing_page_view) AS landing_page_view,
SUM(lead) AS lead
FROM campaign_insights_by_device_flattened
GROUP BY time, account_id, device_platform;
DROP VIEW IF EXISTS account_insights_by_age CASCADE; DROP VIEW IF EXISTS account_insights_by_age CASCADE;
CREATE VIEW account_insights_by_age AS CREATE VIEW account_insights_by_age AS
@@ -265,7 +302,7 @@ SELECT
FROM campaign_insights_by_gender_and_age FROM campaign_insights_by_gender_and_age
GROUP BY time, account_id, age, gender; GROUP BY time, account_id, age, gender;
DROP VIEW IF EXISTS account_insights;
CREATE VIEW account_insights AS CREATE VIEW account_insights AS
SELECT SELECT
time, time,
@@ -276,6 +313,7 @@ SELECT
SUM(link_click) AS link_click, SUM(link_click) AS link_click,
SUM(landing_page_view) AS landing_page_view, SUM(landing_page_view) AS landing_page_view,
SUM(lead) AS lead, SUM(lead) AS lead,
SUM(reach) as reach,
AVG(frequency) as frequency, AVG(frequency) as frequency,
avg(cpc) as cpc, avg(cpc) as cpc,
avg(cpm) as cpm, avg(cpm) as cpm,
@@ -285,17 +323,125 @@ SELECT
FROM campaign_insights FROM campaign_insights
group by time, account_id; group by time, account_id;
DROP MATERIALIZED VIEW IF EXISTS ads_insights CASCADE;
-- ads view
CREATE MATERIALIZED VIEW ads_insights AS
SELECT
date_start AS time,
name as ad_name,
ins.account_id,
ins.ad_id,
ins.adset_id,
ins.campaign_id,
impressions,
reach,
clicks,
(SELECT SUM((elem.value ->> 'value')::numeric)
FROM jsonb_array_elements(ins.actions) AS elem
WHERE (elem.value ->> 'action_type') = 'link_click') AS link_click,
(SELECT SUM((elem.value ->> 'value')::numeric)
FROM jsonb_array_elements(ins.actions) AS elem
WHERE (elem.value ->> 'action_type') = 'lead') AS lead,
spend,
frequency,
cpc,
cpm,
ctr,
cpp
FROM meta.ads_insights ins
join meta.ads as a on a.id = ins.ad_id;
-- Create indexes for efficient querying
CREATE INDEX idx_ads_insights
ON ads_insights(time, ad_id);
CREATE UNIQUE INDEX ads_insights_unique
ON ads_insights(time, account_id, ad_id);
REFRESH MATERIALIZED VIEW CONCURRENTLY ads_insights;
DROP VIEW IF EXISTS adset_insights CASCADE;
CREATE VIEW adset_insights AS
SELECT
time,
account_id,
adset_id,
campaign_id,
SUM(impressions) AS impressions,
SUM(clicks) AS clicks,
sum(link_click) as link_click,
sum(lead) as lead,
sum(landing_page_view) as landing_page_view,
SUM(spend) AS spend,
sum(reach),
AVG(frequency) as frequency,
avg(cpc) as cpc,
avg(cpm) as cpm,
avg(cpp) as cpp,
avg(ctr) as ctr
FROM ads_insights
group by time, account_id, adset_id, campaign_id
DROP VIEW IF EXISTS g_account_insights CASCADE;
CREATE VIEW g_account_insights AS
SELECT
time,
account_id,
clicks,
impressions,
interactions,
cost_micros,
cost_micros / 1000000.0 as cost,
leads,
engagements,
customer_currency_code,
account_name,
-- CTR (Click-Through Rate)
(clicks::numeric / impressions_nz) * 100 as ctr,
-- CPM (Cost Per Mille) in micros and standard units
(cost_micros::numeric / impressions_nz) * 1000 as cpm_micros,
(cost_micros::numeric / impressions_nz) * 1000 / 1000000.0 as cpm,
-- CPC (Cost Per Click) in micros and standard units
cost_micros::numeric / clicks_nz as cpc_micros,
cost_micros::numeric / clicks_nz / 1000000.0 as cpc,
-- CPL (Cost Per Lead) in micros and standard units
cost_micros::numeric / leads_nz as cpl_micros,
cost_micros::numeric / leads_nz / 1000000.0 as cpl,
-- Conversion Rate
(leads::numeric / clicks_nz) * 100 as conversion_rate,
-- Engagement Rate
(engagements::numeric / impressions_nz) * 100 as engagement_rate
FROM (
SELECT
segments_date as time,
customer_id as account_id,
sum(metrics_clicks) as clicks,
sum(metrics_impressions) as impressions,
sum(metrics_interactions) as interactions,
sum(metrics_cost_micros) as cost_micros,
sum(metrics_conversions) as leads,
sum(metrics_engagements) as engagements,
customer_currency_code,
customer_descriptive_name as account_name,
-- Null-safe denominators
NULLIF(sum(metrics_clicks), 0) as clicks_nz,
NULLIF(sum(metrics_impressions), 0) as impressions_nz,
NULLIF(sum(metrics_conversions), 0) as leads_nz
FROM google.account_performance_report
GROUP BY account_id, time, customer_currency_code, account_name
) base;
-- Permission grants for Grafana user
GRANT USAGE ON SCHEMA public TO grafana;
-- Grant SELECT on all existing tables and views in the schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO grafana;
-- Grant SELECT on all future tables and views in the schema
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO grafana;

View File

@@ -35,4 +35,22 @@ CREATE INDEX idx_adset_insights_flat_date ON adset_insights_flattened(date_start
CREATE UNIQUE INDEX idx_adset_insights_flat_unique ON adset_insights_flattened(time, adset_id); CREATE UNIQUE INDEX idx_adset_insights_flat_unique ON adset_insights_flattened(time, adset_id);
REFRESH MATERIALIZED VIEW CONCURRENTLY adset_insights_flattened; REFRESH MATERIALIZED VIEW CONCURRENTLY adset_insights_flattened;
SELECT
date_start as time,
account_id,
adset_id,
campaign_id,
SUM(impressions) AS impressions,
SUM(clicks) AS clicks,
SUM(spend) AS spend,
AVG(frequency) as frequency,
avg(cpc) as cpc,
avg(cpm) as cpm,
avg(cpp) as cpp,
avg(ctr) as ctr
FROM meta.ads_insights
group by time, account_id, adset_id, campaign_id