Managing views in meta_api_grabber now

This commit is contained in:
2025-11-05 15:43:01 +00:00
parent 750ff0d4ff
commit 68223f664a
7 changed files with 299 additions and 0 deletions

View File

@@ -20,6 +20,7 @@ from facebook_business.exceptions import FacebookRequestError
from .database import TimescaleDBClient
from .rate_limiter import MetaRateLimiter
from .token_manager import MetaTokenManager
from .view_manager import ViewManager
# Set up logger
logger = logging.getLogger(__name__)
@@ -84,6 +85,9 @@ class ScheduledInsightsGrabber:
# Database client
self.db: Optional[TimescaleDBClient] = None
# View manager for materialized views
self.view_manager: Optional[ViewManager] = None
# Rate limiter with backoff (Meta best practices)
self.rate_limiter = MetaRateLimiter(
base_delay=2.0, # 2 seconds base delay
@@ -1135,6 +1139,17 @@ class ScheduledInsightsGrabber:
print("\n" + "-" * 60)
self.rate_limiter.print_stats()
# Refresh materialized views after new data has been inserted
if self.view_manager:
print("\n" + "-" * 60)
print("Refreshing materialized views...")
try:
await self.view_manager.refresh_all_views()
print("✓ Materialized views refreshed successfully")
except Exception as e:
print(f"⚠️ Warning: Failed to refresh materialized views: {e}")
# Don't fail the entire cycle, just log the warning
print("\n" + "="*60)
print("COLLECTION CYCLE COMPLETE")
print("="*60 + "\n")
@@ -1164,6 +1179,10 @@ class ScheduledInsightsGrabber:
# Initialize database schema (idempotent - safe to run multiple times)
await self.db.initialize_schema()
# Initialize view manager and create/ensure views exist
self.view_manager = ViewManager(self.db.pool)
await self.view_manager.initialize_views()
# Load all accessible ad accounts
await self.load_ad_accounts()

View File

@@ -0,0 +1,115 @@
"""
View manager for TimescaleDB materialized views.
Handles creation, updates, and refresh of materialized views for flattened insights data.
Views are loaded from individual SQL files in the views directory.
"""
import logging
import pathlib
from typing import List, Optional
import asyncpg
logger = logging.getLogger(__name__)
class ViewManager:
"""Manages materialized views for insights data flattening."""
def __init__(self, pool: asyncpg.Pool):
"""
Initialize view manager with a database connection pool.
Args:
pool: asyncpg connection pool
"""
self.pool = pool
self.views_dir = pathlib.Path(__file__).parent / "views"
async def initialize_views(self) -> None:
"""
Initialize all materialized views at startup.
Loads and executes SQL files from the views directory in alphabetical order.
Creates views if they don't exist, idempotent operation.
"""
logger.info("Initializing materialized views...")
if not self.views_dir.exists():
logger.warning(f"Views directory not found at {self.views_dir}")
return
# Get all .sql files in alphabetical order for consistent execution
view_files = sorted(self.views_dir.glob("*.sql"))
if not view_files:
logger.warning(f"No SQL files found in {self.views_dir}")
return
async with self.pool.acquire() as conn:
for view_file in view_files:
logger.debug(f"Loading view file: {view_file.name}")
await self._execute_view_file(conn, view_file)
logger.info("✓ Materialized views initialized successfully")
async def _execute_view_file(self, conn: asyncpg.Connection, view_file: pathlib.Path) -> None:
"""
Execute SQL statements from a view file.
Args:
conn: asyncpg connection
view_file: Path to SQL file
"""
with open(view_file, 'r') as f:
view_sql = f.read()
statements = [s.strip() for s in view_sql.split(';') if s.strip()]
for i, stmt in enumerate(statements, 1):
if not stmt:
continue
try:
await conn.execute(stmt)
logger.debug(f"{view_file.name}: Executed statement {i}")
except Exception as e:
error_msg = str(e).lower()
if "does not exist" in error_msg:
# Could be a missing dependent view or table, log it
logger.debug(f"{view_file.name}: View or table does not exist (statement {i})")
else:
# Log other errors but don't fail - could be incompatible schema changes
logger.warning(f"{view_file.name}: Error in statement {i}: {e}")
async def refresh_views(self, view_names: Optional[List[str]] = None) -> None:
"""
Refresh specified materialized views.
Args:
view_names: List of view names to refresh. If None, refreshes all views.
"""
if view_names is None:
view_names = [
"adset_insights_flattened",
"account_insights_flattened",
"campaign_insights_flattened",
]
async with self.pool.acquire() as conn:
for view_name in view_names:
try:
# Use CONCURRENTLY to avoid locking
await conn.execute(
f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view_name};"
)
logger.debug(f"Refreshed materialized view: {view_name}")
except Exception as e:
error_msg = str(e).lower()
# View might not exist if not initialized, that's okay
if "does not exist" in error_msg:
logger.debug(f"View does not exist, skipping refresh: {view_name}")
else:
logger.warning(f"Error refreshing view {view_name}: {e}")
async def refresh_all_views(self) -> None:
"""Refresh all materialized views."""
await self.refresh_views()

