email_notifications #7
@@ -299,6 +299,22 @@ async def lifespan(app: FastAPI):
|
|||||||
report_scheduler.set_stats_collector(stats_collector.collect_stats)
|
report_scheduler.set_stats_collector(stats_collector.collect_stats)
|
||||||
_LOGGER.info("Stats collector initialized and hooked up to report scheduler")
|
_LOGGER.info("Stats collector initialized and hooked up to report scheduler")
|
||||||
|
|
||||||
|
# Send a test daily report on startup for testing
|
||||||
|
_LOGGER.info("Sending test daily report on startup")
|
||||||
|
try:
|
||||||
|
stats = await stats_collector.collect_stats()
|
||||||
|
success = await email_service.send_daily_report(
|
||||||
|
recipients=report_scheduler.recipients,
|
||||||
|
stats=stats,
|
||||||
|
errors=None,
|
||||||
|
)
|
||||||
|
if success:
|
||||||
|
_LOGGER.info("Test daily report sent successfully on startup")
|
||||||
|
else:
|
||||||
|
_LOGGER.error("Failed to send test daily report on startup")
|
||||||
|
except Exception:
|
||||||
|
_LOGGER.exception("Error sending test daily report on startup")
|
||||||
|
|
||||||
# Start daily report scheduler
|
# Start daily report scheduler
|
||||||
report_scheduler.start()
|
report_scheduler.start()
|
||||||
_LOGGER.info("Daily report scheduler started")
|
_LOGGER.info("Daily report scheduler started")
|
||||||
@@ -320,6 +336,11 @@ async def lifespan(app: FastAPI):
|
|||||||
email_handler.close()
|
email_handler.close()
|
||||||
_LOGGER.info("Email alert handler closed")
|
_LOGGER.info("Email alert handler closed")
|
||||||
|
|
||||||
|
# Shutdown email service thread pool
|
||||||
|
if email_service:
|
||||||
|
email_service.shutdown()
|
||||||
|
_LOGGER.info("Email service shut down")
|
||||||
|
|
||||||
# Dispose engine
|
# Dispose engine
|
||||||
await engine.dispose()
|
await engine.dispose()
|
||||||
_LOGGER.info("Application shutdown complete")
|
_LOGGER.info("Application shutdown complete")
|
||||||
@@ -711,6 +732,9 @@ async def process_generic_webhook_submission(
|
|||||||
name_prefix = form_data.get("anrede")
|
name_prefix = form_data.get("anrede")
|
||||||
language = form_data.get("sprache", "de")[:2]
|
language = form_data.get("sprache", "de")[:2]
|
||||||
user_comment = form_data.get("nachricht", "")
|
user_comment = form_data.get("nachricht", "")
|
||||||
|
plz = form_data.get("plz", "")
|
||||||
|
city = form_data.get("stadt", "")
|
||||||
|
country = form_data.get("land", "")
|
||||||
|
|
||||||
# Parse dates - handle DD.MM.YYYY format
|
# Parse dates - handle DD.MM.YYYY format
|
||||||
start_date_str = form_data.get("anreise")
|
start_date_str = form_data.get("anreise")
|
||||||
@@ -790,9 +814,9 @@ async def process_generic_webhook_submission(
|
|||||||
"phone": phone_number if phone_number else None,
|
"phone": phone_number if phone_number else None,
|
||||||
"email_newsletter": False,
|
"email_newsletter": False,
|
||||||
"address_line": None,
|
"address_line": None,
|
||||||
"city_name": None,
|
"city_name": city if city else None,
|
||||||
"postal_code": None,
|
"postal_code": plz if plz else None,
|
||||||
"country_code": None,
|
"country_code": country if country else None,
|
||||||
"gender": None,
|
"gender": None,
|
||||||
"birth_date": None,
|
"birth_date": None,
|
||||||
"language": language,
|
"language": language,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ including error alerts and daily reports.
|
|||||||
import asyncio
|
import asyncio
|
||||||
import smtplib
|
import smtplib
|
||||||
import ssl
|
import ssl
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from email.mime.multipart import MIMEMultipart
|
from email.mime.multipart import MIMEMultipart
|
||||||
from email.mime.text import MIMEText
|
from email.mime.text import MIMEText
|
||||||
@@ -74,7 +75,9 @@ class EmailService:
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
self.config = config
|
self.config = config
|
||||||
self._executor = None # Lazy-initialized thread pool for blocking SMTP
|
# Create dedicated thread pool for SMTP operations (max 2 threads is enough for email)
|
||||||
|
# This prevents issues with default executor in multi-process environments
|
||||||
|
self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="smtp-")
|
||||||
|
|
||||||
async def send_email(
|
async def send_email(
|
||||||
self,
|
self,
|
||||||
@@ -114,9 +117,9 @@ class EmailService:
|
|||||||
if html_body:
|
if html_body:
|
||||||
msg.attach(MIMEText(html_body, "html"))
|
msg.attach(MIMEText(html_body, "html"))
|
||||||
|
|
||||||
# Send email in thread pool (SMTP is blocking)
|
# Send email in dedicated thread pool (SMTP is blocking)
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
await loop.run_in_executor(None, self._send_smtp, msg, recipients)
|
await loop.run_in_executor(self._executor, self._send_smtp, msg, recipients)
|
||||||
|
|
||||||
_LOGGER.info("Email sent successfully to %s: %s", recipients, subject)
|
_LOGGER.info("Email sent successfully to %s: %s", recipients, subject)
|
||||||
return True
|
return True
|
||||||
@@ -333,6 +336,17 @@ class EmailService:
|
|||||||
|
|
||||||
return html
|
return html
|
||||||
|
|
||||||
|
def shutdown(self) -> None:
|
||||||
|
"""Shutdown the email service and clean up thread pool.
|
||||||
|
|
||||||
|
This should be called during application shutdown to ensure
|
||||||
|
proper cleanup of the thread pool executor.
|
||||||
|
"""
|
||||||
|
if self._executor:
|
||||||
|
_LOGGER.info("Shutting down email service thread pool")
|
||||||
|
self._executor.shutdown(wait=True, cancel_futures=False)
|
||||||
|
_LOGGER.info("Email service thread pool shut down complete")
|
||||||
|
|
||||||
|
|
||||||
def create_email_service(config: dict[str, Any]) -> EmailService | None:
|
def create_email_service(config: dict[str, Any]) -> EmailService | None:
|
||||||
"""Create an email service from configuration.
|
"""Create an email service from configuration.
|
||||||
|
|||||||
Reference in New Issue
Block a user