Presumably production ready xD

This commit is contained in:
Jonas Linter
2025-11-18 16:10:57 +01:00
parent b826277b54
commit 3f149fe984
6 changed files with 129551 additions and 65 deletions

View File

@@ -16,7 +16,7 @@ from typing import Any
import httpx
from fast_langdetect import detect
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request
from fastapi import APIRouter, BackgroundTasks, Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import (
@@ -44,9 +44,9 @@ from .const import CONF_GOOGLE_ACCOUNT, CONF_HOTEL_ID, CONF_META_ACCOUNT, HttpSt
from .conversion_service import ConversionService
from .csv_import import CSVImporter
from .customer_service import CustomerService
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .db import ResilientAsyncSession, SessionMaker, create_database_engine
from .db_setup import run_startup_tasks
from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
@@ -1164,6 +1164,7 @@ async def import_csv_endpoint(
Returns:
Import statistics including created/skipped counts and any errors
"""
try:
# Validate file path to prevent path traversal
@@ -1202,7 +1203,7 @@ async def import_csv_endpoint(
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
_LOGGER.exception("Error during CSV import")
raise HTTPException(status_code=500, detail=f"Error processing CSV: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error processing CSV: {e!s}")
@api_router.post("/webhook/generic")
@@ -1315,10 +1316,56 @@ async def handle_generic_webhook(
) from e
async def _process_conversion_xml_background(
    xml_content: str,
    filename: str,
    session_maker: SessionMaker,
    log_filename: Path,
) -> None:
    """Background task to process conversion XML.

    This runs in a separate asyncio task after the HTTP response is sent.
    Handles both file prettification and database processing.

    Args:
        xml_content: Raw XML payload received by the upload endpoint.
        filename: Original upload filename (used only in log messages).
        session_maker: Session factory handed to ConversionService so it can
            open independent database sessions.
        log_filename: Destination path for the saved XML log file.

    Never raises: any failure is logged, since a background task has no
    caller to propagate an exception to.
    """
    try:
        # First, prettify and save the XML file (in background).
        # Parsing + pretty-printing is CPU-bound and can be slow for large
        # documents, so run it in a worker thread instead of blocking the
        # event loop (the write was already offloaded; the parse was not).
        try:
            pretty_xml = await asyncio.to_thread(_prettify_xml, xml_content)
            await asyncio.to_thread(
                log_filename.write_text, pretty_xml, encoding="utf-8"
            )
            _LOGGER.debug("XML file prettified and saved to %s", log_filename)
        except Exception as e:
            # If formatting fails, save the original content
            _LOGGER.warning("Failed to format XML: %s. Saving unformatted.", str(e))
            await asyncio.to_thread(
                log_filename.write_text, xml_content, encoding="utf-8"
            )

        # Now process the conversion XML
        _LOGGER.info("Starting database processing of %s", filename)
        conversion_service = ConversionService(session_maker)
        processing_stats = await conversion_service.process_conversion_xml(xml_content)
        _LOGGER.info(
            "Conversion processing complete for %s: %s", filename, processing_stats
        )
    except Exception:
        # Last-resort guard: log the full traceback rather than letting the
        # task die silently.
        _LOGGER.exception(
            "Error processing conversion XML in background for %s", filename
        )


def _prettify_xml(xml_content: str) -> str:
    """Return *xml_content* pretty-printed with 2-space indent.

    Raises whatever ``xml.dom.minidom.parseString`` raises on malformed
    input; the caller falls back to saving the unformatted content.
    """
    dom = xml.dom.minidom.parseString(xml_content)
    pretty_xml = dom.toprettyxml(indent="  ")
    # Remove extra blank lines that toprettyxml adds
    return "\n".join(line for line in pretty_xml.split("\n") if line.strip())
@api_router.put("/hoteldata/conversions_import/{filename:path}")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def handle_xml_upload(
request: Request,
background_tasks: BackgroundTasks,
filename: str,
credentials_tupel: tuple = Depends(validate_basic_auth),
db_session=Depends(get_async_session),
@@ -1332,6 +1379,8 @@ async def handle_xml_upload(
- Links conversions to customers and hashed_customers
- Stores daily sales revenue data
Returns immediately with 202 Accepted while processing continues in background.
Requires basic authentication and saves XML files to log directory.
Supports gzip compression via Content-Encoding header.
@@ -1377,65 +1426,33 @@ async def handle_xml_upload(
status_code=400, detail="ERROR: Content does not appear to be XML"
)
# Create logs directory for XML conversions
# Create logs directory for XML conversions (blocking, but fast)
logs_dir = Path("logs/conversions_import")
if not logs_dir.exists():
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
_LOGGER.info("Created directory: %s", logs_dir)
logs_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
# Generate filename with timestamp and authenticated user
username, _ = credentials_tupel
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Use the filename from the path, but add timestamp and username for uniqueness
base_filename = Path(filename).stem
extension = Path(filename).suffix or ".xml"
log_filename = logs_dir / f"{base_filename}_{username}_{timestamp}{extension}"
# Format and save XML content to file
try:
dom = xml.dom.minidom.parseString(xml_content)
pretty_xml = dom.toprettyxml(indent=" ")
# Remove extra blank lines that toprettyxml adds
pretty_xml = "\n".join([line for line in pretty_xml.split("\n") if line.strip()])
log_filename.write_text(pretty_xml, encoding="utf-8")
except Exception as e:
# If formatting fails, save the original content
_LOGGER.warning("Failed to format XML: %s. Saving unformatted.", str(e))
log_filename.write_text(xml_content, encoding="utf-8")
_LOGGER.info(
"XML file saved to %s by user %s (original: %s)",
"XML file queued for processing: %s by user %s (original: %s)",
log_filename,
username,
filename,
)
# Process the conversion XML in the background
async def process_in_background():
"""Process conversion XML asynchronously in the background."""
try:
# Use SessionMaker for concurrent processing of large XML files
# This allows multiple reservations to be processed
# in parallel with independent sessions
conversion_service = ConversionService(session_maker)
processing_stats = await conversion_service.process_conversion_xml(
xml_content
)
_LOGGER.info(
"Conversion processing complete for %s: %s",
filename,
processing_stats,
)
except Exception:
_LOGGER.exception(
"Error processing conversion XML in background for %s", filename
)
# Create background task and add done callback for error logging
task = asyncio.create_task(process_in_background())
task.add_done_callback(
lambda t: t.exception() if not t.cancelled() else None
# Schedule background processing using FastAPI's BackgroundTasks
# This handles both file prettification/saving AND database processing
# This ensures the response is sent immediately
background_tasks.add_task(
_process_conversion_xml_background,
xml_content,
filename,
session_maker,
log_filename,
)
response_headers = {