- Fixed field extraction logic in test_field_schema_validation.py to properly parse methods with docstrings - Previous regex was too greedy and matched across multiple method definitions - Now uses proper parenthesis and docstring matching to isolate method bodies - Correctly handles both 'fields = [...]' and 'fields = common_fields + [...]' patterns - Updated db_schema.sql to include missing columns: - campaign_insights: added frequency, cpp, cost_per_action_type columns - adset_insights: added account_currency column - campaign_insights_by_country: added frequency, cpp, cost_per_action_type columns - All field schema validation tests now pass - Test dynamically extracts fields from scheduled_grabber.py source code - Compares against actual database schema from db_schema.sql - Properly filters metadata-only fields (campaign_id, campaign_name, etc.) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
361 lines
14 KiB
Python
361 lines
14 KiB
Python
"""
|
|
Integration test that validates all fields requested by grab_* methods exist in the database schema.
|
|
|
|
This test:
|
|
1. Parses the SQL schema file (db_schema.sql) to extract actual table columns
|
|
2. Reads scheduled_grabber.py to find which methods call which tables
|
|
3. Verifies that all requested fields exist in the actual database schema
|
|
"""
|
|
|
|
import re
|
|
import pathlib
|
|
from typing import Dict, Set, List
|
|
|
|
import pytest
|
|
|
|
|
|
def parse_sql_schema() -> Dict[str, Set[str]]:
    """
    Parse db_schema.sql to extract table columns.

    Returns:
        Dictionary mapping table names to sets of column names

    Raises:
        FileNotFoundError: if db_schema.sql is not at the expected path.
    """
    schema_file = pathlib.Path(__file__).parent.parent / "src" / "meta_api_grabber" / "db_schema.sql"

    if not schema_file.exists():
        raise FileNotFoundError(f"Schema file not found: {schema_file}")

    with open(schema_file, 'r') as f:
        content = f.read()

    # Parsing is delegated so the DDL-parsing logic can be tested without
    # touching the filesystem.
    return parse_schema_content(content)


def parse_schema_content(content: str) -> Dict[str, Set[str]]:
    """
    Parse SQL DDL text and return {table_name: {column_name, ...}}.

    Handles only the subset of SQL this project's schema file uses:
    ``CREATE TABLE IF NOT EXISTS name (...);`` statements with one column
    per line. Comment lines and table-level constraint lines (PRIMARY KEY,
    FOREIGN KEY, CONSTRAINT, UNIQUE, CHECK) are skipped.
    """
    tables: Dict[str, Set[str]] = {}

    # Non-greedy body match; assumes ');' never appears inside a column
    # definition, which holds for this project's schema file.
    create_table_pattern = r'CREATE TABLE IF NOT EXISTS (\w+)\s*\((.*?)\);'

    # A column line starts with the column name followed by its type.
    column_pattern = r'^\s*(\w+)\s+\w+'

    # Table-level constraint keywords, matched case-insensitively and as
    # whole words (\b) so a real column such as "unique_visitors" is kept.
    constraint_pattern = re.compile(
        r'^(PRIMARY|FOREIGN|CONSTRAINT|UNIQUE|CHECK)\b', re.IGNORECASE
    )

    for match in re.finditer(create_table_pattern, content, re.DOTALL):
        table_name = match.group(1)
        table_body = match.group(2)

        columns: Set[str] = set()
        for line in table_body.split('\n'):
            line = line.strip()
            if not line or line.startswith('--') or constraint_pattern.match(line):
                continue

            col_match = re.match(column_pattern, line)
            if col_match:
                columns.add(col_match.group(1))

        # Only record tables whose body yielded at least one column.
        if columns:
            tables[table_name] = columns

    return tables
|
|
|
|
|
|
def get_field_name(field_str: str) -> str:
    """
    Extract the bare field name from AdsInsights.Field.xxx notation.

    Example: 'AdsInsights.Field.impressions' -> 'impressions'.
    Strings without a dot are returned unchanged.
    """
    # rpartition returns ('', '', field_str) when no '.' is present, so the
    # last element is correct whether or not the string is dotted.
    return field_str.rpartition('.')[-1]
