Added urllib parsing for gzip compressed content

This commit is contained in:
2025-09-29 09:03:41 +00:00
parent b79288f6b6
commit 9eb993cba5

View File

@@ -21,6 +21,8 @@ import os
import gzip import gzip
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from .alpinebits_server import AlpineBitsServer, Version from .alpinebits_server import AlpineBitsServer, Version
import urllib.parse
# HTTP Basic auth for AlpineBits # HTTP Basic auth for AlpineBits
security_basic = HTTPBasic() security_basic = HTTPBasic()
@@ -440,6 +442,9 @@ async def alpinebits_server_handshake(
# Get content type before processing # Get content type before processing
content_type = request.headers.get("Content-Type", "") content_type = request.headers.get("Content-Type", "")
logger.info(f"Content-Type: {content_type}")
logger.info(f"Content-Encoding: {content_encoding}")
# Get request body # Get request body
body = await request.body() body = await request.body()
@@ -447,6 +452,8 @@ async def alpinebits_server_handshake(
# Decompress if needed # Decompress if needed
if is_compressed: if is_compressed:
try: try:
logger.info(f"Raw compressed body (first 200 bytes): {body[:200]!r}")
body = gzip.decompress(body) body = gzip.decompress(body)
logger.info("Successfully decompressed gzip content") logger.info("Successfully decompressed gzip content")
@@ -454,7 +461,7 @@ async def alpinebits_server_handshake(
logs_dir = "logs" logs_dir = "logs"
if not os.path.exists(logs_dir): if not os.path.exists(logs_dir):
os.makedirs(logs_dir, mode=0o755, exist_ok=True) os.makedirs(logs_dir, mode=0o755, exist_ok=True)
log_filename = f"{logs_dir}/alpinebits_request_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xml" log_filename = f"{logs_dir}/alpinebits_request_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(log_filename, "wb") as f: with open(log_filename, "wb") as f:
f.write(body) f.write(body)
@@ -466,19 +473,37 @@ async def alpinebits_server_handshake(
) )
# Check content type (after decompression) # Check content type (after decompression)
if "multipart/form-data" not in content_type: if "multipart/form-data" not in content_type and "application/x-www-form-urlencoded" not in content_type:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail="ERROR: Content-Type must be multipart/form-data" detail="ERROR: Content-Type must be multipart/form-data"
) )
logger.info(f"Request body length: {len(body)} bytes")
# log body to file for debugging
logs_dir = "logs"
if not os.path.exists(logs_dir):
os.makedirs(logs_dir, mode=0o755, exist_ok=True)
log_filename = f"{logs_dir}/alpinebits_request_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(log_filename, "wb") as f:
f.write(body)
# Parse multipart data # Parse multipart data
try: if "multipart/form-data" in content_type:
form_data = parse_multipart_data(content_type, body) try:
except Exception as e: form_data = parse_multipart_data(content_type, body)
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"ERROR: Failed to parse multipart/form-data: {str(e)}"
)
elif "application/x-www-form-urlencoded" in content_type:
# Parse as urlencoded
form_data = dict(urllib.parse.parse_qsl(body.decode("utf-8")))
else:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail=f"ERROR: Failed to parse multipart/form-data: {str(e)}" detail="ERROR: Content-Type must be multipart/form-data or application/x-www-form-urlencoded"
) )
# Check for required action parameter # Check for required action parameter