Fix field schema validation test and update database schema
- Fixed field extraction logic in test_field_schema_validation.py to properly parse methods with docstrings - Previous regex was too greedy and matched across multiple method definitions - Now uses proper parenthesis and docstring matching to isolate method bodies - Correctly handles both 'fields = [...]' and 'fields = common_fields + [...]' patterns - Updated db_schema.sql to include missing columns: - campaign_insights: added frequency, cpp, cost_per_action_type columns - adset_insights: added account_currency column - campaign_insights_by_country: added frequency, cpp, cost_per_action_type columns - All field schema validation tests now pass - Test dynamically extracts fields from scheduled_grabber.py source code - Compares against actual database schema from db_schema.sql - Properly filters metadata-only fields (campaign_id, campaign_name, etc.) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -1,244 +1,359 @@
|
||||
"""
|
||||
Test that validates all fields requested by grab_* methods exist in the database schema.
|
||||
Integration test that validates all fields requested by grab_* methods exist in the database schema.
|
||||
|
||||
This test ensures that whenever new fields are added to the Meta API field lists,
|
||||
the corresponding database columns exist. It catches schema mismatches early.
|
||||
This test:
|
||||
1. Parses the SQL schema file (db_schema.sql) to extract actual table columns
|
||||
2. Reads scheduled_grabber.py to find which methods call which tables
|
||||
3. Verifies that all requested fields exist in the actual database schema
|
||||
"""
|
||||
|
||||
import re
|
||||
import pathlib
|
||||
from typing import Dict, Set, List
|
||||
|
||||
import pytest
|
||||
from facebook_business.adobjects.adsinsights import AdsInsights
|
||||
|
||||
|
||||
# Database schema field mappings
|
||||
# Maps API field names to database column names
|
||||
def parse_sql_schema(schema_path=None) -> Dict[str, Set[str]]:
    """
    Parse a SQL schema file to extract table columns.

    Args:
        schema_path: Optional path to the schema file. Defaults to the
            project's db_schema.sql (kept for backward compatibility).

    Returns:
        Dictionary mapping table names to sets of column names.

    Raises:
        FileNotFoundError: If the schema file does not exist.
    """
    if schema_path is None:
        schema_path = pathlib.Path(__file__).parent.parent / "src" / "meta_api_grabber" / "db_schema.sql"
    schema_file = pathlib.Path(schema_path)

    if not schema_file.exists():
        raise FileNotFoundError(f"Schema file not found: {schema_file}")

    with open(schema_file, 'r') as f:
        content = f.read()

    tables = {}

    # Parse CREATE TABLE statements
    # Pattern: CREATE TABLE IF NOT EXISTS table_name (...)
    create_table_pattern = r'CREATE TABLE IF NOT EXISTS (\w+)\s*\((.*?)\);'

    for match in re.finditer(create_table_pattern, content, re.DOTALL):
        table_name = match.group(1)
        table_body = match.group(2)

        # Extract column names (first word before space/comma)
        # Pattern: column_name TYPE ...
        column_pattern = r'^\s*(\w+)\s+\w+'
        columns = set()

        for line in table_body.split('\n'):
            line = line.strip()
            # Skip blanks, SQL comments, and constraint lines - they are not columns.
            if not line or line.startswith('--') or line.startswith('PRIMARY') or line.startswith('FOREIGN') or line.startswith('CONSTRAINT'):
                continue

            col_match = re.match(column_pattern, line)
            if col_match:
                columns.add(col_match.group(1))

        if columns:
            tables[table_name] = columns

    return tables
|
||||
|
||||
|
||||
def get_field_name(field_str: str) -> str:
    """Return the trailing attribute from dotted notation.

    For example, 'AdsInsights.Field.impressions' yields 'impressions';
    a string without any dots is returned unchanged.
    """
    _, dot, tail = field_str.rpartition('.')
    return tail if dot else field_str
