Started merging the two projects for simplicity
This commit is contained in:
596
src/alpine_bits_python/api.py
Normal file
596
src/alpine_bits_python/api.py
Normal file
@@ -0,0 +1,596 @@
|
||||
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request, Depends, APIRouter, Form, File, UploadFile
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.security import HTTPBearer, HTTPBasicCredentials, HTTPBasic
|
||||
from fastapi.responses import HTMLResponse, PlainTextResponse, Response
|
||||
from .models import WixFormSubmission
|
||||
from .auth import validate_api_key, validate_wix_signature, generate_api_key
|
||||
from .rate_limit import (
|
||||
limiter,
|
||||
webhook_limiter,
|
||||
custom_rate_limit_handler,
|
||||
DEFAULT_RATE_LIMIT,
|
||||
WEBHOOK_RATE_LIMIT,
|
||||
BURST_RATE_LIMIT
|
||||
)
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any, Optional, List
|
||||
import json
|
||||
import os
|
||||
import gzip
|
||||
import xml.etree.ElementTree as ET
|
||||
from .alpinebits_server import AlpineBitsServer, Version
|
||||
|
||||
# HTTP Basic auth for AlpineBits
|
||||
security_basic = HTTPBasic()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
app = FastAPI(
|
||||
title="Wix Form Handler API",
|
||||
description="Secure API endpoint to receive and process Wix form submissions with authentication and rate limiting",
|
||||
version="1.0.0"
|
||||
)
|
||||
|
||||
# Create API router with /api prefix
|
||||
api_router = APIRouter(prefix="/api", tags=["api"])
|
||||
|
||||
# Add rate limiting
|
||||
app.state.limiter = limiter
|
||||
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)
|
||||
|
||||
# Add CORS middleware to allow requests from Wix
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=[
|
||||
"https://*.wix.com",
|
||||
"https://*.wixstatic.com",
|
||||
"http://localhost:3000", # For development
|
||||
"http://localhost:8000" # For local testing
|
||||
],
|
||||
allow_credentials=True,
|
||||
allow_methods=["GET", "POST"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
async def process_form_submission(submission_data: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Background task to process the form submission.
|
||||
Add your business logic here.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Processing form submission: {submission_data.get('submissionId')}")
|
||||
|
||||
# Example processing - you can replace this with your actual logic
|
||||
form_name = submission_data.get('formName')
|
||||
contact_email = submission_data.get('contact', {}).get('email') if submission_data.get('contact') else None
|
||||
|
||||
# Extract form fields
|
||||
form_fields = {k: v for k, v in submission_data.items() if k.startswith('field:')}
|
||||
|
||||
logger.info(f"Form: {form_name}, Contact: {contact_email}, Fields: {len(form_fields)}")
|
||||
|
||||
# Here you could:
|
||||
# - Save to database
|
||||
# - Send emails
|
||||
# - Call external APIs
|
||||
# - Process the data further
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing form submission: {str(e)}")
|
||||
|
||||
|
||||
@api_router.get("/")
|
||||
@limiter.limit(DEFAULT_RATE_LIMIT)
|
||||
async def root(request: Request):
|
||||
"""Health check endpoint"""
|
||||
return {
|
||||
"message": "Wix Form Handler API is running",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"status": "healthy",
|
||||
"authentication": "required",
|
||||
"rate_limits": {
|
||||
"default": DEFAULT_RATE_LIMIT,
|
||||
"webhook": WEBHOOK_RATE_LIMIT,
|
||||
"burst": BURST_RATE_LIMIT
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@api_router.get("/health")
|
||||
@limiter.limit(DEFAULT_RATE_LIMIT)
|
||||
async def health_check(request: Request):
|
||||
"""Detailed health check"""
|
||||
return {
|
||||
"status": "healthy",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"service": "wix-form-handler",
|
||||
"version": "1.0.0",
|
||||
"authentication": "enabled",
|
||||
"rate_limiting": "enabled"
|
||||
}
|
||||
|
||||
|
||||
@api_router.post("/webhook/wix-form")
|
||||
@webhook_limiter.limit(WEBHOOK_RATE_LIMIT)
|
||||
async def receive_wix_form(
|
||||
request: Request,
|
||||
submission: WixFormSubmission,
|
||||
background_tasks: BackgroundTasks,
|
||||
api_key: str = Depends(validate_api_key)
|
||||
):
|
||||
"""
|
||||
Secure endpoint to receive Wix form submissions via webhook.
|
||||
|
||||
Requires:
|
||||
- Valid API key in Authorization header: Authorization: Bearer your_api_key
|
||||
- Rate limited to prevent abuse
|
||||
- Optional: Wix signature validation (configure WIX_WEBHOOK_SECRET env var)
|
||||
|
||||
This endpoint accepts POST requests with Wix form data and processes them asynchronously.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Received form submission: {submission.submissionId} (API key: {api_key})")
|
||||
|
||||
# Optional: Validate Wix webhook signature for extra security
|
||||
wix_secret = os.getenv("WIX_WEBHOOK_SECRET")
|
||||
if wix_secret:
|
||||
signature = request.headers.get("X-Wix-Webhook-Signature", "")
|
||||
body = await request.body()
|
||||
|
||||
if not validate_wix_signature(body, signature, wix_secret):
|
||||
logger.warning("Invalid Wix webhook signature")
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Invalid webhook signature"
|
||||
)
|
||||
|
||||
# Convert to dict for processing
|
||||
submission_dict = submission.dict()
|
||||
|
||||
# Add metadata
|
||||
submission_dict["_metadata"] = {
|
||||
"api_key_used": api_key,
|
||||
"received_at": datetime.now().isoformat(),
|
||||
"client_ip": request.client.host if request.client else "unknown"
|
||||
}
|
||||
|
||||
# Add background task for processing
|
||||
background_tasks.add_task(process_form_submission, submission_dict)
|
||||
|
||||
# Return immediate response to Wix
|
||||
return {
|
||||
"status": "received",
|
||||
"submissionId": submission.submissionId,
|
||||
"message": "Form submission received and is being processed",
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
# Re-raise HTTP exceptions (auth errors, etc.)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error receiving form submission: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error processing form submission: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@api_router.post("/webhook/wix-form/test")
|
||||
@limiter.limit(DEFAULT_RATE_LIMIT)
|
||||
async def test_endpoint(
|
||||
request: Request,
|
||||
data: Dict[str, Any]
|
||||
):
|
||||
"""
|
||||
Test endpoint to verify the API is working with raw JSON data.
|
||||
Useful for testing without strict validation.
|
||||
No authentication required for testing purposes.
|
||||
"""
|
||||
try:
|
||||
timestamp = datetime.now().isoformat()
|
||||
|
||||
# Debug: Check current user context
|
||||
import pwd
|
||||
import grp
|
||||
|
||||
current_uid = os.getuid()
|
||||
current_gid = os.getgid()
|
||||
effective_uid = os.geteuid()
|
||||
effective_gid = os.getegid()
|
||||
|
||||
try:
|
||||
user_name = pwd.getpwuid(current_uid).pw_name
|
||||
group_name = grp.getgrgid(current_gid).gr_name
|
||||
except KeyError:
|
||||
user_name = f"unknown({current_uid})"
|
||||
group_name = f"unknown({current_gid})"
|
||||
|
||||
logger.info(f"Process running as: {user_name}:{group_name} (uid:{current_uid}, gid:{current_gid})")
|
||||
logger.info(f"Effective user: uid:{effective_uid}, gid:{effective_gid}")
|
||||
logger.info(f"Current working directory: {os.getcwd()}")
|
||||
logger.info(f"Directory permissions: {oct(os.stat('.').st_mode)[-3:]}")
|
||||
|
||||
# Log to console
|
||||
logger.info(f"Received test data at {timestamp}")
|
||||
logger.info(f"Data keys: {list(data.keys())}")
|
||||
logger.info(f"Full data: {json.dumps(data, indent=2)}")
|
||||
|
||||
|
||||
|
||||
# Log to file for detailed inspection
|
||||
log_entry = {
|
||||
"timestamp": timestamp,
|
||||
"client_ip": request.client.host if request.client else "unknown",
|
||||
"headers": dict(request.headers),
|
||||
"data": data,
|
||||
"Cors origins": request.headers.get("origin"),
|
||||
"process_info": {
|
||||
"uid": current_uid,
|
||||
"gid": current_gid,
|
||||
"effective_uid": effective_uid,
|
||||
"effective_gid": effective_gid,
|
||||
"user_name": user_name,
|
||||
"group_name": group_name,
|
||||
"cwd": os.getcwd()
|
||||
}
|
||||
}
|
||||
|
||||
# Create logs directory if it doesn't exist with proper permissions
|
||||
logs_dir = "logs"
|
||||
if not os.path.exists(logs_dir):
|
||||
logger.info(f"Creating logs directory as user {user_name} ({current_uid})")
|
||||
os.makedirs(logs_dir, mode=0o755, exist_ok=True)
|
||||
|
||||
# Check what actually got created
|
||||
stat_info = os.stat(logs_dir)
|
||||
logger.info(f"Created directory owner: uid:{stat_info.st_uid}, gid:{stat_info.st_gid}")
|
||||
logger.info(f"Directory mode: {oct(stat_info.st_mode)[-3:]}")
|
||||
|
||||
# Write to file with timestamp
|
||||
log_filename = f"{logs_dir}/wix_test_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
with open(log_filename, "w", encoding="utf-8") as f:
|
||||
json.dump(log_entry, f, indent=2, default=str, ensure_ascii=False)
|
||||
|
||||
# Check file ownership after creation
|
||||
file_stat = os.stat(log_filename)
|
||||
logger.info(f"Created file owner: uid:{file_stat.st_uid}, gid:{file_stat.st_gid}")
|
||||
logger.info(f"File mode: {oct(file_stat.st_mode)[-3:]}")
|
||||
|
||||
logger.info(f"Data logged to: {log_filename}")
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Test data received successfully",
|
||||
"received_keys": list(data.keys()),
|
||||
"data_logged_to": log_filename,
|
||||
"timestamp": timestamp,
|
||||
"process_info": log_entry["process_info"],
|
||||
"note": "No authentication required for this test endpoint"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in test endpoint: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error processing test data: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@api_router.post("/admin/generate-api-key")
|
||||
@limiter.limit("5/hour") # Very restrictive for admin operations
|
||||
async def generate_new_api_key(
|
||||
request: Request,
|
||||
admin_key: str = Depends(validate_api_key)
|
||||
):
|
||||
"""
|
||||
Admin endpoint to generate new API keys.
|
||||
Requires admin API key and is heavily rate limited.
|
||||
"""
|
||||
if admin_key != "admin-key":
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="Admin access required"
|
||||
)
|
||||
|
||||
new_key = generate_api_key()
|
||||
logger.info(f"Generated new API key (requested by: {admin_key})")
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "New API key generated",
|
||||
"api_key": new_key,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"note": "Store this key securely - it won't be shown again"
|
||||
}
|
||||
|
||||
|
||||
async def validate_basic_auth(credentials: HTTPBasicCredentials = Depends(security_basic)) -> str:
|
||||
"""
|
||||
Validate basic authentication for AlpineBits protocol.
|
||||
Returns username if valid, raises HTTPException if not.
|
||||
"""
|
||||
# In production, validate against your user database
|
||||
# For demo purposes, we'll accept any non-empty credentials
|
||||
if not credentials.username or not credentials.password:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="ERROR: Authentication required",
|
||||
headers={"WWW-Authenticate": "Basic"},
|
||||
)
|
||||
|
||||
# In a real implementation, you'd validate these credentials
|
||||
# For now, we'll just return the username
|
||||
logger.info(f"AlpineBits authentication successful for user: {credentials.username}")
|
||||
return credentials.username
|
||||
|
||||
|
||||
def parse_multipart_data(content_type: str, body: bytes) -> Dict[str, Any]:
|
||||
"""
|
||||
Parse multipart/form-data from raw request body.
|
||||
This is a simplified parser for the AlpineBits use case.
|
||||
"""
|
||||
if "multipart/form-data" not in content_type:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="ERROR: Content-Type must be multipart/form-data"
|
||||
)
|
||||
|
||||
# Extract boundary
|
||||
boundary = None
|
||||
for part in content_type.split(";"):
|
||||
part = part.strip()
|
||||
if part.startswith("boundary="):
|
||||
boundary = part.split("=", 1)[1].strip('"')
|
||||
break
|
||||
|
||||
if not boundary:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="ERROR: Missing boundary in multipart/form-data"
|
||||
)
|
||||
|
||||
# Simple multipart parsing
|
||||
parts = body.split(f"--{boundary}".encode())
|
||||
data = {}
|
||||
|
||||
for part in parts:
|
||||
if not part.strip() or part.strip() == b"--":
|
||||
continue
|
||||
|
||||
# Split headers and content
|
||||
if b"\r\n\r\n" in part:
|
||||
headers_section, content = part.split(b"\r\n\r\n", 1)
|
||||
content = content.rstrip(b"\r\n")
|
||||
|
||||
# Parse Content-Disposition header
|
||||
headers = headers_section.decode('utf-8', errors='ignore')
|
||||
name = None
|
||||
for line in headers.split('\n'):
|
||||
if 'Content-Disposition' in line and 'name=' in line:
|
||||
# Extract name parameter
|
||||
for param in line.split(';'):
|
||||
param = param.strip()
|
||||
if param.startswith('name='):
|
||||
name = param.split('=', 1)[1].strip('"')
|
||||
break
|
||||
|
||||
if name:
|
||||
# Handle file uploads or text content
|
||||
if content.startswith(b'<'):
|
||||
# Likely XML content
|
||||
data[name] = content.decode('utf-8', errors='ignore')
|
||||
else:
|
||||
data[name] = content.decode('utf-8', errors='ignore')
|
||||
|
||||
return data
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@api_router.post("/alpinebits/server-2024-10")
|
||||
@limiter.limit("60/minute")
|
||||
async def alpinebits_server_handshake(
|
||||
request: Request,
|
||||
username: str = Depends(validate_basic_auth)
|
||||
):
|
||||
"""
|
||||
AlpineBits server endpoint implementing the handshake protocol.
|
||||
|
||||
This endpoint handles:
|
||||
- Protocol version negotiation via X-AlpineBits-ClientProtocolVersion header
|
||||
- Client identification via X-AlpineBits-ClientID header (optional)
|
||||
- Multipart/form-data parsing for action and request parameters
|
||||
- Gzip compression support
|
||||
- Proper error handling with HTTP status codes
|
||||
- Handshaking action processing
|
||||
|
||||
Authentication: HTTP Basic Auth required
|
||||
Content-Type: multipart/form-data
|
||||
Compression: gzip supported (check X-AlpineBits-Server-Accept-Encoding)
|
||||
"""
|
||||
try:
|
||||
# Check required headers
|
||||
client_protocol_version = request.headers.get("X-AlpineBits-ClientProtocolVersion")
|
||||
if not client_protocol_version:
|
||||
# Server concludes client speaks a protocol version preceding 2013-04
|
||||
client_protocol_version = "pre-2013-04"
|
||||
logger.info("No X-AlpineBits-ClientProtocolVersion header found, assuming pre-2013-04")
|
||||
else:
|
||||
logger.info(f"Client protocol version: {client_protocol_version}")
|
||||
|
||||
# Optional client ID
|
||||
client_id = request.headers.get("X-AlpineBits-ClientID")
|
||||
if client_id:
|
||||
logger.info(f"Client ID: {client_id}")
|
||||
|
||||
# Check content encoding
|
||||
content_encoding = request.headers.get("Content-Encoding")
|
||||
is_compressed = content_encoding == "gzip"
|
||||
|
||||
if is_compressed:
|
||||
logger.info("Request is gzip compressed")
|
||||
|
||||
# Get content type before processing
|
||||
content_type = request.headers.get("Content-Type", "")
|
||||
|
||||
# Get request body
|
||||
body = await request.body()
|
||||
|
||||
# Decompress if needed
|
||||
if is_compressed:
|
||||
try:
|
||||
body = gzip.decompress(body)
|
||||
logger.info("Successfully decompressed gzip content")
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"ERROR: Failed to decompress gzip content: {str(e)}"
|
||||
)
|
||||
|
||||
# Check content type (after decompression)
|
||||
if "multipart/form-data" not in content_type:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="ERROR: Content-Type must be multipart/form-data"
|
||||
)
|
||||
|
||||
# Parse multipart data
|
||||
try:
|
||||
form_data = parse_multipart_data(content_type, body)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"ERROR: Failed to parse multipart/form-data: {str(e)}"
|
||||
)
|
||||
|
||||
# Check for required action parameter
|
||||
action = form_data.get("action")
|
||||
if not action:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="ERROR: Missing required 'action' parameter")
|
||||
|
||||
logger.info(f"AlpineBits action: {action}")
|
||||
|
||||
|
||||
# Get optional request XML
|
||||
request_xml = form_data.get("request")
|
||||
|
||||
|
||||
server = AlpineBitsServer()
|
||||
|
||||
version = Version.V2024_10
|
||||
|
||||
|
||||
|
||||
# Create successful handshake response
|
||||
response = await server.handle_request(action, request_xml, version)
|
||||
|
||||
response_xml = response.xml_content
|
||||
|
||||
# Set response headers indicating server capabilities
|
||||
headers = {
|
||||
"Content-Type": "application/xml; charset=utf-8",
|
||||
"X-AlpineBits-Server-Accept-Encoding": "gzip", # Indicate gzip support
|
||||
"X-AlpineBits-Server-Version": "2024-10"
|
||||
}
|
||||
|
||||
return Response(
|
||||
content=response_xml,
|
||||
status_code=response.status_code,
|
||||
headers=headers
|
||||
)
|
||||
|
||||
|
||||
|
||||
except HTTPException:
|
||||
# Re-raise HTTP exceptions (auth errors, etc.)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error in AlpineBits handshake: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Internal server error: {str(e)}"
|
||||
)
|
||||
|
||||
@api_router.get("/admin/stats")
|
||||
@limiter.limit("10/minute")
|
||||
async def get_api_stats(
|
||||
request: Request,
|
||||
admin_key: str = Depends(validate_api_key)
|
||||
):
|
||||
"""
|
||||
Admin endpoint to get API usage statistics.
|
||||
Requires admin API key.
|
||||
"""
|
||||
if admin_key != "admin-key":
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="Admin access required"
|
||||
)
|
||||
|
||||
# In a real application, you'd fetch this from your database/monitoring system
|
||||
return {
|
||||
"status": "success",
|
||||
"stats": {
|
||||
"uptime": "Available in production deployment",
|
||||
"total_requests": "Available with monitoring setup",
|
||||
"active_api_keys": len([k for k in ["wix-webhook-key", "admin-key"] if k]),
|
||||
"rate_limit_backend": "redis" if os.getenv("REDIS_URL") else "memory"
|
||||
},
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
|
||||
# Include the API router in the main app
|
||||
app.include_router(api_router)
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def landing_page():
|
||||
"""
|
||||
Serve the under construction landing page at the root route
|
||||
"""
|
||||
try:
|
||||
# Get the path to the HTML file
|
||||
import os
|
||||
html_path = os.path.join(os.path.dirname(__file__), "templates", "index.html")
|
||||
|
||||
with open(html_path, "r", encoding="utf-8") as f:
|
||||
html_content = f.read()
|
||||
|
||||
return HTMLResponse(content=html_content, status_code=200)
|
||||
except FileNotFoundError:
|
||||
# Fallback if HTML file is not found
|
||||
html_content = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>99tales - Under Construction</title>
|
||||
<style>
|
||||
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; }
|
||||
h1 { color: #333; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>🏗️ 99tales</h1>
|
||||
<h2>Under Construction</h2>
|
||||
<p>We're working hard to bring you something amazing!</p>
|
||||
<p><a href="/api">API Documentation</a></p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
return HTMLResponse(content=html_content, status_code=200)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
111
src/alpine_bits_python/auth.py
Normal file
111
src/alpine_bits_python/auth.py
Normal file
@@ -0,0 +1,111 @@
|
||||
import os
|
||||
import secrets
|
||||
from typing import Optional
|
||||
from fastapi import HTTPException, Security, status
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
import hashlib
|
||||
import hmac
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load environment variables from .env file
|
||||
load_dotenv()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Security scheme
|
||||
security = HTTPBearer()
|
||||
|
||||
# API Keys - In production, store these in environment variables or a secure database
|
||||
API_KEYS = {
|
||||
# Example API keys - replace with your own secure keys
|
||||
"wix-webhook-key": "sk_live_your_secure_api_key_here",
|
||||
"admin-key": "sk_admin_your_admin_key_here"
|
||||
}
|
||||
|
||||
# Load API keys from environment if available
|
||||
if os.getenv("WIX_API_KEY"):
|
||||
API_KEYS["wix-webhook-key"] = os.getenv("WIX_API_KEY")
|
||||
if os.getenv("ADMIN_API_KEY"):
|
||||
API_KEYS["admin-key"] = os.getenv("ADMIN_API_KEY")
|
||||
|
||||
|
||||
def generate_api_key() -> str:
|
||||
"""Generate a secure API key"""
|
||||
return f"sk_live_{secrets.token_urlsafe(32)}"
|
||||
|
||||
|
||||
def validate_api_key(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
|
||||
"""
|
||||
Validate API key from Authorization header.
|
||||
Expected format: Authorization: Bearer your_api_key_here
|
||||
"""
|
||||
token = credentials.credentials
|
||||
|
||||
# Check if the token is in our valid API keys
|
||||
for key_name, valid_key in API_KEYS.items():
|
||||
if secrets.compare_digest(token, valid_key):
|
||||
logger.info(f"Valid API key used: {key_name}")
|
||||
return key_name
|
||||
|
||||
logger.warning(f"Invalid API key attempted: {token[:10]}...")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid API key",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
|
||||
def validate_wix_signature(payload: bytes, signature: str, secret: str) -> bool:
|
||||
"""
|
||||
Validate Wix webhook signature for additional security.
|
||||
Wix signs their webhooks with HMAC-SHA256.
|
||||
"""
|
||||
if not signature or not secret:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Remove 'sha256=' prefix if present
|
||||
if signature.startswith('sha256='):
|
||||
signature = signature[7:]
|
||||
|
||||
# Calculate expected signature
|
||||
expected_signature = hmac.new(
|
||||
secret.encode('utf-8'),
|
||||
payload,
|
||||
hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
# Compare signatures securely
|
||||
return secrets.compare_digest(signature, expected_signature)
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating signature: {e}")
|
||||
return False
|
||||
|
||||
|
||||
class APIKeyAuth:
|
||||
"""Simple API key authentication class"""
|
||||
|
||||
def __init__(self, api_keys: dict):
|
||||
self.api_keys = api_keys
|
||||
|
||||
def authenticate(self, api_key: str) -> Optional[str]:
|
||||
"""Authenticate an API key and return the key name if valid"""
|
||||
for key_name, valid_key in self.api_keys.items():
|
||||
if secrets.compare_digest(api_key, valid_key):
|
||||
return key_name
|
||||
return None
|
||||
|
||||
def add_key(self, name: str, key: str):
|
||||
"""Add a new API key"""
|
||||
self.api_keys[name] = key
|
||||
|
||||
def remove_key(self, name: str):
|
||||
"""Remove an API key"""
|
||||
if name in self.api_keys:
|
||||
del self.api_keys[name]
|
||||
|
||||
|
||||
# Initialize auth system
|
||||
auth_system = APIKeyAuth(API_KEYS)
|
||||
65
src/alpine_bits_python/models.py
Normal file
65
src/alpine_bits_python/models.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from typing import Dict, List, Any, Optional
|
||||
from pydantic import BaseModel, Field
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class AlpineBitsHandshakeRequest(BaseModel):
|
||||
"""Model for AlpineBits handshake request data"""
|
||||
action: str = Field(..., description="Action parameter, typically 'OTA_Ping:Handshaking'")
|
||||
request_xml: Optional[str] = Field(None, description="XML request document")
|
||||
|
||||
|
||||
class ContactName(BaseModel):
|
||||
"""Contact name structure"""
|
||||
first: Optional[str] = None
|
||||
last: Optional[str] = None
|
||||
|
||||
|
||||
class ContactAddress(BaseModel):
|
||||
"""Contact address structure"""
|
||||
street: Optional[str] = None
|
||||
city: Optional[str] = None
|
||||
state: Optional[str] = None
|
||||
country: Optional[str] = None
|
||||
postalCode: Optional[str] = None
|
||||
|
||||
|
||||
class Contact(BaseModel):
|
||||
"""Contact information from Wix form"""
|
||||
name: Optional[ContactName] = None
|
||||
email: Optional[str] = None
|
||||
locale: Optional[str] = None
|
||||
company: Optional[str] = None
|
||||
birthdate: Optional[str] = None
|
||||
labelKeys: Optional[Dict[str, Any]] = None
|
||||
contactId: Optional[str] = None
|
||||
address: Optional[ContactAddress] = None
|
||||
jobTitle: Optional[str] = None
|
||||
imageUrl: Optional[str] = None
|
||||
updatedDate: Optional[str] = None
|
||||
phone: Optional[str] = None
|
||||
createdDate: Optional[str] = None
|
||||
|
||||
|
||||
class SubmissionPdf(BaseModel):
|
||||
"""PDF submission structure"""
|
||||
url: Optional[str] = None
|
||||
filename: Optional[str] = None
|
||||
|
||||
|
||||
class WixFormSubmission(BaseModel):
|
||||
"""Model for Wix form submission data"""
|
||||
formName: str
|
||||
submissions: List[Dict[str, Any]] = Field(default_factory=list)
|
||||
submissionTime: str
|
||||
formFieldMask: List[str] = Field(default_factory=list)
|
||||
submissionId: str
|
||||
contactId: str
|
||||
submissionsLink: str
|
||||
submissionPdf: Optional[SubmissionPdf] = None
|
||||
formId: str
|
||||
contact: Optional[Contact] = None
|
||||
|
||||
# Dynamic form fields - these will capture all field:* entries
|
||||
class Config:
|
||||
extra = "allow" # Allow additional fields not defined in the model
|
||||
98
src/alpine_bits_python/rate_limit.py
Normal file
98
src/alpine_bits_python/rate_limit.py
Normal file
@@ -0,0 +1,98 @@
|
||||
from slowapi import Limiter, _rate_limit_exceeded_handler
|
||||
from slowapi.util import get_remote_address
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
from fastapi import Request
|
||||
import redis
|
||||
import os
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Rate limiting configuration
|
||||
DEFAULT_RATE_LIMIT = "10/minute" # 10 requests per minute per IP
|
||||
WEBHOOK_RATE_LIMIT = "60/minute" # 60 webhook requests per minute per IP
|
||||
BURST_RATE_LIMIT = "3/second" # Max 3 requests per second per IP
|
||||
|
||||
# Redis configuration for distributed rate limiting (optional)
|
||||
REDIS_URL = os.getenv("REDIS_URL", None)
|
||||
|
||||
def get_remote_address_with_forwarded(request: Request):
|
||||
"""
|
||||
Get client IP address, considering forwarded headers from proxies/load balancers
|
||||
"""
|
||||
# Check for forwarded headers (common in production behind proxies)
|
||||
forwarded_for = request.headers.get("X-Forwarded-For")
|
||||
if forwarded_for:
|
||||
# Take the first IP in the chain
|
||||
return forwarded_for.split(",")[0].strip()
|
||||
|
||||
real_ip = request.headers.get("X-Real-IP")
|
||||
if real_ip:
|
||||
return real_ip
|
||||
|
||||
# Fallback to direct connection IP
|
||||
return get_remote_address(request)
|
||||
|
||||
|
||||
# Initialize limiter
|
||||
if REDIS_URL:
|
||||
# Use Redis for distributed rate limiting (recommended for production)
|
||||
try:
|
||||
import redis
|
||||
redis_client = redis.from_url(REDIS_URL)
|
||||
limiter = Limiter(
|
||||
key_func=get_remote_address_with_forwarded,
|
||||
storage_uri=REDIS_URL
|
||||
)
|
||||
logger.info("Rate limiting initialized with Redis backend")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to connect to Redis: {e}. Using in-memory rate limiting.")
|
||||
limiter = Limiter(key_func=get_remote_address_with_forwarded)
|
||||
else:
|
||||
# Use in-memory rate limiting (fine for single instance)
|
||||
limiter = Limiter(key_func=get_remote_address_with_forwarded)
|
||||
logger.info("Rate limiting initialized with in-memory backend")
|
||||
|
||||
|
||||
def get_api_key_identifier(request: Request) -> str:
|
||||
"""
|
||||
Get identifier for rate limiting based on API key if available, otherwise IP
|
||||
This allows different rate limits per API key
|
||||
"""
|
||||
# Try to get API key from Authorization header
|
||||
auth_header = request.headers.get("Authorization")
|
||||
if auth_header and auth_header.startswith("Bearer "):
|
||||
api_key = auth_header[7:] # Remove "Bearer " prefix
|
||||
# Use first 10 chars of API key as identifier (don't log full key)
|
||||
return f"api_key:{api_key[:10]}"
|
||||
|
||||
# Fallback to IP address
|
||||
return f"ip:{get_remote_address_with_forwarded(request)}"
|
||||
|
||||
|
||||
# Custom rate limit key function for API key based limiting
|
||||
def api_key_rate_limit_key(request: Request):
|
||||
return get_api_key_identifier(request)
|
||||
|
||||
|
||||
# Rate limiting decorators for different endpoint types
|
||||
webhook_limiter = Limiter(
|
||||
key_func=api_key_rate_limit_key,
|
||||
storage_uri=REDIS_URL if REDIS_URL else None
|
||||
)
|
||||
|
||||
# Custom rate limit exceeded handler
|
||||
def custom_rate_limit_handler(request: Request, exc: RateLimitExceeded):
|
||||
"""Custom handler for rate limit exceeded"""
|
||||
logger.warning(
|
||||
f"Rate limit exceeded for {get_remote_address_with_forwarded(request)}: "
|
||||
f"{exc.detail}"
|
||||
)
|
||||
|
||||
response = _rate_limit_exceeded_handler(request, exc)
|
||||
|
||||
# Add custom headers
|
||||
response.headers["X-RateLimit-Limit"] = str(exc.retry_after)
|
||||
response.headers["X-RateLimit-Retry-After"] = str(exc.retry_after)
|
||||
|
||||
return response
|
||||
15
src/alpine_bits_python/run_api.py
Normal file
15
src/alpine_bits_python/run_api.py
Normal file
@@ -0,0 +1,15 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Startup script for the Wix Form Handler API
|
||||
"""
|
||||
import uvicorn
|
||||
from .api import app
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(
|
||||
"alpine_bits_python.api:app",
|
||||
host="0.0.0.0",
|
||||
port=8000,
|
||||
reload=True, # Enable auto-reload during development
|
||||
log_level="info"
|
||||
)
|
||||
129
src/alpine_bits_python/scripts/setup_security.py
Normal file
129
src/alpine_bits_python/scripts/setup_security.py
Normal file
@@ -0,0 +1,129 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Configuration and setup script for the Wix Form Handler API
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import secrets
|
||||
|
||||
# Add parent directory to path to import from src
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from alpine_bits_python.auth import generate_api_key
|
||||
|
||||
def generate_secure_keys():
|
||||
"""Generate secure API keys for the application"""
|
||||
|
||||
print("🔐 Generating Secure API Keys")
|
||||
print("=" * 50)
|
||||
|
||||
# Generate API keys
|
||||
wix_api_key = generate_api_key()
|
||||
admin_api_key = generate_api_key()
|
||||
webhook_secret = secrets.token_urlsafe(32)
|
||||
|
||||
print(f"🔑 Wix Webhook API Key: {wix_api_key}")
|
||||
print(f"🔐 Admin API Key: {admin_api_key}")
|
||||
print(f"🔒 Webhook Secret: {webhook_secret}")
|
||||
|
||||
print("\n📋 Environment Variables")
|
||||
print("-" * 30)
|
||||
print(f"export WIX_API_KEY='{wix_api_key}'")
|
||||
print(f"export ADMIN_API_KEY='{admin_api_key}'")
|
||||
print(f"export WIX_WEBHOOK_SECRET='{webhook_secret}'")
|
||||
print(f"export REDIS_URL='redis://localhost:6379' # Optional for production")
|
||||
|
||||
print("\n🔧 .env File Content")
|
||||
print("-" * 20)
|
||||
print(f"WIX_API_KEY={wix_api_key}")
|
||||
print(f"ADMIN_API_KEY={admin_api_key}")
|
||||
print(f"WIX_WEBHOOK_SECRET={webhook_secret}")
|
||||
print("REDIS_URL=redis://localhost:6379")
|
||||
|
||||
# Optionally write to .env file
|
||||
create_env = input("\n❓ Create .env file? (y/n): ").lower().strip()
|
||||
if create_env == 'y':
|
||||
# Create .env in the project root (two levels up from scripts)
|
||||
env_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.env')
|
||||
with open(env_path, 'w') as f:
|
||||
f.write(f"WIX_API_KEY={wix_api_key}\n")
|
||||
f.write(f"ADMIN_API_KEY={admin_api_key}\n")
|
||||
f.write(f"WIX_WEBHOOK_SECRET={webhook_secret}\n")
|
||||
f.write("REDIS_URL=redis://localhost:6379\n")
|
||||
print(f"✅ .env file created at {env_path}!")
|
||||
print("⚠️ Add .env to your .gitignore file!")
|
||||
|
||||
print("\n🌐 Wix Configuration")
|
||||
print("-" * 20)
|
||||
print("1. In your Wix site, go to Settings > Webhooks")
|
||||
print("2. Add webhook URL: https://yourdomain.com/webhook/wix-form")
|
||||
print("3. Add custom header: Authorization: Bearer " + wix_api_key)
|
||||
print("4. Optionally configure webhook signature with the secret above")
|
||||
|
||||
return {
|
||||
'wix_api_key': wix_api_key,
|
||||
'admin_api_key': admin_api_key,
|
||||
'webhook_secret': webhook_secret
|
||||
}
|
||||
|
||||
|
||||
def check_security_setup():
|
||||
"""Check current security configuration"""
|
||||
|
||||
print("🔍 Security Configuration Check")
|
||||
print("=" * 40)
|
||||
|
||||
# Check environment variables
|
||||
wix_key = os.getenv('WIX_API_KEY')
|
||||
admin_key = os.getenv('ADMIN_API_KEY')
|
||||
webhook_secret = os.getenv('WIX_WEBHOOK_SECRET')
|
||||
redis_url = os.getenv('REDIS_URL')
|
||||
|
||||
print("Environment Variables:")
|
||||
print(f" WIX_API_KEY: {'✅ Set' if wix_key else '❌ Not set'}")
|
||||
print(f" ADMIN_API_KEY: {'✅ Set' if admin_key else '❌ Not set'}")
|
||||
print(f" WIX_WEBHOOK_SECRET: {'✅ Set' if webhook_secret else '❌ Not set'}")
|
||||
print(f" REDIS_URL: {'✅ Set' if redis_url else '⚠️ Optional (using in-memory)'}")
|
||||
|
||||
# Security recommendations
|
||||
print("\n🛡️ Security Recommendations:")
|
||||
if not wix_key:
|
||||
print(" ❌ Set WIX_API_KEY environment variable")
|
||||
else:
|
||||
if len(wix_key) < 32:
|
||||
print(" ⚠️ WIX_API_KEY should be longer for better security")
|
||||
else:
|
||||
print(" ✅ WIX_API_KEY looks secure")
|
||||
|
||||
if not admin_key:
|
||||
print(" ❌ Set ADMIN_API_KEY environment variable")
|
||||
elif wix_key and admin_key == wix_key:
|
||||
print(" ❌ Admin and Wix keys should be different")
|
||||
else:
|
||||
print(" ✅ ADMIN_API_KEY configured")
|
||||
|
||||
if not webhook_secret:
|
||||
print(" ⚠️ Consider setting WIX_WEBHOOK_SECRET for signature validation")
|
||||
else:
|
||||
print(" ✅ Webhook signature validation enabled")
|
||||
|
||||
print("\n🚀 Production Checklist:")
|
||||
print(" - Use HTTPS in production")
|
||||
print(" - Set up Redis for distributed rate limiting")
|
||||
print(" - Configure proper CORS origins")
|
||||
print(" - Set up monitoring and logging")
|
||||
print(" - Regular key rotation")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("🔐 Wix Form Handler API - Security Setup")
|
||||
print("=" * 50)
|
||||
|
||||
choice = input("Choose an option:\n1. Generate new API keys\n2. Check current setup\n\nEnter choice (1 or 2): ").strip()
|
||||
|
||||
if choice == "1":
|
||||
generate_secure_keys()
|
||||
elif choice == "2":
|
||||
check_security_setup()
|
||||
else:
|
||||
print("Invalid choice. Please run again and choose 1 or 2.")
|
||||
210
src/alpine_bits_python/scripts/test_api.py
Normal file
210
src/alpine_bits_python/scripts/test_api.py
Normal file
@@ -0,0 +1,210 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script for the Secure Wix Form Handler API
|
||||
"""
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
# Add parent directory to path to import from src
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
# API Configuration
|
||||
BASE_URL = "http://localhost:8000"
|
||||
|
||||
# API Keys for testing - replace with your actual keys
|
||||
TEST_API_KEY = os.getenv("WIX_API_KEY", "sk_live_your_secure_api_key_here")
|
||||
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY", "sk_admin_your_admin_key_here")
|
||||
|
||||
# Sample Wix form data based on your example
|
||||
SAMPLE_WIX_DATA = {
|
||||
"formName": "Contact Form",
|
||||
"submissions": [],
|
||||
"submissionTime": "2024-03-20T10:30:00+00:00",
|
||||
"formFieldMask": ["email", "name", "phone"],
|
||||
"submissionId": "test-submission-123",
|
||||
"contactId": "test-contact-456",
|
||||
"submissionsLink": "https://www.wix.app/forms/test-form/submissions",
|
||||
"submissionPdf": {
|
||||
"url": "https://example.com/submission.pdf",
|
||||
"filename": "submission.pdf"
|
||||
},
|
||||
"formId": "test-form-789",
|
||||
"field:email_5139": "test@example.com",
|
||||
"field:first_name_abae": "John",
|
||||
"field:last_name_d97c": "Doe",
|
||||
"field:phone_4c77": "+1234567890",
|
||||
"field:anrede": "Herr",
|
||||
"field:anzahl_kinder": "2",
|
||||
"field:alter_kind_3": "8",
|
||||
"field:alter_kind_4": "12",
|
||||
"field:long_answer_3524": "This is a long answer field with more details about the inquiry.",
|
||||
"contact": {
|
||||
"name": {
|
||||
"first": "John",
|
||||
"last": "Doe"
|
||||
},
|
||||
"email": "test@example.com",
|
||||
"locale": "de",
|
||||
"company": "Test Company",
|
||||
"birthdate": "1985-05-15",
|
||||
"labelKeys": {},
|
||||
"contactId": "test-contact-456",
|
||||
"address": {
|
||||
"street": "Test Street 123",
|
||||
"city": "Test City",
|
||||
"country": "Germany",
|
||||
"postalCode": "12345"
|
||||
},
|
||||
"jobTitle": "Manager",
|
||||
"phone": "+1234567890",
|
||||
"createdDate": "2024-03-20T10:00:00.000Z",
|
||||
"updatedDate": "2024-03-20T10:30:00.000Z"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async def test_api():
|
||||
"""Test the API endpoints with authentication"""
|
||||
|
||||
headers_with_auth = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {TEST_API_KEY}"
|
||||
}
|
||||
|
||||
admin_headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {ADMIN_API_KEY}"
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# Test health endpoint (no auth required)
|
||||
print("1. Testing health endpoint (no auth)...")
|
||||
try:
|
||||
async with session.get(f"{BASE_URL}/api/health") as response:
|
||||
result = await response.json()
|
||||
print(f" ✅ Health check: {response.status} - {result.get('status')}")
|
||||
except Exception as e:
|
||||
print(f" ❌ Health check failed: {e}")
|
||||
|
||||
# Test root endpoint (no auth required)
|
||||
print("\n2. Testing root endpoint (no auth)...")
|
||||
try:
|
||||
async with session.get(f"{BASE_URL}/api/") as response:
|
||||
result = await response.json()
|
||||
print(f" ✅ Root: {response.status} - {result.get('message')}")
|
||||
except Exception as e:
|
||||
print(f" ❌ Root endpoint failed: {e}")
|
||||
|
||||
# Test webhook endpoint without auth (should fail)
|
||||
print("\n3. Testing webhook endpoint WITHOUT auth (should fail)...")
|
||||
try:
|
||||
async with session.post(
|
||||
f"{BASE_URL}/api/webhook/wix-form",
|
||||
json=SAMPLE_WIX_DATA,
|
||||
headers={"Content-Type": "application/json"}
|
||||
) as response:
|
||||
result = await response.json()
|
||||
if response.status == 401:
|
||||
print(f" ✅ Correctly rejected: {response.status} - {result.get('detail')}")
|
||||
else:
|
||||
print(f" ❌ Unexpected response: {response.status} - {result}")
|
||||
except Exception as e:
|
||||
print(f" ❌ Test failed: {e}")
|
||||
|
||||
# Test webhook endpoint with valid auth
|
||||
print("\n4. Testing webhook endpoint WITH valid auth...")
|
||||
try:
|
||||
async with session.post(
|
||||
f"{BASE_URL}/api/webhook/wix-form",
|
||||
json=SAMPLE_WIX_DATA,
|
||||
headers=headers_with_auth
|
||||
) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200:
|
||||
print(f" ✅ Webhook success: {response.status} - {result.get('status')}")
|
||||
else:
|
||||
print(f" ❌ Webhook failed: {response.status} - {result}")
|
||||
except Exception as e:
|
||||
print(f" ❌ Webhook test failed: {e}")
|
||||
|
||||
# Test test endpoint with auth
|
||||
print("\n5. Testing simple test endpoint WITH auth...")
|
||||
try:
|
||||
async with session.post(
|
||||
f"{BASE_URL}/api/webhook/wix-form/test",
|
||||
json={"test": "data", "timestamp": datetime.now().isoformat()},
|
||||
headers=headers_with_auth
|
||||
) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200:
|
||||
print(f" ✅ Test endpoint: {response.status} - {result.get('status')}")
|
||||
else:
|
||||
print(f" ❌ Test endpoint failed: {response.status} - {result}")
|
||||
except Exception as e:
|
||||
print(f" ❌ Test endpoint failed: {e}")
|
||||
|
||||
# Test rate limiting by making multiple rapid requests
|
||||
print("\n6. Testing rate limiting (making 5 rapid requests)...")
|
||||
rate_limit_test_count = 0
|
||||
for i in range(5):
|
||||
try:
|
||||
async with session.get(
|
||||
f"{BASE_URL}/api/health"
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
rate_limit_test_count += 1
|
||||
elif response.status == 429:
|
||||
print(f" ✅ Rate limit triggered on request {i+1}")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f" ❌ Rate limit test failed: {e}")
|
||||
break
|
||||
|
||||
if rate_limit_test_count == 5:
|
||||
print(" ℹ️ No rate limit reached (normal for low request volume)")
|
||||
|
||||
# Test admin endpoint (if admin key is configured)
|
||||
print("\n7. Testing admin stats endpoint...")
|
||||
try:
|
||||
async with session.get(
|
||||
f"{BASE_URL}/api/admin/stats",
|
||||
headers=admin_headers
|
||||
) as response:
|
||||
result = await response.json()
|
||||
if response.status == 200:
|
||||
print(f" ✅ Admin stats: {response.status} - {result.get('status')}")
|
||||
elif response.status == 401:
|
||||
print(f" ⚠️ Admin access denied (API key not configured): {result.get('detail')}")
|
||||
else:
|
||||
print(f" ❌ Admin endpoint failed: {response.status} - {result}")
|
||||
except Exception as e:
|
||||
print(f" ❌ Admin test failed: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("🔒 Testing Secure Wix Form Handler API...")
|
||||
print("=" * 60)
|
||||
print("📍 API URL:", BASE_URL)
|
||||
print("🔑 Using API Key:", TEST_API_KEY[:20] + "..." if len(TEST_API_KEY) > 20 else TEST_API_KEY)
|
||||
print("🔐 Using Admin Key:", ADMIN_API_KEY[:20] + "..." if len(ADMIN_API_KEY) > 20 else ADMIN_API_KEY)
|
||||
print("=" * 60)
|
||||
print("Make sure the API is running with: python3 run_api.py")
|
||||
print("-" * 60)
|
||||
|
||||
try:
|
||||
asyncio.run(test_api())
|
||||
print("\n" + "=" * 60)
|
||||
print("✅ Testing completed!")
|
||||
print("\n📋 Quick Setup Reminder:")
|
||||
print("1. Set environment variables:")
|
||||
print(" export WIX_API_KEY='your_secure_api_key'")
|
||||
print(" export ADMIN_API_KEY='your_admin_key'")
|
||||
print("2. Configure Wix webhook URL: https://yourdomain.com/webhook/wix-form")
|
||||
print("3. Add Authorization header: Bearer your_api_key")
|
||||
except Exception as e:
|
||||
print(f"\n❌ Error testing API: {e}")
|
||||
print("Make sure the API server is running!")
|
||||
108
src/alpine_bits_python/templates/index.html
Normal file
108
src/alpine_bits_python/templates/index.html
Normal file
@@ -0,0 +1,108 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>99 Tales - Under Construction</title>
|
||||
<style>
|
||||
body {
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
font-family: 'Arial', sans-serif;
|
||||
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
||||
color: white;
|
||||
display: flex;
|
||||
justify-content: center;
|
||||
align-items: center;
|
||||
min-height: 100vh;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
.container {
|
||||
max-width: 600px;
|
||||
padding: 2rem;
|
||||
background: rgba(255, 255, 255, 0.1);
|
||||
border-radius: 20px;
|
||||
backdrop-filter: blur(10px);
|
||||
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
|
||||
}
|
||||
|
||||
h1 {
|
||||
font-size: 3rem;
|
||||
margin-bottom: 1rem;
|
||||
text-shadow: 2px 2px 4px rgba(0, 0, 0, 0.3);
|
||||
}
|
||||
|
||||
.subtitle {
|
||||
font-size: 1.5rem;
|
||||
margin-bottom: 2rem;
|
||||
opacity: 0.9;
|
||||
}
|
||||
|
||||
.description {
|
||||
font-size: 1.1rem;
|
||||
line-height: 1.6;
|
||||
margin-bottom: 2rem;
|
||||
opacity: 0.8;
|
||||
}
|
||||
|
||||
.construction-icon {
|
||||
font-size: 4rem;
|
||||
margin-bottom: 1rem;
|
||||
animation: bounce 2s infinite;
|
||||
}
|
||||
|
||||
@keyframes bounce {
|
||||
0%, 20%, 50%, 80%, 100% {
|
||||
transform: translateY(0);
|
||||
}
|
||||
40% {
|
||||
transform: translateY(-10px);
|
||||
}
|
||||
60% {
|
||||
transform: translateY(-5px);
|
||||
}
|
||||
}
|
||||
|
||||
.contact-info {
|
||||
margin-top: 2rem;
|
||||
font-size: 0.9rem;
|
||||
opacity: 0.7;
|
||||
}
|
||||
|
||||
.api-link {
|
||||
display: inline-block;
|
||||
margin-top: 1rem;
|
||||
padding: 0.5rem 1rem;
|
||||
background: rgba(255, 255, 255, 0.2);
|
||||
border: 1px solid rgba(255, 255, 255, 0.3);
|
||||
border-radius: 10px;
|
||||
color: white;
|
||||
text-decoration: none;
|
||||
transition: all 0.3s ease;
|
||||
}
|
||||
|
||||
.api-link:hover {
|
||||
background: rgba(255, 255, 255, 0.3);
|
||||
transform: translateY(-2px);
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="container">
|
||||
<div class="construction-icon">🏗️</div>
|
||||
<h1>99 Tales</h1>
|
||||
<div class="subtitle">Coming Soon</div>
|
||||
<div class="description">
|
||||
We're working hard to bring you something amazing. Our team is putting the finishing touches on an exciting new experience.
|
||||
</div>
|
||||
<div class="description">
|
||||
Thank you for your patience while we build something special for you.
|
||||
</div>
|
||||
<a href="/api" class="api-link">API Documentation</a>
|
||||
<div class="contact-info">
|
||||
Check back soon for updates!
|
||||
</div>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
Reference in New Issue
Block a user