View File

@@ -0,0 +1,42 @@
CREATE OR REPLACE MATERIALIZED VIEW account_insights_flattened AS
SELECT
time,
account_id,
impressions,
clicks,
spend,
reach,
frequency,
ctr,
cpc,
cpm,
cpp,
date_preset,
date_start,
date_stop,
fetched_at,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'link_click') AS link_click,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'lead') AS lead,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(cost_per_action_type)
WHERE value->>'action_type' = 'link_click') AS cost_per_link_click,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(cost_per_action_type)
WHERE value->>'action_type' = 'landing_page_view') AS cost_per_landing_page_view,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(cost_per_action_type)
WHERE value->>'action_type' = 'lead') AS cost_per_lead
FROM account_insights;
CREATE INDEX idx_account_insights_flat_date ON account_insights_flattened(date_start, date_stop);
CREATE UNIQUE INDEX idx_account_insights_flat_unique ON account_insights_flattened(time, account_id);
REFRESH MATERIALIZED VIEW CONCURRENTLY account_insights_flattened;

View File

@@ -0,0 +1,36 @@
CREATE OR REPLACE MATERIALIZED VIEW adset_insights_flattened AS
SELECT
time,
adset_id,
campaign_id,
account_id,
impressions,
clicks,
spend,
reach,
ctr,
cpc,
cpm,
date_preset,
date_start,
date_stop,
fetched_at,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'link_click') AS link_click,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'lead') AS lead
FROM adset_insights;
-- Add indexes for common query patterns
CREATE INDEX idx_adset_insights_flat_campaign ON adset_insights_flattened(campaign_id);
CREATE INDEX idx_adset_insights_flat_date ON adset_insights_flattened(date_start, date_stop);
CREATE UNIQUE INDEX idx_adset_insights_flat_unique ON adset_insights_flattened(time, adset_id);
REFRESH MATERIALIZED VIEW CONCURRENTLY adset_insights_flattened;

View File

@@ -0,0 +1,36 @@
--- campaign insights
CREATE OR REPLACE MATERIALIZED VIEW campaign_insights_flattened AS
SELECT
time,
account_id,
campaign_id,
impressions,
clicks,
spend,
reach,
ctr,
cpc,
cpm,
date_preset,
date_start,
date_stop,
fetched_at,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'link_click') AS link_click,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'lead') AS lead
FROM campaign_insights;
CREATE INDEX idx_campaign_insights_flat_date ON campaign_insights_flattened(date_start, date_stop);
CREATE UNIQUE INDEX idx_campaign_insights_flat_unique ON campaign_insights_flattened(time, campaign_id);
REFRESH MATERIALIZED VIEW CONCURRENTLY campaign_insights_flattened;

View File

@@ -0,0 +1,11 @@
-- Grant SELECT permissions for Grafana user on flattened views
GRANT SELECT ON account_insights_flattened TO grafana;
GRANT SELECT ON campaign_insights_flattened TO grafana;
GRANT SELECT ON adset_insights_flattened TO grafana;
-- Grant SELECT on all existing tables and views in the schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO grafana;
-- Grant SELECT on all future tables and views in the schema
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO grafana;

View File

@@ -87,6 +87,44 @@ CREATE UNIQUE INDEX idx_account_insights_flat_unique ON account_insights_flatten
REFRESH MATERIALIZED VIEW CONCURRENTLY account_insights_flattened;
--- campaign insights
CREATE MATERIALIZED VIEW campaign_insights_flattened AS
SELECT
time,
account_id,
campaign_id,
impressions,
clicks,
spend,
reach,
ctr,
cpc,
cpm,
date_preset,
date_start,
date_stop,
fetched_at,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'link_click') AS link_click,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view,
(SELECT (value->>'value')::numeric
FROM jsonb_array_elements(actions)
WHERE value->>'action_type' = 'lead') AS lead
FROM campaign_insights;
CREATE INDEX idx_campaign_insights_flat_date ON campaign_insights_flattened(date_start, date_stop);
CREATE UNIQUE INDEX idx_campaign_insights_flat_unique ON campaign_insights_flattened(time, campaign_id);
REFRESH MATERIALIZED VIEW CONCURRENTLY campaign_insights_flattened;
-- permissinos
-- Grant SELECT on the existing materialized view
@@ -108,3 +146,5 @@ GRANT SELECT ON TABLES TO grafana;