Files
meta_api_grabber/explore_schemas.py

111 lines
3.5 KiB
Python

#!/usr/bin/env python3
"""
Script to explore available tables and their structures in meta, google, and alpinebits schemas.
Saves output to schema_info.txt file.
"""
import os
import asyncio
import asyncpg
from dotenv import load_dotenv
# Populate the process environment from a .env file (python-dotenv).
load_dotenv()

# Connection string handed to asyncpg; None when DATABASE_URL is not set.
DB_URL = os.environ.get("DATABASE_URL")

# File that receives the generated schema report.
OUTPUT_FILE = "schema_info.txt"
async def get_connection():
    """Open a new asyncpg connection using DB_URL and return it."""
    connection = await asyncpg.connect(DB_URL)
    return connection
async def get_schema_info(schema_name, output_file):
    """Report every table of one schema: columns, row count and a sample row.

    Each fragment of the report is printed to stdout and written to
    ``output_file``. A dedicated connection is opened for the call and is
    always closed again, even when a query or a write fails.

    Args:
        schema_name: Schema to inspect; matched exactly against
            ``information_schema.tables.table_schema``.
        output_file: Writable text file object that receives the report.

    Raises:
        Exception: Errors from connecting or from the catalog queries
            propagate to the caller. Failures of the per-table row-count
            and sample queries are reported inline instead.
    """

    def emit(text):
        # Mirror every report fragment to the console and to the file.
        print(text, end="")
        output_file.write(text)

    def quote_ident(name):
        # Identifiers cannot be bound as query parameters, so they are
        # double-quoted (embedded quotes doubled). This keeps mixed-case,
        # reserved-word or otherwise unusual names working and prevents a
        # name from being interpreted as SQL.
        return '"' + name.replace('"', '""') + '"'

    tables_query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = $1
        ORDER BY table_name
    """
    # Loop-invariant, so defined once instead of once per table.
    columns_query = """
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_schema = $1 AND table_name = $2
        ORDER BY ordinal_position
    """

    conn = await get_connection()
    try:
        emit(f"\n{'='*80}\nSCHEMA: {schema_name}\n{'='*80}\n")

        tables = await conn.fetch(tables_query, schema_name)
        if not tables:
            emit(f"No tables found in schema '{schema_name}'\n")
            return

        for row in tables:
            table_name = row['table_name']
            emit(f"\nTable: {table_name}\n" + "-" * 80 + "\n")

            # Column listing for this table.
            columns = await conn.fetch(columns_query, schema_name, table_name)
            for col in columns:
                null_str = "NULL" if col['is_nullable'] == "YES" else "NOT NULL"
                emit(f" - {col['column_name']}: {col['data_type']} ({null_str})\n")

            qualified_name = f"{quote_ident(schema_name)}.{quote_ident(table_name)}"

            # Row count is best-effort: a failure is noted in the report
            # and the exploration continues.
            try:
                count = await conn.fetchval(f"SELECT COUNT(*) FROM {qualified_name}")
                emit(f" Rows: {count}\n")
            except Exception:
                emit(" Rows: (unable to count)\n")

            # Sample row is best-effort as well; an empty table prints nothing.
            try:
                sample = await conn.fetchrow(f"SELECT * FROM {qualified_name} LIMIT 1")
                if sample:
                    emit(f" Sample: {dict(sample)}\n")
            except Exception:
                emit(" Sample: (unable to fetch)\n")
    finally:
        # Release the connection on every exit path, including errors.
        await conn.close()
async def main():
    """Explore the meta, google and alpinebits schemas and save the report.

    Opens OUTPUT_FILE for writing (replacing any previous content) and runs
    get_schema_info for each schema in turn. An error in one schema is
    reported on stdout and in the file, and the remaining schemas are still
    processed. A completion message is emitted at the end.
    """
    schemas = ["meta", "google", "alpinebits"]

    # Explicit UTF-8: sample rows may contain non-ASCII text that the
    # platform's default encoding cannot represent.
    with open(OUTPUT_FILE, "w", encoding="utf-8") as output_file:
        for schema in schemas:
            try:
                await get_schema_info(schema, output_file)
            except Exception as e:
                # Top-level boundary: record the failure and keep going.
                error_msg = f"\nError accessing schema '{schema}': {e}\n"
                print(error_msg, end="")
                output_file.write(error_msg)

        completion_msg = f"\n{'='*80}\nExploration complete! Output saved to {OUTPUT_FILE}\n"
        print(completion_msg, end="")
        output_file.write(completion_msg)
# Run the exploration only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())