"""Email monitoring and alerting through logging integration. This module provides a custom logging handler that accumulates errors and sends email alerts based on configurable thresholds and time windows. """ import asyncio import logging import threading from collections import defaultdict, deque from datetime import datetime, timedelta from typing import Any from sqlalchemy import func, select from sqlalchemy.ext.asyncio import async_sessionmaker from .db import Reservation from .email_service import EmailService from .logging_config import get_logger _LOGGER = get_logger(__name__) class ErrorRecord: """Represents a single error log record for monitoring. Attributes: timestamp: When the error occurred level: Log level (ERROR, CRITICAL, etc.) logger_name: Name of the logger that generated the error message: The error message exception: Exception info if available module: Module where error occurred line_no: Line number where error occurred """ def __init__(self, record: logging.LogRecord): """Initialize from a logging.LogRecord. Args: record: The logging record to wrap """ self.timestamp = datetime.fromtimestamp(record.created) self.level = record.levelname self.logger_name = record.name self.message = record.getMessage() self.exception = record.exc_text if record.exc_info else None self.module = record.module self.line_no = record.lineno self.pathname = record.pathname def to_dict(self) -> dict[str, Any]: """Convert to dictionary format. Returns: Dictionary representation of the error """ return { "timestamp": self.timestamp.strftime("%Y-%m-%d %H:%M:%S"), "level": self.level, "logger_name": self.logger_name, "message": self.message, "exception": self.exception, "module": self.module, "line_no": self.line_no, "pathname": self.pathname, } def format_plain_text(self) -> str: """Format error as plain text for email. Returns: Formatted plain text string """ text = f"[{self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {self.level}: {self.message}\n" text += f" Module: {self.module}:{self.line_no} ({self.logger_name})\n" if self.exception: text += f" Exception:\n{self.exception}\n" return text class EmailAlertHandler(logging.Handler): """Custom logging handler that sends email alerts for errors. This handler uses a hybrid approach: - Accumulates errors in a buffer - Sends immediately if error threshold is reached - Otherwise sends after buffer duration expires - Always sends buffered errors (no minimum threshold for time-based flush) - Implements cooldown to prevent alert spam The handler is thread-safe and works with asyncio event loops. """ def __init__( self, email_service: EmailService, config: dict[str, Any], loop: asyncio.AbstractEventLoop | None = None, ): """Initialize the email alert handler. Args: email_service: Email service instance for sending alerts config: Configuration dictionary for error alerts loop: Asyncio event loop (will use current loop if not provided) """ super().__init__() self.email_service = email_service self.config = config self.loop = loop # Will be set when first error occurs if not provided # Configuration self.recipients = config.get("recipients", []) self.error_threshold = config.get("error_threshold", 5) self.buffer_minutes = config.get("buffer_minutes", 15) self.cooldown_minutes = config.get("cooldown_minutes", 15) self.log_levels = config.get("log_levels", ["ERROR", "CRITICAL"]) # State self.error_buffer: deque[ErrorRecord] = deque() self.last_sent = datetime.min # Last time we sent an alert self._flush_task: asyncio.Task | None = None self._lock = threading.Lock() # Thread-safe for multi-threaded logging _LOGGER.info( "EmailAlertHandler initialized: threshold=%d, buffer=%dmin, cooldown=%dmin", self.error_threshold, self.buffer_minutes, self.cooldown_minutes, ) def emit(self, record: logging.LogRecord) -> None: """Handle a log record. This is called automatically by the logging system when an error is logged. It's important that this method is fast and doesn't block. Args: record: The log record to handle """ # Only handle configured log levels if record.levelname not in self.log_levels: return try: # Ensure we have an event loop if self.loop is None: try: self.loop = asyncio.get_running_loop() except RuntimeError: # No running loop, we'll need to handle this differently _LOGGER.warning("No asyncio event loop available for email alerts") return # Add error to buffer (thread-safe) with self._lock: error_record = ErrorRecord(record) self.error_buffer.append(error_record) buffer_size = len(self.error_buffer) # Determine if we should send immediately should_send_immediately = buffer_size >= self.error_threshold if should_send_immediately: # Cancel any pending flush task if self._flush_task and not self._flush_task.done(): self._flush_task.cancel() # Schedule immediate flush self._flush_task = asyncio.run_coroutine_threadsafe( self._flush_buffer(immediate=True), self.loop, ) # Schedule delayed flush if not already scheduled elif not self._flush_task or self._flush_task.done(): self._flush_task = asyncio.run_coroutine_threadsafe( self._schedule_delayed_flush(), self.loop, ) except Exception: # Never let the handler crash - just log and continue _LOGGER.exception("Error in EmailAlertHandler.emit") async def _schedule_delayed_flush(self) -> None: """Schedule a delayed buffer flush after buffer duration.""" await asyncio.sleep(self.buffer_minutes * 60) await self._flush_buffer(immediate=False) async def _flush_buffer(self, *, immediate: bool) -> None: """Flush the error buffer and send email alert. Args: immediate: Whether this is an immediate flush (threshold hit) """ # Check cooldown period now = datetime.now() time_since_last = (now - self.last_sent).total_seconds() / 60 if time_since_last < self.cooldown_minutes: _LOGGER.info( "Alert cooldown active (%.1f min remaining), buffering errors", self.cooldown_minutes - time_since_last, ) # Don't clear buffer - let errors accumulate until cooldown expires return # Get all buffered errors (thread-safe) with self._lock: if not self.error_buffer: return errors = list(self.error_buffer) self.error_buffer.clear() # Update last sent time self.last_sent = now # Format email error_count = len(errors) time_range = ( f"{errors[0].timestamp.strftime('%H:%M:%S')} to " f"{errors[-1].timestamp.strftime('%H:%M:%S')}" ) # Determine alert type for subject alert_type = "Immediate Alert" if immediate else "Scheduled Alert" if immediate: emoji = "🚨" reason = f"(threshold of {self.error_threshold} exceeded)" else: emoji = "⚠️" reason = f"({self.buffer_minutes} minute buffer)" subject = ( f"{emoji} AlpineBits Error {alert_type}: {error_count} errors {reason}" ) # Build plain text body body = f"Error Alert - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" body += "=" * 70 + "\n\n" body += f"Alert Type: {alert_type}\n" body += f"Error Count: {error_count}\n" body += f"Time Range: {time_range}\n" body += f"Reason: {reason}\n" body += "\n" + "=" * 70 + "\n\n" # Add individual errors body += "Errors:\n" body += "-" * 70 + "\n\n" for error in errors: body += error.format_plain_text() body += "\n" body += "-" * 70 + "\n" body += f"Generated by AlpineBits Email Monitoring at {now.strftime('%Y-%m-%d %H:%M:%S')}\n" # Send email try: success = await self.email_service.send_alert( recipients=self.recipients, subject=subject, body=body, ) if success: _LOGGER.info( "Email alert sent successfully: %d errors to %s", error_count, self.recipients, ) else: _LOGGER.error("Failed to send email alert for %d errors", error_count) except Exception: _LOGGER.exception("Exception while sending email alert") def close(self) -> None: """Close the handler and flush any remaining errors. This is called when the logging system shuts down. """ # Cancel any pending flush tasks if self._flush_task and not self._flush_task.done(): self._flush_task.cancel() # Flush any remaining errors immediately if self.error_buffer and self.loop: try: # Check if the loop is still running if not self.loop.is_closed(): future = asyncio.run_coroutine_threadsafe( self._flush_buffer(immediate=False), self.loop, ) future.result(timeout=5) else: _LOGGER.warning( "Event loop closed, cannot flush %d remaining errors", len(self.error_buffer), ) except Exception: _LOGGER.exception("Error flushing buffer on close") super().close() class DailyReportScheduler: """Scheduler for sending daily reports at configured times. This runs as a background task and sends daily reports containing statistics and error summaries. """ def __init__( self, email_service: EmailService, config: dict[str, Any], ): """Initialize the daily report scheduler. Args: email_service: Email service for sending reports config: Configuration for daily reports """ self.email_service = email_service self.config = config self.recipients = config.get("recipients", []) self.send_time = config.get("send_time", "08:00") # Default 8 AM self.include_stats = config.get("include_stats", True) self.include_errors = config.get("include_errors", True) self._task: asyncio.Task | None = None self._stats_collector = None # Will be set by application self._error_log: list[dict[str, Any]] = [] _LOGGER.info( "DailyReportScheduler initialized: send_time=%s, recipients=%s", self.send_time, self.recipients, ) def start(self) -> None: """Start the daily report scheduler.""" if self._task is None or self._task.done(): self._task = asyncio.create_task(self._run()) _LOGGER.info("Daily report scheduler started") def stop(self) -> None: """Stop the daily report scheduler.""" if self._task and not self._task.done(): self._task.cancel() _LOGGER.info("Daily report scheduler stopped") def log_error(self, error: dict[str, Any]) -> None: """Log an error for inclusion in daily report. Args: error: Error information dictionary """ self._error_log.append(error) async def _run(self) -> None: """Run the daily report scheduler loop.""" while True: try: # Calculate time until next report now = datetime.now() target_hour, target_minute = map(int, self.send_time.split(":")) # Calculate next send time next_send = now.replace( hour=target_hour, minute=target_minute, second=0, microsecond=0, ) # If time has passed today, schedule for tomorrow if next_send <= now: next_send += timedelta(days=1) # Calculate sleep duration sleep_seconds = (next_send - now).total_seconds() _LOGGER.info( "Next daily report scheduled for %s (in %.1f hours)", next_send.strftime("%Y-%m-%d %H:%M:%S"), sleep_seconds / 3600, ) # Wait until send time await asyncio.sleep(sleep_seconds) # Send report await self._send_report() except asyncio.CancelledError: _LOGGER.info("Daily report scheduler cancelled") break except Exception: _LOGGER.exception("Error in daily report scheduler") # Sleep a bit before retrying await asyncio.sleep(60) async def _send_report(self) -> None: """Send the daily report.""" stats = {} # Collect statistics if enabled if self.include_stats and self._stats_collector: try: stats = await self._stats_collector() except Exception: _LOGGER.exception("Error collecting statistics for daily report") # Get errors if enabled errors = self._error_log.copy() if self.include_errors else None # Send report try: success = await self.email_service.send_daily_report( recipients=self.recipients, stats=stats, errors=errors, ) if success: _LOGGER.info("Daily report sent successfully to %s", self.recipients) # Clear error log after successful send self._error_log.clear() else: _LOGGER.error("Failed to send daily report") except Exception: _LOGGER.exception("Exception while sending daily report") def set_stats_collector(self, collector) -> None: """Set the statistics collector function. Args: collector: Async function that returns statistics dictionary """ self._stats_collector = collector class ReservationStatsCollector: """Collects reservation statistics per hotel for daily reports. This collector queries the database for reservations created since the last report and aggregates them by hotel. It includes hotel_code and hotel_name from the configuration. """ def __init__( self, async_sessionmaker: async_sessionmaker, config: dict[str, Any], ): """Initialize the stats collector. Args: async_sessionmaker: SQLAlchemy async session maker config: Application configuration containing hotel information """ self.async_sessionmaker = async_sessionmaker self.config = config self._last_report_time = datetime.now() # Build hotel mapping from config self._hotel_map = {} for hotel in config.get("alpine_bits_auth", []): hotel_id = hotel.get("hotel_id") hotel_name = hotel.get("hotel_name") if hotel_id: self._hotel_map[hotel_id] = hotel_name or "Unknown Hotel" _LOGGER.info( "ReservationStatsCollector initialized with %d hotels", len(self._hotel_map), ) async def collect_stats(self, lookback_hours: int | None = None) -> dict[str, Any]: """Collect reservation statistics for the reporting period. Args: lookback_hours: Optional override to look back N hours from now. If None, uses time since last report. Returns: Dictionary with statistics including reservations per hotel """ now = datetime.now() if lookback_hours is not None: # Override mode: look back N hours from now period_start = now - timedelta(hours=lookback_hours) period_end = now else: # Normal mode: since last report period_start = self._last_report_time period_end = now _LOGGER.info( "Collecting reservation stats from %s to %s", period_start.strftime("%Y-%m-%d %H:%M:%S"), period_end.strftime("%Y-%m-%d %H:%M:%S"), ) async with self.async_sessionmaker() as session: # Query reservations created in the reporting period result = await session.execute( select(Reservation.hotel_code, func.count(Reservation.id)) .where(Reservation.created_at >= period_start) .where(Reservation.created_at < period_end) .group_by(Reservation.hotel_code) ) hotel_counts = dict(result.all()) # Build stats with hotel names from config hotels_stats = [] total_reservations = 0 for hotel_code, count in hotel_counts.items(): hotel_name = self._hotel_map.get(hotel_code, "Unknown Hotel") hotels_stats.append( { "hotel_code": hotel_code, "hotel_name": hotel_name, "reservations": count, } ) total_reservations += count # Sort by reservation count descending hotels_stats.sort(key=lambda x: x["reservations"], reverse=True) # Update last report time only in normal mode (not lookback mode) if lookback_hours is None: self._last_report_time = now stats = { "reporting_period": { "start": period_start.strftime("%Y-%m-%d %H:%M:%S"), "end": period_end.strftime("%Y-%m-%d %H:%M:%S"), }, "total_reservations": total_reservations, "hotels": hotels_stats, } _LOGGER.info( "Collected stats: %d total reservations across %d hotels", total_reservations, len(hotels_stats), ) return stats