import hashlib
import hmac
import logging
import os
import secrets
from typing import Optional

from dotenv import load_dotenv
from fastapi import HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

# Load environment variables from .env file
load_dotenv()

logger = logging.getLogger(__name__)

# Security scheme
security = HTTPBearer()

# API keys, indexed by key name. Keys are read from the environment only:
# a hard-coded placeholder would be accepted as a valid credential whenever
# the corresponding variable is unset.
API_KEYS = {}

wix_api_key = os.getenv("WIX_API_KEY")
if wix_api_key:
    API_KEYS["wix-webhook-key"] = wix_api_key

admin_api_key = os.getenv("ADMIN_API_KEY")
if admin_api_key:
    API_KEYS["admin-key"] = admin_api_key


def generate_api_key() -> str:
    """Generate a secure API key"""
    return f"sk_live_{secrets.token_urlsafe(32)}"


def _keys_match(candidate: str, expected: str) -> bool:
    """Compare two strings in constant time."""
    # compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters, so compare the UTF-8 bytes instead.
    return secrets.compare_digest(
        candidate.encode("utf-8"), expected.encode("utf-8")
    )


def validate_api_key(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> str:
    """
    Validate API key from Authorization header.
    Expected format: Authorization: Bearer your_api_key_here
    """
    token = credentials.credentials

    # Check if the token is in our valid API keys
    for key_name, valid_key in API_KEYS.items():
        if _keys_match(token, valid_key):
            logger.info("Valid API key used: %s", key_name)
            return key_name

    # Do not log any part of the submitted token: a near-miss would leak
    # the prefix of a real key into the logs.
    logger.warning("Invalid API key attempted")
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid API key",
        headers={"WWW-Authenticate": "Bearer"},
    )


def validate_wix_signature(payload: bytes, signature: str, secret: str) -> bool:
    """
    Validate Wix webhook signature for additional security.
    Wix signs their webhooks with HMAC-SHA256.
    """
    if not signature or not secret:
        return False

    try:
        # Remove 'sha256=' prefix if present
        if signature.startswith("sha256="):
            signature = signature[len("sha256="):]

        # Calculate expected signature
        expected_signature = hmac.new(
            secret.encode("utf-8"), payload, hashlib.sha256
        ).hexdigest()

        # Compare signatures securely
        return _keys_match(signature, expected_signature)
    except Exception as e:
        logger.error("Error validating signature: %s", e)
        return False


class APIKeyAuth:
    """Simple API key authentication class"""

    def __init__(self, api_keys: dict):
        self.api_keys = api_keys

    def authenticate(self, api_key: str) -> Optional[str]:
        """Authenticate an API key and return the key name if valid"""
        for key_name, valid_key in self.api_keys.items():
            if _keys_match(api_key, valid_key):
                return key_name
        return None

    def add_key(self, name: str, key: str):
        """Add a new API key"""
        self.api_keys[name] = key

    def remove_key(self, name: str):
        """Remove an API key"""
        # pop with a default is a no-op when the name is absent
        self.api_keys.pop(name, None)


# Initialize auth system
auth_system = APIKeyAuth(API_KEYS)