|
||||
|
||||
|
||||
def extract_fields_from_grabber_source(source_path=None) -> Dict[str, List[str]]:
    """
    Extract field lists from grab_* methods by reading scheduled_grabber.py source.

    Args:
        source_path: Optional path to the grabber source file. Defaults to the
            project's scheduled_grabber.py (kept for backward compatibility).

    Returns:
        Dictionary mapping method names to lists of field names

    Raises:
        FileNotFoundError: If the source file does not exist.
    """
    if source_path is None:
        source_path = pathlib.Path(__file__).parent.parent / "src" / "meta_api_grabber" / "scheduled_grabber.py"
    grabber_file = pathlib.Path(source_path)

    if not grabber_file.exists():
        raise FileNotFoundError(f"scheduled_grabber.py not found: {grabber_file}")

    with open(grabber_file, 'r') as f:
        source = f.read()

    methods_to_table = {
        'grab_account_insights': 'account_insights',
        'grab_campaign_insights': 'campaign_insights',
        'grab_adset_insights': 'adset_insights',
        'grab_campaign_insights_by_country': 'campaign_insights_by_country',
    }

    result = {}

    for method_name in methods_to_table.keys():
        # Find the method definition by looking for: async def method_name(...)
        method_pattern = rf'async def {method_name}\s*\('
        method_match = re.search(method_pattern, source)

        if not method_match:
            continue

        # Get the position after the method name pattern
        start_pos = method_match.end()

        # The pattern ends just past the opening paren, so it sits one char back.
        open_paren_pos = start_pos - 1

        # Count parentheses to find the closing paren of the function signature
        paren_count = 1
        pos = open_paren_pos + 1
        while pos < len(source) and paren_count > 0:
            if source[pos] == '(':
                paren_count += 1
            elif source[pos] == ')':
                paren_count -= 1
            pos += 1

        # Now pos is after the closing paren. Find the colon
        colon_pos = source.find(':', pos)

        # Skip past any docstring if present.
        # FIX: strip ALL whitespace after the colon before probing for quotes;
        # the previous fixed 10-char window could be consumed entirely by the
        # newline + method indentation, silently missing the docstring.
        after_colon = source[colon_pos + 1:].lstrip()
        if after_colon.startswith('"""') or after_colon.startswith("'''"):
            quote_type = '"""' if after_colon.startswith('"""') else "'''"
            docstring_start = source.find(quote_type, colon_pos)
            docstring_end = source.find(quote_type, docstring_start + 3) + 3
            method_body_start = docstring_end
        else:
            method_body_start = colon_pos + 1

        # Find the next method definition to know where this method ends
        next_method_pattern = r'async def \w+\s*\('
        next_match = re.search(next_method_pattern, source[method_body_start:])

        if next_match:
            method_body_end = method_body_start + next_match.start()
        else:
            # Last method - use rest of file
            method_body_end = len(source)

        method_body = source[method_body_start:method_body_end]

        # Extract fields from the method body
        # Look for: fields = [...] or fields = common_fields + [...]

        # First check if this method uses common_fields
        uses_common_fields = 'common_fields' in method_body[:500]

        if uses_common_fields:
            # Pattern: fields = common_fields + [...]
            fields_pattern = r'fields\s*=\s*common_fields\s*\+\s*\[(.*?)\]'
            fields_match = re.search(fields_pattern, method_body, re.DOTALL)
            if fields_match:
                fields_str = fields_match.group(1)
                # Extract individual field names
                field_pattern = r'AdsInsights\.Field\.(\w+)'
                fields = re.findall(field_pattern, fields_str)

                # Also get common_fields from the module level
                common_pattern = r'common_fields\s*=\s*\[(.*?)\]'
                common_match = re.search(common_pattern, source, re.DOTALL)
                if common_match:
                    common_str = common_match.group(1)
                    common_fields_list = re.findall(field_pattern, common_str)
                    fields = common_fields_list + fields

                result[method_name] = fields
        else:
            # Pattern: fields = [...]
            # Use bracket matching to find the correct field list
            fields_keyword_pos = method_body.find('fields =')

            if fields_keyword_pos != -1:
                # Find the opening bracket after fields =
                bracket_pos = method_body.find('[', fields_keyword_pos)
                if bracket_pos != -1:
                    # Count brackets to find the matching closing bracket
                    bracket_count = 0
                    end_pos = bracket_pos
                    for i, char in enumerate(method_body[bracket_pos:]):
                        if char == '[':
                            bracket_count += 1
                        elif char == ']':
                            bracket_count -= 1
                            if bracket_count == 0:
                                end_pos = bracket_pos + i
                                break

                    fields_str = method_body[bracket_pos + 1:end_pos]
                    field_pattern = r'AdsInsights\.Field\.(\w+)'
                    fields = re.findall(field_pattern, fields_str)
                    result[method_name] = fields

    return result
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def schema_columns():
|
||||
"""Parse and cache the schema columns."""
|
||||
return parse_sql_schema()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def extracted_fields_by_method():
|
||||
"""Extract and cache the fields from each grab_* method."""
|
||||
return extract_fields_from_grabber_source()
|
||||
|
||||
|
||||
# Mapping of method names to their insight table names
# (must stay in sync with the mapping inside extract_fields_from_grabber_source)
METHOD_TO_TABLE = {
    'grab_account_insights': 'account_insights',
    'grab_campaign_insights': 'campaign_insights',
    'grab_adset_insights': 'adset_insights',
    'grab_campaign_insights_by_country': 'campaign_insights_by_country',
}
|
||||
|
||||
|
||||
def get_field_value(field_obj) -> str:
    """Return the string form of an AdsInsights.Field attribute.

    Field attributes are plain string values, so this amounts to a
    str() coercion.
    """
    coerced = str(field_obj)
    return coerced
