Compare commits
2 Commits
630f541b4f
...
68223f664a
| Author | SHA1 | Date | |
|---|---|---|---|
| 68223f664a | |||
| 750ff0d4ff |
@@ -20,6 +20,7 @@ from facebook_business.exceptions import FacebookRequestError
|
||||
from .database import TimescaleDBClient
|
||||
from .rate_limiter import MetaRateLimiter
|
||||
from .token_manager import MetaTokenManager
|
||||
from .view_manager import ViewManager
|
||||
|
||||
# Set up logger
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -84,6 +85,9 @@ class ScheduledInsightsGrabber:
|
||||
# Database client
|
||||
self.db: Optional[TimescaleDBClient] = None
|
||||
|
||||
# View manager for materialized views
|
||||
self.view_manager: Optional[ViewManager] = None
|
||||
|
||||
# Rate limiter with backoff (Meta best practices)
|
||||
self.rate_limiter = MetaRateLimiter(
|
||||
base_delay=2.0, # 2 seconds base delay
|
||||
@@ -1135,6 +1139,17 @@ class ScheduledInsightsGrabber:
|
||||
print("\n" + "-" * 60)
|
||||
self.rate_limiter.print_stats()
|
||||
|
||||
# Refresh materialized views after new data has been inserted
|
||||
if self.view_manager:
|
||||
print("\n" + "-" * 60)
|
||||
print("Refreshing materialized views...")
|
||||
try:
|
||||
await self.view_manager.refresh_all_views()
|
||||
print("✓ Materialized views refreshed successfully")
|
||||
except Exception as e:
|
||||
print(f"⚠️ Warning: Failed to refresh materialized views: {e}")
|
||||
# Don't fail the entire cycle, just log the warning
|
||||
|
||||
print("\n" + "="*60)
|
||||
print("COLLECTION CYCLE COMPLETE")
|
||||
print("="*60 + "\n")
|
||||
@@ -1164,6 +1179,10 @@ class ScheduledInsightsGrabber:
|
||||
# Initialize database schema (idempotent - safe to run multiple times)
|
||||
await self.db.initialize_schema()
|
||||
|
||||
# Initialize view manager and create/ensure views exist
|
||||
self.view_manager = ViewManager(self.db.pool)
|
||||
await self.view_manager.initialize_views()
|
||||
|
||||
# Load all accessible ad accounts
|
||||
await self.load_ad_accounts()
|
||||
|
||||
|
||||
115
src/meta_api_grabber/view_manager.py
Normal file
115
src/meta_api_grabber/view_manager.py
Normal file
@@ -0,0 +1,115 @@
|
||||
"""
|
||||
View manager for TimescaleDB materialized views.
|
||||
Handles creation, updates, and refresh of materialized views for flattened insights data.
|
||||
Views are loaded from individual SQL files in the views directory.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import pathlib
|
||||
from typing import List, Optional
|
||||
|
||||
import asyncpg
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ViewManager:
    """Manage the materialized views that flatten insights data.

    View definitions are read from the ``*.sql`` files in the ``views``
    directory next to this module. They are executed at startup via
    ``initialize_views`` and refreshed on demand via ``refresh_views``.
    """

    # Views refreshed by refresh_views() when the caller passes no names.
    DEFAULT_VIEW_NAMES = (
        "adset_insights_flattened",
        "account_insights_flattened",
        "campaign_insights_flattened",
    )

    def __init__(self, pool: "asyncpg.Pool"):
        """
        Initialize view manager with a database connection pool.

        Args:
            pool: asyncpg connection pool
        """
        self.pool = pool
        self.views_dir = pathlib.Path(__file__).parent / "views"

    @staticmethod
    def _split_statements(sql: str) -> List[str]:
        """
        Split a SQL script into individual statements.

        A semicolon only terminates a statement when it appears outside of
        ``--`` line comments, ``/* */`` block comments, single-quoted
        literals, double-quoted identifiers and dollar-quoted strings.
        Fragments that contain nothing but whitespace and comments are
        dropped. Nested block comments and backslash escapes in ``E''``
        strings are not interpreted.

        Args:
            sql: Full text of a SQL script

        Returns:
            The statements in script order, stripped, without the
            terminating semicolon.
        """
        statements: List[str] = []
        current: List[str] = []
        has_code = False  # True once the current fragment holds real SQL
        pos, length = 0, len(sql)

        while pos < length:
            ch = sql[pos]
            two = sql[pos:pos + 2]
            end = pos + 1  # exclusive end of the token starting at pos
            is_code = not ch.isspace()
            after_identifier = pos > 0 and (
                sql[pos - 1].isalnum() or sql[pos - 1] == "_"
            )

            if two == "--":
                newline = sql.find("\n", pos)
                end = length if newline == -1 else newline
                is_code = False
            elif two == "/*":
                close = sql.find("*/", pos + 2)
                end = length if close == -1 else close + 2
                is_code = False
            elif ch in ("'", '"'):
                # Scan to the closing quote; a doubled quote is an escape.
                while end < length:
                    if sql[end] != ch:
                        end += 1
                    elif sql[end:end + 2] == ch * 2:
                        end += 2
                    else:
                        end += 1
                        break
            elif ch == "$" and not after_identifier:
                # Possible dollar-quote opener: $$ or $tag$.
                tag_end = sql.find("$", pos + 1)
                tag = sql[pos:tag_end + 1] if tag_end != -1 else ""
                label = tag[1:-1]
                valid_label = not label or (
                    label.replace("_", "a").isalnum()
                    and not label[0].isdigit()
                )
                if tag and valid_label:
                    close = sql.find(tag, tag_end + 1)
                    end = length if close == -1 else close + len(tag)
            elif ch == ";":
                if has_code:
                    statements.append("".join(current).strip())
                current, has_code = [], False
                pos = end
                continue

            current.append(sql[pos:end])
            has_code = has_code or is_code
            pos = end

        if has_code:
            statements.append("".join(current).strip())
        return statements

    async def initialize_views(self) -> None:
        """
        Initialize all materialized views at startup.

        Loads and executes the SQL files from the views directory in
        alphabetical order. Statement failures are logged and do not raise;
        the final log line reports whether any statement failed.
        """
        logger.info("Initializing materialized views...")

        if not self.views_dir.exists():
            logger.warning(f"Views directory not found at {self.views_dir}")
            return

        # Alphabetical order keeps execution order stable between runs.
        view_files = sorted(self.views_dir.glob("*.sql"))
        if not view_files:
            logger.warning(f"No SQL files found in {self.views_dir}")
            return

        failed = 0
        async with self.pool.acquire() as conn:
            for view_file in view_files:
                logger.debug(f"Loading view file: {view_file.name}")
                failed += await self._execute_view_file(conn, view_file)

        # Only claim success when every statement actually ran.
        if failed:
            logger.warning(
                f"Materialized views initialized with {failed} failed statement(s)"
            )
        else:
            logger.info("✓ Materialized views initialized successfully")

    async def _execute_view_file(self, conn: "asyncpg.Connection", view_file: pathlib.Path) -> int:
        """
        Execute the SQL statements from a view file one by one.

        Each statement runs on its own so that one failure does not stop
        the rest of the file. Failures are logged, never raised.

        Args:
            conn: asyncpg connection
            view_file: Path to SQL file

        Returns:
            Number of statements that raised an error.
        """
        # Explicit encoding so the result does not depend on the host locale.
        view_sql = view_file.read_text(encoding="utf-8")

        failed = 0
        for i, stmt in enumerate(self._split_statements(view_sql), 1):
            try:
                await conn.execute(stmt)
            except Exception as e:
                failed += 1
                if "does not exist" in str(e).lower():
                    # Could be a missing dependent view, table or role.
                    logger.debug(
                        f"{view_file.name}: View or table does not exist (statement {i}): {e}"
                    )
                else:
                    # Log other errors but don't fail - could be incompatible schema changes
                    logger.warning(f"{view_file.name}: Error in statement {i}: {e}")
            else:
                logger.debug(f"{view_file.name}: Executed statement {i}")
        return failed

    async def refresh_views(self, view_names: Optional[List[str]] = None) -> None:
        """
        Refresh specified materialized views.

        Errors are logged per view and never raised, so one failing view
        does not prevent the others from being refreshed.

        Args:
            view_names: List of view names to refresh. If None, refreshes
                the views listed in DEFAULT_VIEW_NAMES. Names are inserted
                into the SQL text as identifiers, so they must come from
                trusted code.
        """
        if view_names is None:
            view_names = list(self.DEFAULT_VIEW_NAMES)
        if not view_names:
            # Nothing to refresh; don't take a connection from the pool.
            return

        async with self.pool.acquire() as conn:
            for view_name in view_names:
                try:
                    # Use CONCURRENTLY to avoid locking
                    await conn.execute(
                        f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view_name};"
                    )
                except Exception as e:
                    # View might not exist if not initialized, that's okay
                    if "does not exist" in str(e).lower():
                        logger.debug(f"View does not exist, skipping refresh: {view_name}")
                    else:
                        logger.warning(f"Error refreshing view {view_name}: {e}")
                else:
                    logger.debug(f"Refreshed materialized view: {view_name}")

    async def refresh_all_views(self) -> None:
        """Refresh all materialized views."""
        await self.refresh_views()
|
||||
42
src/meta_api_grabber/views/account_insights.sql
Normal file
42
src/meta_api_grabber/views/account_insights.sql
Normal file
@@ -0,0 +1,42 @@
|
||||
-- Flattened account-level insights.
-- Copies the scalar metric columns of account_insights and extracts selected
-- action counts and per-action costs from the JSONB arrays "actions" and
-- "cost_per_action_type" into plain numeric columns.
--
-- PostgreSQL has no CREATE OR REPLACE for materialized views. IF NOT EXISTS
-- keeps this script safe to run at every startup. To change the definition,
-- drop the view first so that it is recreated on the next run.
CREATE MATERIALIZED VIEW IF NOT EXISTS account_insights_flattened AS
SELECT
    time,
    account_id,
    impressions,
    clicks,
    spend,
    reach,
    frequency,
    ctr,
    cpc,
    cpm,
    cpp,
    date_preset,
    date_start,
    date_stop,
    fetched_at,
    -- NOTE(review): each scalar subquery assumes at most one array element
    -- per action_type. Confirm against the stored API payloads.
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(actions)
     WHERE value->>'action_type' = 'link_click') AS link_click,
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(actions)
     WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view,
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(actions)
     WHERE value->>'action_type' = 'lead') AS lead,
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(cost_per_action_type)
     WHERE value->>'action_type' = 'link_click') AS cost_per_link_click,
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(cost_per_action_type)
     WHERE value->>'action_type' = 'landing_page_view') AS cost_per_landing_page_view,
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(cost_per_action_type)
     WHERE value->>'action_type' = 'lead') AS cost_per_lead
FROM account_insights;

-- IF NOT EXISTS makes index creation repeatable across restarts.
CREATE INDEX IF NOT EXISTS idx_account_insights_flat_date ON account_insights_flattened(date_start, date_stop);

-- REFRESH ... CONCURRENTLY requires a unique index on the view.
CREATE UNIQUE INDEX IF NOT EXISTS idx_account_insights_flat_unique ON account_insights_flattened(time, account_id);

REFRESH MATERIALIZED VIEW CONCURRENTLY account_insights_flattened;
|
||||
36
src/meta_api_grabber/views/adset_insights.sql
Normal file
36
src/meta_api_grabber/views/adset_insights.sql
Normal file
@@ -0,0 +1,36 @@
|
||||
-- Flattened adset-level insights.
-- Copies the scalar metric columns of adset_insights and extracts selected
-- action counts from the JSONB array "actions" into plain numeric columns.
--
-- PostgreSQL has no CREATE OR REPLACE for materialized views. IF NOT EXISTS
-- keeps this script safe to run at every startup. To change the definition,
-- drop the view first so that it is recreated on the next run.
CREATE MATERIALIZED VIEW IF NOT EXISTS adset_insights_flattened AS
SELECT
    time,
    adset_id,
    campaign_id,
    account_id,
    impressions,
    clicks,
    spend,
    reach,
    ctr,
    cpc,
    cpm,
    date_preset,
    date_start,
    date_stop,
    fetched_at,
    -- NOTE(review): each scalar subquery assumes at most one array element
    -- per action_type. Confirm against the stored API payloads.
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(actions)
     WHERE value->>'action_type' = 'link_click') AS link_click,
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(actions)
     WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view,
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(actions)
     WHERE value->>'action_type' = 'lead') AS lead
FROM adset_insights;

-- Indexes for common query patterns.
-- IF NOT EXISTS makes index creation repeatable across restarts.
CREATE INDEX IF NOT EXISTS idx_adset_insights_flat_campaign ON adset_insights_flattened(campaign_id);
CREATE INDEX IF NOT EXISTS idx_adset_insights_flat_date ON adset_insights_flattened(date_start, date_stop);

-- REFRESH ... CONCURRENTLY requires a unique index on the view.
CREATE UNIQUE INDEX IF NOT EXISTS idx_adset_insights_flat_unique ON adset_insights_flattened(time, adset_id);

REFRESH MATERIALIZED VIEW CONCURRENTLY adset_insights_flattened;
|
||||
36
src/meta_api_grabber/views/campaign_insights.sql
Normal file
36
src/meta_api_grabber/views/campaign_insights.sql
Normal file
@@ -0,0 +1,36 @@
|
||||
-- Flattened campaign-level insights.
-- Copies the scalar metric columns of campaign_insights and extracts selected
-- action counts from the JSONB array "actions" into plain numeric columns.
--
-- PostgreSQL has no CREATE OR REPLACE for materialized views. IF NOT EXISTS
-- keeps this script safe to run at every startup. To change the definition,
-- drop the view first so that it is recreated on the next run.
CREATE MATERIALIZED VIEW IF NOT EXISTS campaign_insights_flattened AS
SELECT
    time,
    account_id,
    campaign_id,
    impressions,
    clicks,
    spend,
    reach,
    ctr,
    cpc,
    cpm,
    date_preset,
    date_start,
    date_stop,
    fetched_at,
    -- NOTE(review): each scalar subquery assumes at most one array element
    -- per action_type. Confirm against the stored API payloads.
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(actions)
     WHERE value->>'action_type' = 'link_click') AS link_click,
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(actions)
     WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view,
    (SELECT (value->>'value')::numeric
     FROM jsonb_array_elements(actions)
     WHERE value->>'action_type' = 'lead') AS lead
FROM campaign_insights;

-- IF NOT EXISTS makes index creation repeatable across restarts.
CREATE INDEX IF NOT EXISTS idx_campaign_insights_flat_date ON campaign_insights_flattened(date_start, date_stop);

-- REFRESH ... CONCURRENTLY requires a unique index on the view.
CREATE UNIQUE INDEX IF NOT EXISTS idx_campaign_insights_flat_unique ON campaign_insights_flattened(time, campaign_id);

REFRESH MATERIALIZED VIEW CONCURRENTLY campaign_insights_flattened;
|
||||
11
src/meta_api_grabber/views/grafana_permissions.sql
Normal file
11
src/meta_api_grabber/views/grafana_permissions.sql
Normal file
@@ -0,0 +1,11 @@
|
||||
-- Read-only access for the "grafana" role.

-- Explicit grants on the three flattened materialized views.
GRANT SELECT ON TABLE account_insights_flattened TO grafana;
GRANT SELECT ON TABLE campaign_insights_flattened TO grafana;
GRANT SELECT ON TABLE adset_insights_flattened TO grafana;

-- Everything that currently exists in schema public.
GRANT SELECT ON ALL TABLES IN SCHEMA public TO grafana;

-- Tables and views created later in schema public by the role that runs
-- this statement.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO grafana;
|
||||
150
view_sql_archive/public/insights_flattened.sql
Normal file
150
view_sql_archive/public/insights_flattened.sql
Normal file
@@ -0,0 +1,150 @@
|
||||
-- Archived one-off script that created the flattened insights views by hand.
-- A materialized view is a stored snapshot. It only changes when
-- REFRESH MATERIALIZED VIEW is run after new rows have been inserted.
-- More action types can be extracted from "actions" when needed.

-- ---------------------------------------------------------------------------
-- Adset insights
-- ---------------------------------------------------------------------------
CREATE MATERIALIZED VIEW adset_insights_flattened AS
SELECT
    time,
    adset_id,
    campaign_id,
    account_id,
    impressions,
    clicks,
    spend,
    reach,
    ctr,
    cpc,
    cpm,
    date_preset,
    date_start,
    date_stop,
    fetched_at,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(actions) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'link_click') AS link_click,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(actions) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'landing_page_view') AS landing_page_view,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(actions) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'lead') AS lead
FROM adset_insights;

-- Indexes for common query patterns.
CREATE INDEX idx_adset_insights_flat_campaign
    ON adset_insights_flattened (campaign_id);
CREATE INDEX idx_adset_insights_flat_date
    ON adset_insights_flattened (date_start, date_stop);

-- A unique index is required for REFRESH ... CONCURRENTLY.
CREATE UNIQUE INDEX idx_adset_insights_flat_unique
    ON adset_insights_flattened (time, adset_id);

REFRESH MATERIALIZED VIEW CONCURRENTLY adset_insights_flattened;

-- ---------------------------------------------------------------------------
-- Account insights (same approach, plus per-action cost columns)
-- ---------------------------------------------------------------------------
CREATE MATERIALIZED VIEW account_insights_flattened AS
SELECT
    time,
    account_id,
    impressions,
    clicks,
    spend,
    reach,
    frequency,
    ctr,
    cpc,
    cpm,
    cpp,
    date_preset,
    date_start,
    date_stop,
    fetched_at,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(actions) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'link_click') AS link_click,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(actions) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'landing_page_view') AS landing_page_view,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(actions) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'lead') AS lead,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(cost_per_action_type) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'link_click') AS cost_per_link_click,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(cost_per_action_type) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'landing_page_view') AS cost_per_landing_page_view,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(cost_per_action_type) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'lead') AS cost_per_lead
FROM account_insights;

CREATE INDEX idx_account_insights_flat_date
    ON account_insights_flattened (date_start, date_stop);

-- A unique index is required for REFRESH ... CONCURRENTLY.
CREATE UNIQUE INDEX idx_account_insights_flat_unique
    ON account_insights_flattened (time, account_id);

REFRESH MATERIALIZED VIEW CONCURRENTLY account_insights_flattened;

-- ---------------------------------------------------------------------------
-- Campaign insights
-- ---------------------------------------------------------------------------
CREATE MATERIALIZED VIEW campaign_insights_flattened AS
SELECT
    time,
    account_id,
    campaign_id,
    impressions,
    clicks,
    spend,
    reach,
    ctr,
    cpc,
    cpm,
    date_preset,
    date_start,
    date_stop,
    fetched_at,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(actions) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'link_click') AS link_click,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(actions) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'landing_page_view') AS landing_page_view,
    (SELECT CAST(elem.value ->> 'value' AS numeric)
       FROM jsonb_array_elements(actions) AS elem(value)
      WHERE elem.value ->> 'action_type' = 'lead') AS lead
FROM campaign_insights;

CREATE INDEX idx_campaign_insights_flat_date
    ON campaign_insights_flattened (date_start, date_stop);

-- A unique index is required for REFRESH ... CONCURRENTLY.
CREATE UNIQUE INDEX idx_campaign_insights_flat_unique
    ON campaign_insights_flattened (time, campaign_id);

REFRESH MATERIALIZED VIEW CONCURRENTLY campaign_insights_flattened;

-- ---------------------------------------------------------------------------
-- Permissions
-- ---------------------------------------------------------------------------

-- Read access to the three materialized views created above.
GRANT SELECT ON TABLE account_insights_flattened TO grafana;
GRANT SELECT ON TABLE campaign_insights_flattened TO grafana;
GRANT SELECT ON TABLE adset_insights_flattened TO grafana;

-- Everything that currently exists in schema public.
GRANT SELECT ON ALL TABLES IN SCHEMA public TO grafana;

-- Tables and views created later in schema public by the role that runs
-- this statement.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO grafana;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
10
view_sql_archive/view.md
Normal file
10
view_sql_archive/view.md
Normal file
@@ -0,0 +1,10 @@
|
||||
To make handling the dashboard easier, it's good practice to create views that make the underlying data more accessible. Since the data is not updated very frequently, we can also use materialized views to speed up query performance at the cost of storage.
|
||||
|
||||
## Schemas
|
||||
|
||||
public: Contains the data from the meta_api_grabber application. All ad accounts.
|
||||
|
||||
meta: Contains data from the Airbyte Meta API connector. Unfortunately, it is somewhat buggy for insights at the campaign and adset level. Aggregating data from the ads level is possible, but much more cumbersome and error-prone than querying it directly. That's why, for now, I'm continuing to use the meta_api_grabber.
|
||||
|
||||
google: Will contain the data from Google via the Airbyte connector, assuming we get access and the connector is good.
|
||||
|
||||
Reference in New Issue
Block a user