|
|
|
|
|
|
def extract_fields_from_grabber_source() -> Dict[str, List[str]]:
    """
    Extract field lists from grab_* methods by reading scheduled_grabber.py source.

    This is a textual (non-AST) parse: each method body is isolated by
    paren-counting the signature, skipping an optional docstring, and
    cutting at the next ``async def``. Field names are then pulled out of
    ``fields = [...]`` or ``fields = common_fields + [...]`` assignments
    via the ``AdsInsights.Field.<name>`` pattern.

    Returns:
        Dictionary mapping method names to lists of field names

    Raises:
        FileNotFoundError: if scheduled_grabber.py is not at the expected path.
    """
    grabber_file = pathlib.Path(__file__).parent.parent / "src" / "meta_api_grabber" / "scheduled_grabber.py"

    if not grabber_file.exists():
        raise FileNotFoundError(f"scheduled_grabber.py not found: {grabber_file}")

    with open(grabber_file, 'r') as f:
        source = f.read()

    # NOTE(review): this mapping duplicates module-level METHOD_TO_TABLE;
    # keep the two in sync.
    methods_to_table = {
        'grab_account_insights': 'account_insights',
        'grab_campaign_insights': 'campaign_insights',
        'grab_adset_insights': 'adset_insights',
        'grab_campaign_insights_by_country': 'campaign_insights_by_country',
    }

    result: Dict[str, List[str]] = {}

    for method_name in methods_to_table.keys():
        # Find the method definition by looking for: async def method_name(...)
        method_pattern = rf'async def {method_name}\s*\('
        method_match = re.search(method_pattern, source)

        # Methods not present in the source are silently skipped; the tests
        # assert on missing keys later.
        if not method_match:
            continue

        # Get the position after the method name pattern (just past the '(').
        start_pos = method_match.end()

        # Now find where the method body actually starts (after the closing
        # paren and docstring). Skip to the opening paren.
        open_paren_pos = start_pos - 1

        # Count parentheses to find the closing paren of the function
        # signature. Nested parens (e.g. default values, type hints) are
        # balanced by the counter.
        paren_count = 1
        pos = open_paren_pos + 1
        while pos < len(source) and paren_count > 0:
            if source[pos] == '(':
                paren_count += 1
            elif source[pos] == ')':
                paren_count -= 1
            pos += 1

        # Now pos is after the closing paren. Find the colon that ends the
        # def line (a return annotation may sit between ')' and ':').
        colon_pos = source.find(':', pos)

        # Skip past any docstring if present.
        # NOTE(review): only 9 characters after the colon are inspected; a
        # docstring indented more than 8 columns (newline + indent) will not
        # be detected and the body will simply include it — harmless unless
        # the docstring itself contains 'fields ='. Confirm against the
        # grabber source's actual indentation.
        after_colon = source[colon_pos + 1:colon_pos + 10].lstrip()
        if after_colon.startswith('"""') or after_colon.startswith("'''"):
            quote_type = '"""' if after_colon.startswith('"""') else "'''"
            docstring_start = source.find(quote_type, colon_pos)
            # Assumes the docstring contains no nested occurrence of its own
            # quote style; the first closing triple-quote ends it.
            docstring_end = source.find(quote_type, docstring_start + 3) + 3
            method_body_start = docstring_end
        else:
            method_body_start = colon_pos + 1

        # Find the next method definition to know where this method ends.
        next_method_pattern = r'async def \w+\s*\('
        next_match = re.search(next_method_pattern, source[method_body_start:])

        if next_match:
            method_body_end = method_body_start + next_match.start()
        else:
            # Last method - use rest of file
            method_body_end = len(source)

        method_body = source[method_body_start:method_body_end]

        # Extract fields from the method body.
        # Look for: fields = [...] or fields = common_fields + [...]

        # First check if this method uses common_fields. Only the first 500
        # chars are scanned, so the reference must appear near the top of
        # the body (where the fields assignment is expected).
        uses_common_fields = 'common_fields' in method_body[:500]

        if uses_common_fields:
            # Pattern: fields = common_fields + [...]
            # Non-greedy across lines, so it stops at the first ']' — fine
            # as long as the list contains no nested brackets.
            fields_pattern = r'fields\s*=\s*common_fields\s*\+\s*\[(.*?)\]'
            fields_match = re.search(fields_pattern, method_body, re.DOTALL)
            if fields_match:
                fields_str = fields_match.group(1)
                # Extract individual field names
                field_pattern = r'AdsInsights\.Field\.(\w+)'
                fields = re.findall(field_pattern, fields_str)

                # Also get common_fields from the module level (searched in
                # the whole source, not just this method body).
                common_pattern = r'common_fields\s*=\s*\[(.*?)\]'
                common_match = re.search(common_pattern, source, re.DOTALL)
                if common_match:
                    common_str = common_match.group(1)
                    common_fields_list = re.findall(field_pattern, common_str)
                    # Shared fields first, method-specific fields appended.
                    fields = common_fields_list + fields

                result[method_name] = fields
        else:
            # Pattern: fields = [...]
            # Use bracket matching to find the correct field list.
            # NOTE(review): the literal 'fields =' requires exactly one
            # space, so 'fields=[...]' would be missed — verify formatting
            # in scheduled_grabber.py.
            fields_keyword_pos = method_body.find('fields =')

            if fields_keyword_pos != -1:
                # Find the opening bracket after fields =
                bracket_pos = method_body.find('[', fields_keyword_pos)
                if bracket_pos != -1:
                    # Count brackets to find the matching closing bracket,
                    # which (unlike a regex) tolerates nested lists.
                    bracket_count = 0
                    end_pos = bracket_pos
                    for i, char in enumerate(method_body[bracket_pos:]):
                        if char == '[':
                            bracket_count += 1
                        elif char == ']':
                            bracket_count -= 1
                            if bracket_count == 0:
                                end_pos = bracket_pos + i
                                break

                    fields_str = method_body[bracket_pos + 1:end_pos]
                    field_pattern = r'AdsInsights\.Field\.(\w+)'
                    fields = re.findall(field_pattern, fields_str)
                    result[method_name] = fields

    return result
