Db initialization works
This commit is contained in:
41
docker-compose.yml
Normal file
41
docker-compose.yml
Normal file
@@ -0,0 +1,41 @@
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
timescaledb:
|
||||
image: timescale/timescaledb:latest-pg16
|
||||
container_name: meta_timescaledb
|
||||
ports:
|
||||
- "5432:5432"
|
||||
environment:
|
||||
POSTGRES_DB: meta_insights
|
||||
POSTGRES_USER: meta_user
|
||||
POSTGRES_PASSWORD: meta_password
|
||||
volumes:
|
||||
- timescale_data:/var/lib/postgresql/data
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "pg_isready -U meta_user -d meta_insights"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
restart: unless-stopped
|
||||
|
||||
# # Optional: Grafana for visualization
|
||||
# grafana:
|
||||
# image: grafana/grafana:latest
|
||||
# container_name: meta_grafana
|
||||
# ports:
|
||||
# - "3000:3000"
|
||||
# environment:
|
||||
# GF_SECURITY_ADMIN_USER: admin
|
||||
# GF_SECURITY_ADMIN_PASSWORD: admin
|
||||
# GF_INSTALL_PLUGINS: grafana-clock-panel
|
||||
# volumes:
|
||||
# - grafana_data:/var/lib/grafana
|
||||
# depends_on:
|
||||
# timescaledb:
|
||||
# condition: service_healthy
|
||||
# restart: unless-stopped
|
||||
|
||||
volumes:
|
||||
timescale_data:
|
||||
grafana_data:
|
||||
@@ -1,7 +1,7 @@
|
||||
[project]
|
||||
name = "meta-api-grabber"
|
||||
version = "0.1.0"
|
||||
description = "Add your description here"
|
||||
description = "Meta Marketing API data grabber with TimescaleDB storage"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
@@ -13,3 +13,14 @@ dependencies = [
|
||||
"requests-oauthlib>=2.0.0",
|
||||
"sqlalchemy[asyncio]>=2.0.44",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
meta-auth = "meta_api_grabber.auth:main"
|
||||
meta-scheduled = "meta_api_grabber.scheduled_grabber:main"
|
||||
meta-insights = "meta_api_grabber.insights_grabber:main"
|
||||
meta-test-accounts = "meta_api_grabber.test_ad_accounts:main"
|
||||
meta-token = "meta_api_grabber.token_manager:main"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
@@ -61,6 +61,65 @@ class TimescaleDBClient:
|
||||
"""Context manager exit."""
|
||||
await self.close()
|
||||
|
||||
async def initialize_schema(self, schema_path: Optional[str] = None):
|
||||
"""
|
||||
Initialize database schema from SQL file.
|
||||
|
||||
Args:
|
||||
schema_path: Path to SQL schema file. If not provided, uses default location.
|
||||
"""
|
||||
if not schema_path:
|
||||
# Default to schema file in same directory
|
||||
import pathlib
|
||||
schema_path = pathlib.Path(__file__).parent / "db_schema.sql"
|
||||
|
||||
print(f"Initializing database schema from {schema_path}...")
|
||||
|
||||
# Read schema file
|
||||
with open(schema_path, 'r') as f:
|
||||
schema_sql = f.read()
|
||||
|
||||
# Execute schema statement by statement for better error handling
|
||||
async with self.pool.acquire() as conn:
|
||||
statements = [s.strip() for s in schema_sql.split(';') if s.strip()]
|
||||
errors = []
|
||||
compression_warnings = []
|
||||
|
||||
for i, stmt in enumerate(statements, 1):
|
||||
if not stmt:
|
||||
continue
|
||||
|
||||
try:
|
||||
await conn.execute(stmt)
|
||||
except Exception as stmt_error:
|
||||
error_msg = str(stmt_error).lower()
|
||||
|
||||
# Categorize errors
|
||||
if "already exists" in error_msg:
|
||||
# Silently ignore "already exists" - this is expected on re-runs
|
||||
continue
|
||||
elif "columnstore not enabled" in error_msg:
|
||||
# Track compression warnings separately
|
||||
compression_warnings.append(i)
|
||||
elif "'nonetype' object has no attribute 'decode'" in error_msg:
|
||||
# Silently ignore decode errors (usually comments/extensions)
|
||||
continue
|
||||
else:
|
||||
# Real errors
|
||||
errors.append((i, stmt_error))
|
||||
|
||||
# Report results
|
||||
if errors:
|
||||
print(f"⚠️ {len(errors)} error(s) during schema initialization:")
|
||||
for stmt_num, error in errors:
|
||||
print(f" Statement {stmt_num}: {error}")
|
||||
|
||||
if compression_warnings:
|
||||
print("ℹ️ Note: Data compression not available (TimescaleDB columnstore not enabled)")
|
||||
print(" This is optional - your database will work fine without it.")
|
||||
|
||||
print("✓ Database schema initialized successfully")
|
||||
|
||||
# ========================================================================
|
||||
# METADATA CACHING (Ad Accounts, Campaigns, Ad Sets)
|
||||
# ========================================================================
|
||||
|
||||
@@ -418,6 +418,9 @@ class ScheduledInsightsGrabber:
|
||||
self.db = TimescaleDBClient()
|
||||
await self.db.connect()
|
||||
|
||||
# Initialize database schema (idempotent - safe to run multiple times)
|
||||
await self.db.initialize_schema()
|
||||
|
||||
cycle_count = 0
|
||||
|
||||
try:
|
||||
@@ -446,8 +449,8 @@ class ScheduledInsightsGrabber:
|
||||
await self.db.close()
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main entry point for scheduled grabber."""
|
||||
async def async_main():
|
||||
"""Async main entry point for scheduled grabber."""
|
||||
try:
|
||||
grabber = ScheduledInsightsGrabber()
|
||||
|
||||
@@ -469,7 +472,12 @@ async def main():
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
def main():
|
||||
"""Sync wrapper for entry point."""
|
||||
from datetime import timedelta
|
||||
exit_code = asyncio.run(main())
|
||||
exit_code = asyncio.run(async_main())
|
||||
exit(exit_code)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
124
src/meta_api_grabber/test_ad_accounts.py
Normal file
124
src/meta_api_grabber/test_ad_accounts.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""
|
||||
Simple test script to initialize database and grab ad_accounts metadata.
|
||||
This is useful for testing the database setup and verifying ad account access.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
from facebook_business.adobjects.adaccount import AdAccount
|
||||
from facebook_business.api import FacebookAdsApi
|
||||
|
||||
from meta_api_grabber.database import TimescaleDBClient
|
||||
|
||||
|
||||
async def test_ad_accounts():
|
||||
"""Test database initialization and ad account metadata collection."""
|
||||
load_dotenv()
|
||||
|
||||
# Get credentials from environment
|
||||
access_token = os.getenv("META_ACCESS_TOKEN")
|
||||
app_secret = os.getenv("META_APP_SECRET")
|
||||
app_id = os.getenv("META_APP_ID")
|
||||
ad_account_id = os.getenv("META_AD_ACCOUNT_ID")
|
||||
|
||||
if not all([access_token, app_secret, app_id, ad_account_id]):
|
||||
print("❌ Missing required environment variables")
|
||||
print(" Please ensure META_ACCESS_TOKEN, META_APP_SECRET, META_APP_ID,")
|
||||
print(" and META_AD_ACCOUNT_ID are set in .env")
|
||||
return 1
|
||||
|
||||
print("="*60)
|
||||
print("AD ACCOUNT TEST")
|
||||
print("="*60)
|
||||
print(f"Account ID: {ad_account_id}")
|
||||
print()
|
||||
|
||||
# Initialize Facebook Ads API
|
||||
FacebookAdsApi.init(
|
||||
app_id=app_id,
|
||||
app_secret=app_secret,
|
||||
access_token=access_token,
|
||||
)
|
||||
|
||||
# Connect to database
|
||||
print("Connecting to database...")
|
||||
db = TimescaleDBClient()
|
||||
await db.connect()
|
||||
|
||||
try:
|
||||
# Initialize schema
|
||||
print("\nInitializing database schema...")
|
||||
await db.initialize_schema()
|
||||
|
||||
# Get ad account details from Meta API
|
||||
print(f"\nFetching ad account details from Meta API...")
|
||||
ad_account = AdAccount(ad_account_id)
|
||||
account_fields = ['name', 'currency', 'timezone_name', 'account_status']
|
||||
|
||||
account_data = ad_account.api_get(fields=account_fields)
|
||||
|
||||
print("\nAd Account Details:")
|
||||
print(f" ID: {ad_account_id}")
|
||||
print(f" Name: {account_data.get('name', 'N/A')}")
|
||||
print(f" Currency: {account_data.get('currency', 'N/A')}")
|
||||
print(f" Timezone: {account_data.get('timezone_name', 'N/A')}")
|
||||
print(f" Status: {account_data.get('account_status', 'N/A')}")
|
||||
|
||||
# Store in database
|
||||
print("\nStoring ad account in database...")
|
||||
await db.upsert_ad_account(
|
||||
account_id=ad_account_id,
|
||||
account_name=account_data.get('name'),
|
||||
currency=account_data.get('currency'),
|
||||
timezone_name=account_data.get('timezone_name'),
|
||||
)
|
||||
|
||||
print("✓ Ad account successfully stored in database")
|
||||
|
||||
# Verify by querying the database
|
||||
print("\nVerifying database storage...")
|
||||
async with db.pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"SELECT * FROM ad_accounts WHERE account_id = $1",
|
||||
ad_account_id
|
||||
)
|
||||
if row:
|
||||
print("✓ Ad account found in database:")
|
||||
print(f" Account ID: {row['account_id']}")
|
||||
print(f" Account Name: {row['account_name']}")
|
||||
print(f" Currency: {row['currency']}")
|
||||
print(f" Timezone: {row['timezone_name']}")
|
||||
print(f" Created At: {row['created_at']}")
|
||||
print(f" Updated At: {row['updated_at']}")
|
||||
else:
|
||||
print("❌ Ad account not found in database")
|
||||
|
||||
print("\n" + "="*60)
|
||||
print("TEST COMPLETED SUCCESSFULLY")
|
||||
print("="*60)
|
||||
print("\nNext steps:")
|
||||
print("1. Check your database with: docker exec -it meta_api_grabber-timescaledb-1 psql -U meta_user -d meta_insights")
|
||||
print("2. Query ad accounts: SELECT * FROM ad_accounts;")
|
||||
print("3. Run full scheduled grabber: uv run python src/meta_api_grabber/scheduled_grabber.py")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n❌ Error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return 1
|
||||
|
||||
finally:
|
||||
await db.close()
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def main():
|
||||
"""Entry point for the test script."""
|
||||
exit_code = asyncio.run(test_ad_accounts())
|
||||
exit(exit_code)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user