email_notifications #7
@@ -57,6 +57,9 @@ security_basic = HTTPBasic()
|
||||
# HTTP Bearer auth for API endpoints
|
||||
security_bearer = HTTPBearer()
|
||||
|
||||
# Constants for token sanitization
|
||||
TOKEN_LOG_LENGTH = 10
|
||||
|
||||
|
||||
# Pydantic models for language detection
|
||||
class LanguageDetectionRequest(BaseModel):
|
||||
@@ -405,9 +408,25 @@ async def detect_language(
|
||||
token = credentials.credentials
|
||||
config = request.app.state.config
|
||||
|
||||
# Check if token is valid (you may want to implement proper token validation)
|
||||
# Check if token is valid
|
||||
valid_tokens = config.get("api_tokens", [])
|
||||
if not valid_tokens or token not in valid_tokens:
|
||||
|
||||
# If no tokens configured, reject authentication
|
||||
if not valid_tokens:
|
||||
_LOGGER.error("No api_tokens configured in config.yaml")
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Authentication token not configured on server",
|
||||
)
|
||||
|
||||
if token not in valid_tokens:
|
||||
# Log sanitized token (first TOKEN_LOG_LENGTH chars) for security
|
||||
sanitized_token = (
|
||||
token[:TOKEN_LOG_LENGTH] + "..."
|
||||
if len(token) > TOKEN_LOG_LENGTH
|
||||
else token
|
||||
)
|
||||
_LOGGER.warning("Invalid token attempt: %s", sanitized_token)
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Invalid authentication token",
|
||||
|
||||
Reference in New Issue
Block a user