""" Integration test that validates all fields requested by grab_* methods exist in the database schema. This test: 1. Parses the SQL schema file (db_schema.sql) to extract actual table columns 2. Reads scheduled_grabber.py to find which methods call which tables 3. Verifies that all requested fields exist in the actual database schema """ import re import pathlib from typing import Dict, Set, List import pytest def parse_sql_schema() -> Dict[str, Set[str]]: """ Parse db_schema.sql to extract table columns. Returns: Dictionary mapping table names to sets of column names """ schema_file = pathlib.Path(__file__).parent.parent / "src" / "meta_api_grabber" / "db_schema.sql" if not schema_file.exists(): raise FileNotFoundError(f"Schema file not found: {schema_file}") with open(schema_file, 'r') as f: content = f.read() tables = {} # Parse CREATE TABLE statements # Pattern: CREATE TABLE IF NOT EXISTS table_name (...) create_table_pattern = r'CREATE TABLE IF NOT EXISTS (\w+)\s*\((.*?)\);' for match in re.finditer(create_table_pattern, content, re.DOTALL): table_name = match.group(1) table_body = match.group(2) # Extract column names (first word before space/comma) # Pattern: column_name TYPE ... column_pattern = r'^\s*(\w+)\s+\w+' columns = set() for line in table_body.split('\n'): line = line.strip() if not line or line.startswith('--') or line.startswith('PRIMARY') or line.startswith('FOREIGN') or line.startswith('CONSTRAINT'): continue col_match = re.match(column_pattern, line) if col_match: columns.add(col_match.group(1)) if columns: tables[table_name] = columns return tables def get_field_name(field_str: str) -> str: """ Extract field name from AdsInsights.Field.xxx notation. Example: 'impressions' from 'AdsInsights.Field.impressions' """ if '.' in field_str: return field_str.split('.')[-1] return field_str def extract_fields_from_grabber_source() -> Dict[str, List[str]]: """ Extract field lists from grab_* methods by reading scheduled_grabber.py source. Returns: Dictionary mapping method names to lists of field names """ grabber_file = pathlib.Path(__file__).parent.parent / "src" / "meta_api_grabber" / "scheduled_grabber.py" if not grabber_file.exists(): raise FileNotFoundError(f"scheduled_grabber.py not found: {grabber_file}") with open(grabber_file, 'r') as f: source = f.read() methods_to_table = { 'grab_account_insights': 'account_insights', 'grab_campaign_insights': 'campaign_insights', 'grab_adset_insights': 'adset_insights', 'grab_campaign_insights_by_country': 'campaign_insights_by_country', } result = {} for method_name in methods_to_table.keys(): # Find the method definition by looking for: async def method_name(...) method_pattern = rf'async def {method_name}\s*\(' method_match = re.search(method_pattern, source) if not method_match: continue # Get the position after the method name pattern start_pos = method_match.end() # Now find where the method body actually starts (after the closing paren and docstring) # Skip to the opening paren open_paren_pos = start_pos - 1 # Count parentheses to find the closing paren of the function signature paren_count = 1 pos = open_paren_pos + 1 while pos < len(source) and paren_count > 0: if source[pos] == '(': paren_count += 1 elif source[pos] == ')': paren_count -= 1 pos += 1 # Now pos is after the closing paren. 
        colon_pos = source.find(':', pos)

        # Skip past the docstring if one is present. Look a short distance past the
        # colon so that the newline and indentation preceding the docstring are
        # stepped over even for methods nested inside a class.
        after_colon = source[colon_pos + 1:colon_pos + 50].lstrip()
        if after_colon.startswith('"""') or after_colon.startswith("'''"):
            quote_type = '"""' if after_colon.startswith('"""') else "'''"
            docstring_start = source.find(quote_type, colon_pos)
            docstring_end = source.find(quote_type, docstring_start + 3) + 3
            method_body_start = docstring_end
        else:
            method_body_start = colon_pos + 1

        # Find the next method definition to know where this method ends
        next_method_pattern = r'async def \w+\s*\('
        next_match = re.search(next_method_pattern, source[method_body_start:])
        if next_match:
            method_body_end = method_body_start + next_match.start()
        else:
            # Last method - use the rest of the file
            method_body_end = len(source)

        method_body = source[method_body_start:method_body_end]

        # Extract fields from the method body.
        # Look for: fields = [...] or fields = common_fields + [...]

        # First check whether this method uses common_fields
        uses_common_fields = 'common_fields' in method_body[:500]

        if uses_common_fields:
            # Pattern: fields = common_fields + [...]
            fields_pattern = r'fields\s*=\s*common_fields\s*\+\s*\[(.*?)\]'
            fields_match = re.search(fields_pattern, method_body, re.DOTALL)
            if fields_match:
                fields_str = fields_match.group(1)

                # Extract the individual field names
                field_pattern = r'AdsInsights\.Field\.(\w+)'
                fields = re.findall(field_pattern, fields_str)

                # Also pick up common_fields from the module level
                common_pattern = r'common_fields\s*=\s*\[(.*?)\]'
                common_match = re.search(common_pattern, source, re.DOTALL)
                if common_match:
                    common_str = common_match.group(1)
                    common_fields_list = re.findall(field_pattern, common_str)
                    fields = common_fields_list + fields

                result[method_name] = fields
        else:
            # Pattern: fields = [...]
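            # Illustrative note: this branch assumes the method body assigns a plain
            # literal list, roughly
            #
            #     fields = [
            #         AdsInsights.Field.impressions,
            #         AdsInsights.Field.spend,
            #     ]
            #
            # The specific field names above are placeholders; the real lists are
            # whatever scheduled_grabber.py defines.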
            # Use bracket matching to find the correct field list
            fields_keyword_pos = method_body.find('fields =')
            if fields_keyword_pos != -1:
                # Find the opening bracket after "fields ="
                bracket_pos = method_body.find('[', fields_keyword_pos)
                if bracket_pos != -1:
                    # Count brackets to find the matching closing bracket
                    bracket_count = 0
                    end_pos = bracket_pos
                    for i, char in enumerate(method_body[bracket_pos:]):
                        if char == '[':
                            bracket_count += 1
                        elif char == ']':
                            bracket_count -= 1
                            if bracket_count == 0:
                                end_pos = bracket_pos + i
                                break

                    fields_str = method_body[bracket_pos + 1:end_pos]
                    field_pattern = r'AdsInsights\.Field\.(\w+)'
                    fields = re.findall(field_pattern, fields_str)
                    result[method_name] = fields

    return result


@pytest.fixture(scope="module")
def schema_columns():
    """Parse and cache the schema columns."""
    return parse_sql_schema()


@pytest.fixture(scope="module")
def extracted_fields_by_method():
    """Extract and cache the fields from each grab_* method."""
    return extract_fields_from_grabber_source()


# Mapping of method names to their insights table names
METHOD_TO_TABLE = {
    'grab_account_insights': 'account_insights',
    'grab_campaign_insights': 'campaign_insights',
    'grab_adset_insights': 'adset_insights',
    'grab_campaign_insights_by_country': 'campaign_insights_by_country',
}

# Fields that are IDs/names stored in metadata tables, not in the insights tables
METADATA_ONLY_FIELDS = {
    'campaign_id',
    'campaign_name',
    'adset_id',
    'adset_name',
}


class TestFieldSchemaValidation:
    """Validate that all API field requests have corresponding database columns."""

    def test_grab_account_insights_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_account_insights fields exist in the schema."""
        method_name = 'grab_account_insights'
        table_name = METHOD_TO_TABLE[method_name]

        assert method_name in extracted_fields_by_method, f"Could not extract fields from {method_name}"
        extracted_fields = set(extracted_fields_by_method[method_name])

        table_cols = schema_columns.get(table_name, set())
        assert table_cols, f"Table {table_name} not found in schema"

        missing = extracted_fields - table_cols
        assert not missing, \
            f"{table_name} table missing columns: {missing}\n" \
            f"Method requests: {sorted(extracted_fields)}\n" \
            f"Available: {sorted(table_cols)}"

        print(f"✓ {method_name} → {table_name}: {len(extracted_fields)} fields validated")

    def test_grab_campaign_insights_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_campaign_insights fields exist in the schema."""
        method_name = 'grab_campaign_insights'
        table_name = METHOD_TO_TABLE[method_name]

        assert method_name in extracted_fields_by_method, f"Could not extract fields from {method_name}"
        extracted_fields = set(extracted_fields_by_method[method_name])

        table_cols = schema_columns.get(table_name, set())
        assert table_cols, f"Table {table_name} not found in schema"

        # Remove ID/name fields (stored in metadata tables, not in the insights table)
        insight_only_fields = extracted_fields - METADATA_ONLY_FIELDS

        missing = insight_only_fields - table_cols
        assert not missing, \
            f"{table_name} table missing columns: {missing}\n" \
            f"Method requests: {sorted(extracted_fields)}\n" \
            f"Available: {sorted(table_cols)}"

        print(f"✓ {method_name} → {table_name}: {len(extracted_fields)} fields validated")

    def test_grab_adset_insights_fields(self, schema_columns, extracted_fields_by_method):
        """Test that grab_adset_insights fields exist in the schema."""
        method_name = 'grab_adset_insights'
        table_name = METHOD_TO_TABLE[method_name]

        assert method_name in extracted_fields_by_method, f"Could not extract fields from {method_name}"
f"Could not extract fields from {method_name}" extracted_fields = set(extracted_fields_by_method[method_name]) table_cols = schema_columns.get(table_name, set()) assert table_cols, f"Table {table_name} not found in schema" # Remove ID/name fields (stored in metadata tables, not insights table) insight_only_fields = extracted_fields - METADATA_ONLY_FIELDS missing = insight_only_fields - table_cols assert not missing, \ f"{table_name} table missing columns: {missing}\n" \ f"Method requests: {sorted(extracted_fields)}\n" \ f"Available: {sorted(table_cols)}" print(f"✓ {method_name} → {table_name}: {len(extracted_fields)} fields validated") def test_grab_campaign_insights_by_country_fields(self, schema_columns, extracted_fields_by_method): """Test that grab_campaign_insights_by_country fields exist in schema.""" method_name = 'grab_campaign_insights_by_country' table_name = METHOD_TO_TABLE[method_name] assert method_name in extracted_fields_by_method, f"Could not extract fields from {method_name}" extracted_fields = set(extracted_fields_by_method[method_name]) table_cols = schema_columns.get(table_name, set()) assert table_cols, f"Table {table_name} not found in schema" # Remove ID/name fields (stored in metadata tables, not insights table) insight_only_fields = extracted_fields - METADATA_ONLY_FIELDS # Country is special - it's part of the breakdown assert "country" in table_cols, \ f"country field missing in {table_name} table\n" \ f"Available: {sorted(table_cols)}" missing = insight_only_fields - table_cols assert not missing, \ f"{table_name} table missing columns: {missing}\n" \ f"Method requests: {sorted(extracted_fields)}\n" \ f"Available: {sorted(table_cols)}" print(f"✓ {method_name} → {table_name}: {len(extracted_fields)} fields validated") def test_all_tables_exist(self, schema_columns): """Test that all required insight tables exist in schema.""" required_tables = { "account_insights", "campaign_insights", "adset_insights", "campaign_insights_by_country", } existing_tables = set(schema_columns.keys()) missing = required_tables - existing_tables assert not missing, \ f"Missing tables: {missing}\n" \ f"Found: {sorted(existing_tables)}" def test_schema_documentation(self, schema_columns): """Print out the parsed schema for verification.""" print("\n" + "="*80) print("PARSED DATABASE SCHEMA") print("="*80) for table_name in sorted(schema_columns.keys()): columns = sorted(schema_columns[table_name]) print(f"\nTable: {table_name}") print(f"Columns ({len(columns)}): {', '.join(columns)}") def test_extracted_fields_documentation(self, extracted_fields_by_method): """Print out extracted fields from each method.""" print("\n" + "="*80) print("EXTRACTED FIELDS FROM GRAB METHODS") print("="*80) for method_name, fields in sorted(extracted_fields_by_method.items()): print(f"\n{method_name}:") print(f" Fields ({len(fields)}): {', '.join(sorted(set(fields)))}") if __name__ == "__main__": pytest.main([__file__, "-v"])