|
|
|
|
|
|
@pytest.fixture(scope="module")
def schema_columns():
    """Parse db_schema.sql once per test module and reuse the result."""
    parsed_schema = parse_sql_schema()
    return parsed_schema
|
|
|
|
|
|
@pytest.fixture(scope="module")
def extracted_fields_by_method():
    """Extract the grab_* field lists once per test module and reuse them."""
    fields_by_method = extract_fields_from_grabber_source()
    return fields_by_method
|
|
|
|
|
|
# Mapping of method names to their insight table names.
# NOTE(review): this duplicates the methods_to_table dict inside
# extract_fields_from_grabber_source(); keep the two in sync.
METHOD_TO_TABLE = {
    'grab_account_insights': 'account_insights',
    'grab_campaign_insights': 'campaign_insights',
    'grab_adset_insights': 'adset_insights',
    'grab_campaign_insights_by_country': 'campaign_insights_by_country',
}

# Fields that are IDs/names stored in metadata tables, not in the insights
# table. The per-method tests subtract these before comparing requested
# fields against insight-table columns.
METADATA_ONLY_FIELDS = {
    'campaign_id', 'campaign_name',
    'adset_id', 'adset_name',
}
|
|
|
|
|
|
class TestFieldSchemaValidation:
    """Validate that all API field requests have corresponding database columns."""

    def _assert_fields_exist(self, method_name, schema_columns,
                             extracted_fields_by_method,
                             exclude_metadata=True, required_columns=()):
        """
        Shared assertion core for the per-method field tests.

        Args:
            method_name: grab_* method whose requested fields are validated.
            schema_columns: table -> column-set mapping (schema fixture).
            extracted_fields_by_method: method -> field-list mapping (fixture).
            exclude_metadata: drop METADATA_ONLY_FIELDS before comparing;
                those IDs/names live in metadata tables, not the insights table.
            required_columns: columns (e.g. breakdown keys) that must exist in
                the table even though no API field requests them.
        """
        table_name = METHOD_TO_TABLE[method_name]

        assert method_name in extracted_fields_by_method, \
            f"Could not extract fields from {method_name}"

        extracted_fields = set(extracted_fields_by_method[method_name])
        table_cols = schema_columns.get(table_name, set())
        assert table_cols, f"Table {table_name} not found in schema"

        for column in required_columns:
            assert column in table_cols, \
                f"{column} field missing in {table_name} table\n" \
                f"Available: {sorted(table_cols)}"

        fields_to_check = (extracted_fields - METADATA_ONLY_FIELDS
                           if exclude_metadata else extracted_fields)

        missing = fields_to_check - table_cols
        assert not missing, \
            f"{table_name} table missing columns: {missing}\n" \
            f"Method requests: {sorted(extracted_fields)}\n" \
            f"Available: {sorted(table_cols)}"

        print(f"✓ {method_name} → {table_name}: {len(extracted_fields)} fields validated")

    def test_grab_account_insights_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_account_insights fields exist in schema."""
        # Account-level insights request no campaign/adset metadata fields,
        # so nothing is excluded before the comparison.
        self._assert_fields_exist('grab_account_insights', schema_columns,
                                  extracted_fields_by_method,
                                  exclude_metadata=False)

    def test_grab_campaign_insights_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_campaign_insights fields exist in schema."""
        self._assert_fields_exist('grab_campaign_insights', schema_columns,
                                  extracted_fields_by_method)

    def test_grab_adset_insights_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_adset_insights fields exist in schema."""
        self._assert_fields_exist('grab_adset_insights', schema_columns,
                                  extracted_fields_by_method)

    def test_grab_campaign_insights_by_country_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_campaign_insights_by_country fields exist in schema."""
        # 'country' is the breakdown key for this table; it is never requested
        # as an API field but must exist as a column.
        self._assert_fields_exist('grab_campaign_insights_by_country', schema_columns,
                                  extracted_fields_by_method,
                                  required_columns=('country',))

    def test_all_tables_exist(self, schema_columns):
        """Test that all required insight tables exist in schema."""
        # Derived from METHOD_TO_TABLE so this list can never drift from the
        # per-method tests above.
        required_tables = set(METHOD_TO_TABLE.values())

        existing_tables = set(schema_columns.keys())
        missing = required_tables - existing_tables

        assert not missing, \
            f"Missing tables: {missing}\n" \
            f"Found: {sorted(existing_tables)}"

    def test_schema_documentation(self, schema_columns):
        """Print out the parsed schema for verification (visible with pytest -s)."""
        print("\n" + "="*80)
        print("PARSED DATABASE SCHEMA")
        print("="*80)

        for table_name in sorted(schema_columns.keys()):
            columns = sorted(schema_columns[table_name])
            print(f"\nTable: {table_name}")
            print(f"Columns ({len(columns)}): {', '.join(columns)}")

    def test_extracted_fields_documentation(self, extracted_fields_by_method):
        """Print out extracted fields from each method (visible with pytest -s)."""
        print("\n" + "="*80)
        print("EXTRACTED FIELDS FROM GRAB METHODS")
        print("="*80)

        for method_name, fields in sorted(extracted_fields_by_method.items()):
            print(f"\n{method_name}:")
            print(f"  Fields ({len(fields)}): {', '.join(sorted(set(fields)))}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this file directly; delegates to pytest's CLI runner
    # with verbose output.
    pytest.main([__file__, "-v"])
|