From 68223f664acd0fa2d97b570a1c2b1041695e4646 Mon Sep 17 00:00:00 2001 From: Jonas Linter Date: Wed, 5 Nov 2025 15:43:01 +0000 Subject: [PATCH] Managing views in meta_api_grabber now --- src/meta_api_grabber/scheduled_grabber.py | 19 +++ src/meta_api_grabber/view_manager.py | 115 ++++++++++++++++++ .../views/account_insights.sql | 42 +++++++ src/meta_api_grabber/views/adset_insights.sql | 36 ++++++ .../views/campaign_insights.sql | 36 ++++++ .../views/grafana_permissions.sql | 11 ++ ...ts_flattend.sql => insights_flattened.sql} | 40 ++++++ 7 files changed, 299 insertions(+) create mode 100644 src/meta_api_grabber/view_manager.py create mode 100644 src/meta_api_grabber/views/account_insights.sql create mode 100644 src/meta_api_grabber/views/adset_insights.sql create mode 100644 src/meta_api_grabber/views/campaign_insights.sql create mode 100644 src/meta_api_grabber/views/grafana_permissions.sql rename view_sql_archive/public/{insights_flattend.sql => insights_flattened.sql} (76%) diff --git a/src/meta_api_grabber/scheduled_grabber.py b/src/meta_api_grabber/scheduled_grabber.py index 6e0c151..0ffaff1 100644 --- a/src/meta_api_grabber/scheduled_grabber.py +++ b/src/meta_api_grabber/scheduled_grabber.py @@ -20,6 +20,7 @@ from facebook_business.exceptions import FacebookRequestError from .database import TimescaleDBClient from .rate_limiter import MetaRateLimiter from .token_manager import MetaTokenManager +from .view_manager import ViewManager # Set up logger logger = logging.getLogger(__name__) @@ -84,6 +85,9 @@ class ScheduledInsightsGrabber: # Database client self.db: Optional[TimescaleDBClient] = None + # View manager for materialized views + self.view_manager: Optional[ViewManager] = None + # Rate limiter with backoff (Meta best practices) self.rate_limiter = MetaRateLimiter( base_delay=2.0, # 2 seconds base delay @@ -1135,6 +1139,17 @@ class ScheduledInsightsGrabber: print("\n" + "-" * 60) self.rate_limiter.print_stats() + # Refresh materialized views after new data has been inserted + if self.view_manager: + print("\n" + "-" * 60) + print("Refreshing materialized views...") + try: + await self.view_manager.refresh_all_views() + print("✓ Materialized views refreshed successfully") + except Exception as e: + print(f"⚠️ Warning: Failed to refresh materialized views: {e}") + # Don't fail the entire cycle, just log the warning + print("\n" + "="*60) print("COLLECTION CYCLE COMPLETE") print("="*60 + "\n") @@ -1164,6 +1179,10 @@ class ScheduledInsightsGrabber: # Initialize database schema (idempotent - safe to run multiple times) await self.db.initialize_schema() + # Initialize view manager and create/ensure views exist + self.view_manager = ViewManager(self.db.pool) + await self.view_manager.initialize_views() + # Load all accessible ad accounts await self.load_ad_accounts() diff --git a/src/meta_api_grabber/view_manager.py b/src/meta_api_grabber/view_manager.py new file mode 100644 index 0000000..1aec1dc --- /dev/null +++ b/src/meta_api_grabber/view_manager.py @@ -0,0 +1,115 @@ +""" +View manager for TimescaleDB materialized views. +Handles creation, updates, and refresh of materialized views for flattened insights data. +Views are loaded from individual SQL files in the views directory. +""" + +import logging +import pathlib +from typing import List, Optional + +import asyncpg + +logger = logging.getLogger(__name__) + + +class ViewManager: + """Manages materialized views for insights data flattening.""" + + def __init__(self, pool: asyncpg.Pool): + """ + Initialize view manager with a database connection pool. + + Args: + pool: asyncpg connection pool + """ + self.pool = pool + self.views_dir = pathlib.Path(__file__).parent / "views" + + async def initialize_views(self) -> None: + """ + Initialize all materialized views at startup. + Loads and executes SQL files from the views directory in alphabetical order. + Creates views if they don't exist, idempotent operation. + """ + logger.info("Initializing materialized views...") + + if not self.views_dir.exists(): + logger.warning(f"Views directory not found at {self.views_dir}") + return + + # Get all .sql files in alphabetical order for consistent execution + view_files = sorted(self.views_dir.glob("*.sql")) + if not view_files: + logger.warning(f"No SQL files found in {self.views_dir}") + return + + async with self.pool.acquire() as conn: + for view_file in view_files: + logger.debug(f"Loading view file: {view_file.name}") + await self._execute_view_file(conn, view_file) + + logger.info("✓ Materialized views initialized successfully") + + async def _execute_view_file(self, conn: asyncpg.Connection, view_file: pathlib.Path) -> None: + """ + Execute SQL statements from a view file. + + Args: + conn: asyncpg connection + view_file: Path to SQL file + """ + with open(view_file, 'r') as f: + view_sql = f.read() + + statements = [s.strip() for s in view_sql.split(';') if s.strip()] + + for i, stmt in enumerate(statements, 1): + if not stmt: + continue + + try: + await conn.execute(stmt) + logger.debug(f"{view_file.name}: Executed statement {i}") + except Exception as e: + error_msg = str(e).lower() + if "does not exist" in error_msg: + # Could be a missing dependent view or table, log it + logger.debug(f"{view_file.name}: View or table does not exist (statement {i})") + else: + # Log other errors but don't fail - could be incompatible schema changes + logger.warning(f"{view_file.name}: Error in statement {i}: {e}") + + async def refresh_views(self, view_names: Optional[List[str]] = None) -> None: + """ + Refresh specified materialized views. + + Args: + view_names: List of view names to refresh. If None, refreshes all views. + """ + if view_names is None: + view_names = [ + "adset_insights_flattened", + "account_insights_flattened", + "campaign_insights_flattened", + ] + + async with self.pool.acquire() as conn: + for view_name in view_names: + try: + # Use CONCURRENTLY to avoid locking + await conn.execute( + f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view_name};" + ) + logger.debug(f"Refreshed materialized view: {view_name}") + except Exception as e: + error_msg = str(e).lower() + # View might not exist if not initialized, that's okay + if "does not exist" in error_msg: + logger.debug(f"View does not exist, skipping refresh: {view_name}") + else: + logger.warning(f"Error refreshing view {view_name}: {e}") + + async def refresh_all_views(self) -> None: + """Refresh all materialized views.""" + await self.refresh_views() diff --git a/src/meta_api_grabber/views/account_insights.sql b/src/meta_api_grabber/views/account_insights.sql new file mode 100644 index 0000000..6e714dd --- /dev/null +++ b/src/meta_api_grabber/views/account_insights.sql @@ -0,0 +1,42 @@ +CREATE OR REPLACE MATERIALIZED VIEW account_insights_flattened AS +SELECT + time, + account_id, + impressions, + clicks, + spend, + reach, + frequency, + ctr, + cpc, + cpm, + cpp, + date_preset, + date_start, + date_stop, + fetched_at, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'link_click') AS link_click, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'lead') AS lead, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(cost_per_action_type) + WHERE value->>'action_type' = 'link_click') AS cost_per_link_click, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(cost_per_action_type) + WHERE value->>'action_type' = 'landing_page_view') AS cost_per_landing_page_view, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(cost_per_action_type) + WHERE value->>'action_type' = 'lead') AS cost_per_lead +FROM account_insights; + +CREATE INDEX idx_account_insights_flat_date ON account_insights_flattened(date_start, date_stop); + +CREATE UNIQUE INDEX idx_account_insights_flat_unique ON account_insights_flattened(time, account_id); + +REFRESH MATERIALIZED VIEW CONCURRENTLY account_insights_flattened; diff --git a/src/meta_api_grabber/views/adset_insights.sql b/src/meta_api_grabber/views/adset_insights.sql new file mode 100644 index 0000000..8b87b21 --- /dev/null +++ b/src/meta_api_grabber/views/adset_insights.sql @@ -0,0 +1,36 @@ +CREATE OR REPLACE MATERIALIZED VIEW adset_insights_flattened AS +SELECT + time, + adset_id, + campaign_id, + account_id, + impressions, + clicks, + spend, + reach, + ctr, + cpc, + cpm, + date_preset, + date_start, + date_stop, + fetched_at, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'link_click') AS link_click, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'lead') AS lead +FROM adset_insights; + +-- Add indexes for common query patterns + +CREATE INDEX idx_adset_insights_flat_campaign ON adset_insights_flattened(campaign_id); +CREATE INDEX idx_adset_insights_flat_date ON adset_insights_flattened(date_start, date_stop); + + +CREATE UNIQUE INDEX idx_adset_insights_flat_unique ON adset_insights_flattened(time, adset_id); +REFRESH MATERIALIZED VIEW CONCURRENTLY adset_insights_flattened; \ No newline at end of file diff --git a/src/meta_api_grabber/views/campaign_insights.sql b/src/meta_api_grabber/views/campaign_insights.sql new file mode 100644 index 0000000..8527548 --- /dev/null +++ b/src/meta_api_grabber/views/campaign_insights.sql @@ -0,0 +1,36 @@ +--- campaign insights + +CREATE OR REPLACE MATERIALIZED VIEW campaign_insights_flattened AS +SELECT + time, + account_id, + campaign_id, + impressions, + clicks, + spend, + reach, + ctr, + cpc, + cpm, + date_preset, + date_start, + date_stop, + fetched_at, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'link_click') AS link_click, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'lead') AS lead + + +FROM campaign_insights; + +CREATE INDEX idx_campaign_insights_flat_date ON campaign_insights_flattened(date_start, date_stop); + +CREATE UNIQUE INDEX idx_campaign_insights_flat_unique ON campaign_insights_flattened(time, campaign_id); + +REFRESH MATERIALIZED VIEW CONCURRENTLY campaign_insights_flattened; \ No newline at end of file diff --git a/src/meta_api_grabber/views/grafana_permissions.sql b/src/meta_api_grabber/views/grafana_permissions.sql new file mode 100644 index 0000000..1d564d0 --- /dev/null +++ b/src/meta_api_grabber/views/grafana_permissions.sql @@ -0,0 +1,11 @@ +-- Grant SELECT permissions for Grafana user on flattened views +GRANT SELECT ON account_insights_flattened TO grafana; +GRANT SELECT ON campaign_insights_flattened TO grafana; +GRANT SELECT ON adset_insights_flattened TO grafana; + +-- Grant SELECT on all existing tables and views in the schema +GRANT SELECT ON ALL TABLES IN SCHEMA public TO grafana; + +-- Grant SELECT on all future tables and views in the schema +ALTER DEFAULT PRIVILEGES IN SCHEMA public +GRANT SELECT ON TABLES TO grafana; diff --git a/view_sql_archive/public/insights_flattend.sql b/view_sql_archive/public/insights_flattened.sql similarity index 76% rename from view_sql_archive/public/insights_flattend.sql rename to view_sql_archive/public/insights_flattened.sql index 12b97aa..8e2855e 100644 --- a/view_sql_archive/public/insights_flattend.sql +++ b/view_sql_archive/public/insights_flattened.sql @@ -87,6 +87,44 @@ CREATE UNIQUE INDEX idx_account_insights_flat_unique ON account_insights_flatten REFRESH MATERIALIZED VIEW CONCURRENTLY account_insights_flattened; +--- campaign insights + +CREATE MATERIALIZED VIEW campaign_insights_flattened AS +SELECT + time, + account_id, + campaign_id, + impressions, + clicks, + spend, + reach, + ctr, + cpc, + cpm, + date_preset, + date_start, + date_stop, + fetched_at, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'link_click') AS link_click, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'landing_page_view') AS landing_page_view, + (SELECT (value->>'value')::numeric + FROM jsonb_array_elements(actions) + WHERE value->>'action_type' = 'lead') AS lead + + +FROM campaign_insights; + +CREATE INDEX idx_campaign_insights_flat_date ON campaign_insights_flattened(date_start, date_stop); + +CREATE UNIQUE INDEX idx_campaign_insights_flat_unique ON campaign_insights_flattened(time, campaign_id); + +REFRESH MATERIALIZED VIEW CONCURRENTLY campaign_insights_flattened; + + -- permissinos -- Grant SELECT on the existing materialized view @@ -108,3 +146,5 @@ GRANT SELECT ON TABLES TO grafana; + +