|
||||
# Fields that are IDs/names stored in metadata tables, not in the insights table
# (filtered out before comparing requested fields against insight-table columns)
METADATA_ONLY_FIELDS = {
    'campaign_id', 'campaign_name',
    'adset_id', 'adset_name',
}
|
||||
|
||||
|
||||
class TestFieldSchemaValidation:
    """Validate that all API field requests have corresponding database columns."""

    def test_grab_account_insights_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_account_insights fields exist in schema."""
        method_name = 'grab_account_insights'
        table_name = METHOD_TO_TABLE[method_name]

        assert method_name in extracted_fields_by_method, f"Could not extract fields from {method_name}"

        extracted_fields = set(extracted_fields_by_method[method_name])
        table_cols = schema_columns.get(table_name, set())
        assert table_cols, f"Table {table_name} not found in schema"

        # Account-level insights store every requested field directly.
        missing = extracted_fields - table_cols
        assert not missing, \
            f"{table_name} table missing columns: {missing}\n" \
            f"Method requests: {sorted(extracted_fields)}\n" \
            f"Available: {sorted(table_cols)}"

        print(f"✓ {method_name} → {table_name}: {len(extracted_fields)} fields validated")

    def test_grab_campaign_insights_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_campaign_insights fields exist in schema."""
        method_name = 'grab_campaign_insights'
        table_name = METHOD_TO_TABLE[method_name]

        assert method_name in extracted_fields_by_method, f"Could not extract fields from {method_name}"

        extracted_fields = set(extracted_fields_by_method[method_name])
        table_cols = schema_columns.get(table_name, set())
        assert table_cols, f"Table {table_name} not found in schema"

        # Remove ID/name fields (stored in metadata tables, not insights table)
        insight_only_fields = extracted_fields - METADATA_ONLY_FIELDS

        missing = insight_only_fields - table_cols
        assert not missing, \
            f"{table_name} table missing columns: {missing}\n" \
            f"Method requests: {sorted(extracted_fields)}\n" \
            f"Available: {sorted(table_cols)}"

        print(f"✓ {method_name} → {table_name}: {len(extracted_fields)} fields validated")

    def test_grab_adset_insights_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_adset_insights fields exist in schema."""
        method_name = 'grab_adset_insights'
        table_name = METHOD_TO_TABLE[method_name]

        assert method_name in extracted_fields_by_method, f"Could not extract fields from {method_name}"

        extracted_fields = set(extracted_fields_by_method[method_name])
        table_cols = schema_columns.get(table_name, set())
        assert table_cols, f"Table {table_name} not found in schema"

        # Remove ID/name fields (stored in metadata tables, not insights table)
        insight_only_fields = extracted_fields - METADATA_ONLY_FIELDS

        missing = insight_only_fields - table_cols
        assert not missing, \
            f"{table_name} table missing columns: {missing}\n" \
            f"Method requests: {sorted(extracted_fields)}\n" \
            f"Available: {sorted(table_cols)}"

        print(f"✓ {method_name} → {table_name}: {len(extracted_fields)} fields validated")

    def test_grab_campaign_insights_by_country_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_campaign_insights_by_country fields exist in schema."""
        method_name = 'grab_campaign_insights_by_country'
        table_name = METHOD_TO_TABLE[method_name]

        assert method_name in extracted_fields_by_method, f"Could not extract fields from {method_name}"

        extracted_fields = set(extracted_fields_by_method[method_name])
        table_cols = schema_columns.get(table_name, set())
        assert table_cols, f"Table {table_name} not found in schema"

        # Remove ID/name fields (stored in metadata tables, not insights table)
        insight_only_fields = extracted_fields - METADATA_ONLY_FIELDS

        # Country is special - it's part of the breakdown
        assert "country" in table_cols, \
            f"country field missing in {table_name} table\n" \
            f"Available: {sorted(table_cols)}"

        missing = insight_only_fields - table_cols
        assert not missing, \
            f"{table_name} table missing columns: {missing}\n" \
            f"Method requests: {sorted(extracted_fields)}\n" \
            f"Available: {sorted(table_cols)}"

        print(f"✓ {method_name} → {table_name}: {len(extracted_fields)} fields validated")

    def test_all_tables_exist(self, schema_columns):
        """Test that all required insight tables exist in schema."""
        required_tables = {
            "account_insights",
            "campaign_insights",
            "adset_insights",
            "campaign_insights_by_country",
        }

        existing_tables = set(schema_columns.keys())
        missing = required_tables - existing_tables

        assert not missing, \
            f"Missing tables: {missing}\n" \
            f"Found: {sorted(existing_tables)}"
|
||||
|
||||
class TestSchemaDocumentation:
    """Document the expected schema structure for reference."""

    def test_schema_documentation(self, schema_columns):
        """Print out the parsed schema for verification."""
        print("\n" + "="*80)
        print("PARSED DATABASE SCHEMA")
        print("="*80)

        for table_name in sorted(schema_columns.keys()):
            columns = sorted(schema_columns[table_name])
            print(f"\nTable: {table_name}")
            print(f"Columns ({len(columns)}): {', '.join(columns)}")

    def test_extracted_fields_documentation(self, extracted_fields_by_method):
        """Print out extracted fields from each method."""
        print("\n" + "="*80)
        print("EXTRACTED FIELDS FROM GRAB METHODS")
        print("="*80)

        for method_name, fields in sorted(extracted_fields_by_method.items()):
            print(f"\n{method_name}:")
            print(f"  Fields ({len(fields)}): {', '.join(sorted(set(fields)))}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user