# alpinebits_python/src/alpine_bits_python/unified_monitoring.py
"""Unified monitoring with support for multiple notification backends.
This module provides alert handlers and schedulers that work with the
unified notification service to send alerts through multiple channels.
"""
import asyncio
import logging
import threading
from collections import deque
from datetime import datetime, timedelta
from typing import Any
from .email_monitoring import ErrorRecord, ReservationStatsCollector
from .logging_config import get_logger
from .notification_service import NotificationService
_LOGGER = get_logger(__name__)


class UnifiedAlertHandler(logging.Handler):
    """Custom logging handler that sends alerts through the unified notification service.

    This handler uses a hybrid approach:
    - Accumulates errors in a buffer
    - Sends immediately if the error threshold is reached
    - Otherwise sends after the buffer duration expires
    - Always sends buffered errors (no minimum threshold for a time-based flush)
    - Implements a cooldown to prevent alert spam

    The handler is thread-safe and works with asyncio event loops.
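
    Example (illustrative; assumes ``service`` is an already-configured
    NotificationService instance)::

        handler = UnifiedAlertHandler(
            notification_service=service,
            config={
                "error_threshold": 5,
                "buffer_minutes": 15,
                "cooldown_minutes": 15,
                "log_levels": ["ERROR", "CRITICAL"],
            },
        )
        logging.getLogger().addHandler(handler)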
"""
def __init__(
self,
notification_service: NotificationService,
config: dict[str, Any],
loop: asyncio.AbstractEventLoop | None = None,
):
"""Initialize the unified alert handler.
Args:
notification_service: Unified notification service
config: Configuration dictionary for error alerts
loop: Asyncio event loop (will use current loop if not provided)
"""
super().__init__()
self.notification_service = notification_service
self.config = config
self.loop = loop # Will be set when first error occurs if not provided
# Configuration
self.error_threshold = config.get("error_threshold", 5)
self.buffer_minutes = config.get("buffer_minutes", 15)
self.cooldown_minutes = config.get("cooldown_minutes", 15)
self.log_levels = config.get("log_levels", ["ERROR", "CRITICAL"])
# State
self.error_buffer: deque[ErrorRecord] = deque()
self.last_sent = datetime.min # Last time we sent an alert
self._flush_task: asyncio.Task | None = None
self._lock = threading.Lock() # Thread-safe for multi-threaded logging
_LOGGER.info(
"UnifiedAlertHandler initialized: threshold=%d, buffer=%dmin, cooldown=%dmin",
self.error_threshold,
self.buffer_minutes,
self.cooldown_minutes,
)

    def emit(self, record: logging.LogRecord) -> None:
        """Handle a log record.

        This is called automatically by the logging system when an error is
        logged, so it must be fast and must not block.

        Args:
            record: The log record to handle
        """
        # Only handle configured log levels
        if record.levelname not in self.log_levels:
            return

        try:
            # Ensure we have an event loop
            if self.loop is None:
                try:
                    self.loop = asyncio.get_running_loop()
                except RuntimeError:
                    # No running loop; we cannot schedule an alert from here
                    _LOGGER.warning("No asyncio event loop available for alerts")
                    return

            # Add the error to the buffer (thread-safe)
            with self._lock:
                error_record = ErrorRecord(record)
                self.error_buffer.append(error_record)
                buffer_size = len(self.error_buffer)

                # Determine whether to send immediately
                should_send_immediately = buffer_size >= self.error_threshold

                if should_send_immediately:
                    # Cancel any pending flush task
                    if self._flush_task and not self._flush_task.done():
                        self._flush_task.cancel()
                    # Schedule an immediate flush
                    self._flush_task = asyncio.run_coroutine_threadsafe(
                        self._flush_buffer(immediate=True),
                        self.loop,
                    )
                # Schedule a delayed flush if one is not already pending
                elif not self._flush_task or self._flush_task.done():
                    self._flush_task = asyncio.run_coroutine_threadsafe(
                        self._schedule_delayed_flush(),
                        self.loop,
                    )
        except Exception:
            # Never let the handler crash - just log and continue
            _LOGGER.exception("Error in UnifiedAlertHandler.emit")

    async def _schedule_delayed_flush(self) -> None:
        """Schedule a delayed buffer flush after the buffer duration."""
        await asyncio.sleep(self.buffer_minutes * 60)
        await self._flush_buffer(immediate=False)

    async def _flush_buffer(self, *, immediate: bool) -> None:
        """Flush the error buffer and send an alert.

        Args:
            immediate: Whether this is an immediate flush (threshold hit)
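
        Example resulting alert title (derived from the format below)::

            AlpineBits Error Immediate Alert: 7 errors (threshold of 5 exceeded)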
"""
# Check cooldown period
now = datetime.now()
time_since_last = (now - self.last_sent).total_seconds() / 60
if time_since_last < self.cooldown_minutes:
_LOGGER.info(
"Alert cooldown active (%.1f min remaining), buffering errors",
self.cooldown_minutes - time_since_last,
)
# Don't clear buffer - let errors accumulate until cooldown expires
return
# Get all buffered errors (thread-safe)
with self._lock:
if not self.error_buffer:
return
errors = list(self.error_buffer)
self.error_buffer.clear()
# Update last sent time
self.last_sent = now
# Format alert
error_count = len(errors)
time_range = (
f"{errors[0].timestamp.strftime('%H:%M:%S')} to "
f"{errors[-1].timestamp.strftime('%H:%M:%S')}"
)
# Determine alert type
alert_type = "Immediate Alert" if immediate else "Scheduled Alert"
if immediate:
reason = f"(threshold of {self.error_threshold} exceeded)"
else:
reason = f"({self.buffer_minutes} minute buffer)"
title = f"AlpineBits Error {alert_type}: {error_count} errors {reason}"
# Build message
message = f"Error Alert - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
message += "=" * 70 + "\n\n"
message += f"Alert Type: {alert_type}\n"
message += f"Error Count: {error_count}\n"
message += f"Time Range: {time_range}\n"
message += f"Reason: {reason}\n"
message += "\n" + "=" * 70 + "\n\n"
# Add individual errors
message += "Errors:\n"
message += "-" * 70 + "\n\n"
for error in errors:
message += error.format_plain_text()
message += "\n"
message += "-" * 70 + "\n"
message += f"Generated by AlpineBits Monitoring at {now.strftime('%Y-%m-%d %H:%M:%S')}\n"

        # Send through the unified notification service
        try:
            results = await self.notification_service.send_alert(
                title=title,
                message=message,
                backends=None,  # Send to all backends
            )
            success_count = sum(1 for success in results.values() if success)
            if success_count > 0:
                _LOGGER.info(
                    "Alert sent successfully through %d/%d backend(s): %d errors",
                    success_count,
                    len(results),
                    error_count,
                )
            else:
                _LOGGER.error("Failed to send alert through any backend: %d errors", error_count)
        except Exception:
            _LOGGER.exception("Exception while sending alert")

    def close(self) -> None:
        """Close the handler and flush any remaining errors.

        This is called when the logging system shuts down (for example via
        ``logging.shutdown()``).
        """
        # Cancel any pending flush task
        if self._flush_task and not self._flush_task.done():
            self._flush_task.cancel()

        # Flush any remaining errors immediately
        if self.error_buffer and self.loop:
            try:
                # Check that the loop is still running
                if not self.loop.is_closed():
                    future = asyncio.run_coroutine_threadsafe(
                        self._flush_buffer(immediate=False),
                        self.loop,
                    )
                    # Wait up to 5 seconds for the flush to complete
                    future.result(timeout=5)
                else:
                    _LOGGER.warning(
                        "Event loop closed, cannot flush %d remaining errors",
                        len(self.error_buffer),
                    )
            except Exception:
                _LOGGER.exception("Error flushing buffer on close")

        super().close()


class UnifiedDailyReportScheduler:
    """Scheduler for sending daily reports through the unified notification service.

    This runs as a background task and sends daily reports containing
    statistics and error summaries through all configured notification backends.
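
    Example (illustrative; ``service`` is assumed to be an already-configured
    NotificationService and ``collect_stats`` an async callable returning a
    stats dictionary)::

        scheduler = UnifiedDailyReportScheduler(
            notification_service=service,
            config={"send_time": "08:00", "include_stats": True, "include_errors": True},
        )
        scheduler.set_stats_collector(collect_stats)
        scheduler.start()

    Note that ``start()`` must be called from within a running asyncio event
    loop, since it creates the scheduler task with ``asyncio.create_task``.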
"""
def __init__(
self,
notification_service: NotificationService,
config: dict[str, Any],
):
"""Initialize the unified daily report scheduler.
Args:
notification_service: Unified notification service
config: Configuration for daily reports
"""
self.notification_service = notification_service
self.config = config
self.send_time = config.get("send_time", "08:00") # Default 8 AM
self.include_stats = config.get("include_stats", True)
self.include_errors = config.get("include_errors", True)
self._task: asyncio.Task | None = None
self._stats_collector = None # Will be set by application
self._error_log: list[dict[str, Any]] = []
_LOGGER.info(
"UnifiedDailyReportScheduler initialized: send_time=%s",
self.send_time,
)

    def start(self) -> None:
        """Start the daily report scheduler."""
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())
            _LOGGER.info("Daily report scheduler started")

    def stop(self) -> None:
        """Stop the daily report scheduler."""
        if self._task and not self._task.done():
            self._task.cancel()
            _LOGGER.info("Daily report scheduler stopped")

    def log_error(self, error: dict[str, Any]) -> None:
        """Log an error for inclusion in the daily report.

        Args:
            error: Error information dictionary
        """
        self._error_log.append(error)

    async def _run(self) -> None:
        """Run the daily report scheduler loop.
        while True:
            try:
                # Calculate the time until the next report
                now = datetime.now()
                target_hour, target_minute = map(int, self.send_time.split(":"))

                # Calculate the next send time
                next_send = now.replace(
                    hour=target_hour,
                    minute=target_minute,
                    second=0,
                    microsecond=0,
                )

                # If the time has already passed today, schedule for tomorrow
                if next_send <= now:
                    next_send += timedelta(days=1)

                # Calculate the sleep duration
                sleep_seconds = (next_send - now).total_seconds()

                _LOGGER.info(
                    "Next daily report scheduled for %s (in %.1f hours)",
                    next_send.strftime("%Y-%m-%d %H:%M:%S"),
                    sleep_seconds / 3600,
                )

                # Wait until the send time
                await asyncio.sleep(sleep_seconds)

                # Send the report
                await self._send_report()
            except asyncio.CancelledError:
                _LOGGER.info("Daily report scheduler cancelled")
                break
            except Exception:
                _LOGGER.exception("Error in daily report scheduler")
                # Sleep briefly before retrying
                await asyncio.sleep(60)

    async def _send_report(self) -> None:
        """Send the daily report."""
        stats = {}

        # Collect statistics if enabled
        if self.include_stats and self._stats_collector:
            try:
                stats = await self._stats_collector()
            except Exception:
                _LOGGER.exception("Error collecting statistics for daily report")

        # Get the errors if enabled
        errors = self._error_log.copy() if self.include_errors else None

        # Send the report through the unified notification service
        try:
            results = await self.notification_service.send_daily_report(
                stats=stats,
                errors=errors,
                backends=None,  # Send to all backends
            )
            success_count = sum(1 for success in results.values() if success)
            if success_count > 0:
                _LOGGER.info(
                    "Daily report sent successfully through %d/%d backend(s)",
                    success_count,
                    len(results),
                )
                # Clear the error log after a successful send
                self._error_log.clear()
            else:
                _LOGGER.error("Failed to send daily report through any backend")
        except Exception:
            _LOGGER.exception("Exception while sending daily report")

    def set_stats_collector(self, collector: Callable[[], Awaitable[dict[str, Any]]]) -> None:
        """Set the statistics collector function.

        Args:
            collector: Async function that returns a statistics dictionary
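
        Example (illustrative; the function name and stats keys are
        placeholders)::

            async def collect_stats() -> dict[str, Any]:
                return {"reservations_processed": 42}

            scheduler.set_stats_collector(collect_stats)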
"""
self._stats_collector = collector