Updated reporting scheme. Should work better now
@@ -41,7 +41,7 @@ alpine_bits_auth:
api_tokens:
- tLTI8wXF1OVEvUX7kdZRhSW3Qr5feBCz0mHo-kbnEp0

# Email configuration for monitoring and alerts
# Email configuration (SMTP service config - kept for when port is unblocked)
email:
# SMTP server configuration
smtp:
@@ -56,52 +56,32 @@ email:
from_address: "info@99tales.net" # Sender address
from_name: "AlpineBits Monitor" # Sender display name

# Monitoring and alerting
monitoring:
# Daily report configuration
daily_report:
enabled: false # Set to true to enable daily reports
recipients:
- "jonas@vaius.ai"
#- "dev@99tales.com"
send_time: "08:00" # Time to send daily report (24h format, local time)
include_stats: true # Include reservation/customer stats
include_errors: true # Include error summary

# Error alert configuration (hybrid approach)
error_alerts:
enabled: false # Set to true to enable error alerts
recipients:
- "jonas@vaius.ai"
#- "oncall@99tales.com"
# Alert is sent immediately if threshold is reached
error_threshold: 5 # Send immediate alert after N errors
# Otherwise, alert is sent after buffer time expires
buffer_minutes: 15 # Wait N minutes before sending buffered errors
# Cooldown period to prevent alert spam
cooldown_minutes: 15 # Wait N min before sending another alert
# Error severity levels to monitor
log_levels:
- "ERROR"
- "CRITICAL"

# Pushover configuration for push notifications (alternative to email)
# Pushover configuration (push notification service config)
pushover:
# Pushover API credentials (get from https://pushover.net)
user_key: !secret PUSHOVER_USER_KEY # Your user/group key
api_token: !secret PUSHOVER_API_TOKEN # Your application API token

# Monitoring and alerting (same structure as email)
monitoring:
# Daily report configuration
# Unified notification system - recipient-based routing
notifications:
# Recipients and their preferred notification methods
recipients:
- name: "jonas"
methods:
# Uncomment email when port is unblocked
#- type: "email"
#  address: "jonas@vaius.ai"
- type: "pushover"
priority: 1 # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency

# Daily report configuration (applies to all recipients)
daily_report:
enabled: true # Set to true to enable daily reports
send_time: "08:00" # Time to send daily report (24h format, local time)
include_stats: true # Include reservation/customer stats
include_errors: true # Include error summary
priority: 0 # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency

# Error alert configuration (hybrid approach)
# Error alert configuration (applies to all recipients)
error_alerts:
enabled: true # Set to true to enable error alerts
# Alert is sent immediately if threshold is reached
@@ -114,4 +94,3 @@ pushover:
log_levels:
- "ERROR"
- "CRITICAL"
priority: 1 # Pushover priority: -2=lowest, -1=low, 0=normal, 1=high, 2=emergency
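For orientation (not part of the commit): once this YAML is loaded, the new unified block is just a plain mapping. Based on the recipients block above, the shape that the recipient-based routing expects looks roughly like the following Python sketch; the email method stays commented out until the SMTP port is unblocked.

# Sketch only, not part of the diff: parsed shape of config["notifications"].
notifications = {
    "recipients": [
        {
            "name": "jonas",
            "methods": [
                # {"type": "email", "address": "jonas@vaius.ai"},  # once SMTP works
                {"type": "pushover", "priority": 1},
            ],
        }
    ],
}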
@@ -44,8 +44,6 @@ from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
from .logging_config import get_logger, setup_logging
from .migrations import run_all_migrations
from .notification_adapters import EmailNotificationAdapter, PushoverNotificationAdapter
from .notification_service import NotificationService
from .pushover_service import create_pushover_service
from .rate_limit import (
BURST_RATE_LIMIT,
@@ -236,9 +234,9 @@ async def lifespan(app: FastAPI):
# Initialize pushover service
pushover_service = create_pushover_service(config)

# Setup logging from config with email and pushover monitoring
# Setup logging from config with unified notification monitoring
# Only primary worker should have the report scheduler running
email_handler, report_scheduler = setup_logging(
alert_handler, report_scheduler = setup_logging(
config, email_service, pushover_service, loop, enable_scheduler=is_primary
)
_LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary)
@@ -254,7 +252,7 @@ async def lifespan(app: FastAPI):
app.state.event_dispatcher = event_dispatcher
app.state.email_service = email_service
app.state.pushover_service = pushover_service
app.state.email_handler = email_handler
app.state.alert_handler = alert_handler
app.state.report_scheduler = report_scheduler

# Register push listeners for hotels with push_endpoint
@@ -313,44 +311,6 @@ async def lifespan(app: FastAPI):
report_scheduler.set_stats_collector(stats_collector.collect_stats)
_LOGGER.info("Stats collector initialized and hooked up to report scheduler")

# Send a test daily report on startup for testing (with 24-hour lookback)
_LOGGER.info("Sending test daily report on startup (last 24 hours)")
try:
# Use lookback_hours=24 to get stats from last 24 hours
stats = await stats_collector.collect_stats(lookback_hours=24)

# Send via email (if configured)
if email_service:
success = await email_service.send_daily_report(
recipients=report_scheduler.recipients,
stats=stats,
errors=None,
)
if success:
_LOGGER.info("Test daily report sent via email successfully on startup")
else:
_LOGGER.error("Failed to send test daily report via email on startup")

# Send via Pushover (if configured)
if pushover_service:
pushover_config = config.get("pushover", {})
pushover_monitoring = pushover_config.get("monitoring", {})
pushover_daily_report = pushover_monitoring.get("daily_report", {})
priority = pushover_daily_report.get("priority", 0)

success = await pushover_service.send_daily_report(
stats=stats,
errors=None,
priority=priority,
)
if success:
_LOGGER.info("Test daily report sent via Pushover successfully on startup")
else:
_LOGGER.error("Failed to send test daily report via Pushover on startup")

except Exception:
_LOGGER.exception("Error sending test daily report on startup")

# Start daily report scheduler
report_scheduler.start()
_LOGGER.info("Daily report scheduler started")
@@ -367,10 +327,10 @@ async def lifespan(app: FastAPI):
report_scheduler.stop()
_LOGGER.info("Daily report scheduler stopped")

# Close email alert handler (flush any remaining errors)
if email_handler:
email_handler.close()
_LOGGER.info("Email alert handler closed")
# Close alert handler (flush any remaining errors)
if alert_handler:
alert_handler.close()
_LOGGER.info("Alert handler closed")

# Shutdown email service thread pool
if email_service:

@@ -192,14 +192,69 @@ pushover_schema = Schema(
extra=PREVENT_EXTRA,
)

# Unified notification method schema
notification_method_schema = Schema(
{
Required("type"): In(["email", "pushover"]),
Optional("address"): str, # For email
Optional("priority"): Range(min=-2, max=2), # For pushover
},
extra=PREVENT_EXTRA,
)

# Unified notification recipient schema
notification_recipient_schema = Schema(
{
Required("name"): str,
Required("methods"): [notification_method_schema],
},
extra=PREVENT_EXTRA,
)

# Unified daily report configuration schema (without recipients)
unified_daily_report_schema = Schema(
{
Required("enabled", default=False): Boolean(),
Required("send_time", default="08:00"): str,
Required("include_stats", default=True): Boolean(),
Required("include_errors", default=True): Boolean(),
},
extra=PREVENT_EXTRA,
)

# Unified error alerts configuration schema (without recipients)
unified_error_alerts_schema = Schema(
{
Required("enabled", default=False): Boolean(),
Required("error_threshold", default=5): Range(min=1),
Required("buffer_minutes", default=15): Range(min=1),
Required("cooldown_minutes", default=15): Range(min=0),
Required("log_levels", default=["ERROR", "CRITICAL"]): [
In(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])
],
},
extra=PREVENT_EXTRA,
)

# Unified notifications configuration schema
notifications_schema = Schema(
{
Required("recipients", default=[]): [notification_recipient_schema],
Optional("daily_report", default={}): unified_daily_report_schema,
Optional("error_alerts", default={}): unified_error_alerts_schema,
},
extra=PREVENT_EXTRA,
)

config_schema = Schema(
{
Required(CONF_DATABASE): database_schema,
Required(CONF_ALPINE_BITS_AUTH): basic_auth_schema,
Required(CONF_SERVER): server_info,
Required(CONF_LOGGING): logger_schema,
Optional("email"): email_schema, # Email is optional
Optional("pushover"): pushover_schema, # Pushover is optional
Optional("email"): email_schema, # Email is optional (service config only)
Optional("pushover"): pushover_schema, # Pushover is optional (service config only)
Optional("notifications"): notifications_schema, # Unified notification config
Optional("api_tokens", default=[]): [str], # API tokens for bearer auth
},
extra=PREVENT_EXTRA,
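As a quick, hedged illustration (not part of the diff): applying notifications_schema to a partial block fills in the defaults declared above and rejects unknown keys, assuming voluptuous's usual validate-on-call behaviour.

# Illustrative sketch only.
sample = {
    "recipients": [
        {"name": "jonas", "methods": [{"type": "pushover", "priority": 1}]}
    ],
    "daily_report": {"enabled": True},
    "error_alerts": {"enabled": True},
}
validated = notifications_schema(sample)
assert validated["daily_report"]["send_time"] == "08:00"
assert validated["error_alerts"]["error_threshold"] == 5
assert validated["error_alerts"]["log_levels"] == ["ERROR", "CRITICAL"]
# Unknown keys or an out-of-range pushover priority raise voluptuous's Invalid,
# because every schema here is declared with extra=PREVENT_EXTRA.
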
@@ -25,7 +25,7 @@ def setup_logging(
pushover_service: "PushoverService | None" = None,
loop: asyncio.AbstractEventLoop | None = None,
enable_scheduler: bool = True,
) -> tuple["EmailAlertHandler | None", "DailyReportScheduler | None"]:
) -> tuple[logging.Handler | None, object | None]:
"""Configure logging based on application config.

Args:
@@ -37,7 +37,7 @@ def setup_logging(
(should be False for non-primary workers)

Returns:
Tuple of (email_alert_handler, daily_report_scheduler) if monitoring
Tuple of (alert_handler, daily_report_scheduler) if monitoring
is enabled, otherwise (None, None)

Logger config format:
@@ -92,88 +92,67 @@ def setup_logging(

root_logger.info("Logging configured at %s level", level)

# Setup notification monitoring if configured
email_handler = None
# Setup unified notification monitoring if configured
alert_handler = None
report_scheduler = None

# Setup email monitoring if configured
if email_service:
email_config = config.get("email", {})
monitoring_config = email_config.get("monitoring", {})

# Setup error alert handler
error_alerts_config = monitoring_config.get("error_alerts", {})
if error_alerts_config.get("enabled", False):
# Check if unified notifications are configured
notifications_config = config.get("notifications", {})
if notifications_config and (email_service or pushover_service):
try:
# Import here to avoid circular dependencies
from alpine_bits_python.email_monitoring import EmailAlertHandler
from alpine_bits_python.notification_manager import (
get_notification_config,
setup_notification_service,
)
from alpine_bits_python.unified_monitoring import (
UnifiedAlertHandler,
UnifiedDailyReportScheduler,
)

email_handler = EmailAlertHandler(
# Setup unified notification service
notification_service = setup_notification_service(
config=config,
email_service=email_service,
pushover_service=pushover_service,
)

if notification_service:
# Setup error alert handler
error_alerts_config = get_notification_config("error_alerts", config)
if error_alerts_config.get("enabled", False):
try:
alert_handler = UnifiedAlertHandler(
notification_service=notification_service,
config=error_alerts_config,
loop=loop,
)
email_handler.setLevel(logging.ERROR)
root_logger.addHandler(email_handler)
root_logger.info("Email alert handler enabled for error monitoring")
alert_handler.setLevel(logging.ERROR)
root_logger.addHandler(alert_handler)
root_logger.info("Unified alert handler enabled for error monitoring")
except Exception:
root_logger.exception("Failed to setup email alert handler")
root_logger.exception("Failed to setup unified alert handler")

# Setup daily report scheduler (only if enabled and this is primary worker)
daily_report_config = monitoring_config.get("daily_report", {})
daily_report_config = get_notification_config("daily_report", config)
if daily_report_config.get("enabled", False) and enable_scheduler:
try:
# Import here to avoid circular dependencies
from alpine_bits_python.email_monitoring import DailyReportScheduler

report_scheduler = DailyReportScheduler(
email_service=email_service,
report_scheduler = UnifiedDailyReportScheduler(
notification_service=notification_service,
config=daily_report_config,
)
root_logger.info("Daily report scheduler configured (primary worker)")
root_logger.info("Unified daily report scheduler configured (primary worker)")
except Exception:
root_logger.exception("Failed to setup daily report scheduler")
root_logger.exception("Failed to setup unified daily report scheduler")
elif daily_report_config.get("enabled", False) and not enable_scheduler:
root_logger.info(
"Daily report scheduler disabled (non-primary worker)"
"Unified daily report scheduler disabled (non-primary worker)"
)

# Check if Pushover daily reports are enabled
# If so and no report_scheduler exists yet, create one
if pushover_service and not report_scheduler:
pushover_config = config.get("pushover", {})
pushover_monitoring = pushover_config.get("monitoring", {})
pushover_daily_report = pushover_monitoring.get("daily_report", {})

if pushover_daily_report.get("enabled", False) and enable_scheduler:
try:
# Import here to avoid circular dependencies
from alpine_bits_python.email_monitoring import DailyReportScheduler

# Create a dummy config for the scheduler
# (it doesn't need email-specific fields if email is disabled)
scheduler_config = {
"send_time": pushover_daily_report.get("send_time", "08:00"),
"include_stats": pushover_daily_report.get("include_stats", True),
"include_errors": pushover_daily_report.get("include_errors", True),
"recipients": [], # Not used for Pushover
}

report_scheduler = DailyReportScheduler(
email_service=email_service, # Can be None
config=scheduler_config,
)
root_logger.info(
"Daily report scheduler configured for Pushover (primary worker)"
)
except Exception:
root_logger.exception("Failed to setup Pushover daily report scheduler")
elif pushover_daily_report.get("enabled", False) and not enable_scheduler:
root_logger.info(
"Pushover daily report scheduler disabled (non-primary worker)"
)
root_logger.exception("Failed to setup unified notification monitoring")

return email_handler, report_scheduler
return alert_handler, report_scheduler


def get_logger(name: str) -> logging.Logger:
src/alpine_bits_python/notification_manager.py (new file, 156 lines)
@@ -0,0 +1,156 @@
"""Unified notification manager for setting up recipient-based notification routing.

This module provides helpers to initialize the unified notification system
based on the recipients configuration.
"""

from typing import Any

from .email_service import EmailService
from .logging_config import get_logger
from .notification_adapters import EmailNotificationAdapter, PushoverNotificationAdapter
from .notification_service import NotificationService
from .pushover_service import PushoverService

_LOGGER = get_logger(__name__)


def setup_notification_service(
    config: dict[str, Any],
    email_service: EmailService | None = None,
    pushover_service: PushoverService | None = None,
) -> NotificationService | None:
    """Set up unified notification service from config.

    Args:
        config: Full configuration dictionary
        email_service: Optional EmailService instance
        pushover_service: Optional PushoverService instance

    Returns:
        NotificationService instance, or None if no recipients configured

    """
    notifications_config = config.get("notifications", {})
    recipients = notifications_config.get("recipients", [])

    if not recipients:
        _LOGGER.info("No notification recipients configured")
        return None

    notification_service = NotificationService()

    # Process each recipient and their methods
    for recipient in recipients:
        recipient_name = recipient.get("name", "unknown")
        methods = recipient.get("methods", [])

        for method in methods:
            method_type = method.get("type")

            if method_type == "email":
                if not email_service:
                    _LOGGER.warning(
                        "Email method configured for %s but email service not available",
                        recipient_name,
                    )
                    continue

                email_address = method.get("address")
                if not email_address:
                    _LOGGER.warning(
                        "Email method for %s missing address", recipient_name
                    )
                    continue

                # Create a unique backend name for this recipient's email
                backend_name = f"email_{recipient_name}"

                # Check if we already have an email backend
                if not notification_service.has_backend("email"):
                    # Create email adapter with all email recipients
                    email_recipients = []
                    for r in recipients:
                        for m in r.get("methods", []):
                            if m.get("type") == "email" and m.get("address"):
                                email_recipients.append(m.get("address"))

                    if email_recipients:
                        email_adapter = EmailNotificationAdapter(
                            email_service, email_recipients
                        )
                        notification_service.register_backend("email", email_adapter)
                        _LOGGER.info(
                            "Registered email backend with %d recipient(s)",
                            len(email_recipients),
                        )

            elif method_type == "pushover":
                if not pushover_service:
                    _LOGGER.warning(
                        "Pushover method configured for %s but pushover service not available",
                        recipient_name,
                    )
                    continue

                priority = method.get("priority", 0)

                # Check if we already have a pushover backend
                if not notification_service.has_backend("pushover"):
                    # Pushover sends to user_key configured in pushover service
                    pushover_adapter = PushoverNotificationAdapter(
                        pushover_service, priority
                    )
                    notification_service.register_backend("pushover", pushover_adapter)
                    _LOGGER.info("Registered pushover backend with priority %d", priority)

    if not notification_service.backends:
        _LOGGER.warning("No notification backends could be configured")
        return None

    _LOGGER.info(
        "Notification service configured with backends: %s",
        list(notification_service.backends.keys()),
    )
    return notification_service


def get_enabled_backends(
    notification_type: str, config: dict[str, Any]
) -> list[str] | None:
    """Get list of enabled backends for a notification type.

    Args:
        notification_type: "daily_report" or "error_alerts"
        config: Full configuration dictionary

    Returns:
        List of backend names to use, or None for all backends

    """
    notifications_config = config.get("notifications", {})
    notification_config = notifications_config.get(notification_type, {})

    if not notification_config.get("enabled", False):
        return []

    # Return None to indicate all backends should be used
    # The NotificationService will send to all registered backends
    return None


def get_notification_config(
    notification_type: str, config: dict[str, Any]
) -> dict[str, Any]:
    """Get configuration for a specific notification type.

    Args:
        notification_type: "daily_report" or "error_alerts"
        config: Full configuration dictionary

    Returns:
        Configuration dictionary for the notification type

    """
    notifications_config = config.get("notifications", {})
    return notifications_config.get(notification_type, {})
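The two config helpers above operate on plain dictionaries, so their behaviour can be shown without any services wired up. A minimal sketch (not part of the commit), assuming the module is importable under the path in the file header:

# Sketch only.
from alpine_bits_python.notification_manager import (
    get_enabled_backends,
    get_notification_config,
)

config = {
    "notifications": {
        "error_alerts": {"enabled": True, "error_threshold": 5},
        "daily_report": {"enabled": False},
    }
}

print(get_notification_config("error_alerts", config))
# -> {'enabled': True, 'error_threshold': 5}
print(get_enabled_backends("error_alerts", config))
# -> None, meaning "send through all registered backends"
print(get_enabled_backends("daily_report", config))
# -> [], because daily reports are disabled
print(get_notification_config("daily_report", {}))
# -> {}, missing sections fall back to an empty dict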
src/alpine_bits_python/unified_monitoring.py (new file, 390 lines)
@@ -0,0 +1,390 @@
"""Unified monitoring with support for multiple notification backends.

This module provides alert handlers and schedulers that work with the
unified notification service to send alerts through multiple channels.
"""

import asyncio
import logging
import threading
from collections import deque
from datetime import datetime, timedelta
from typing import Any

from .email_monitoring import ErrorRecord, ReservationStatsCollector
from .logging_config import get_logger
from .notification_service import NotificationService

_LOGGER = get_logger(__name__)


class UnifiedAlertHandler(logging.Handler):
    """Custom logging handler that sends alerts through unified notification service.

    This handler uses a hybrid approach:
    - Accumulates errors in a buffer
    - Sends immediately if error threshold is reached
    - Otherwise sends after buffer duration expires
    - Always sends buffered errors (no minimum threshold for time-based flush)
    - Implements cooldown to prevent alert spam

    The handler is thread-safe and works with asyncio event loops.
    """

    def __init__(
        self,
        notification_service: NotificationService,
        config: dict[str, Any],
        loop: asyncio.AbstractEventLoop | None = None,
    ):
        """Initialize the unified alert handler.

        Args:
            notification_service: Unified notification service
            config: Configuration dictionary for error alerts
            loop: Asyncio event loop (will use current loop if not provided)

        """
        super().__init__()
        self.notification_service = notification_service
        self.config = config
        self.loop = loop # Will be set when first error occurs if not provided

        # Configuration
        self.error_threshold = config.get("error_threshold", 5)
        self.buffer_minutes = config.get("buffer_minutes", 15)
        self.cooldown_minutes = config.get("cooldown_minutes", 15)
        self.log_levels = config.get("log_levels", ["ERROR", "CRITICAL"])

        # State
        self.error_buffer: deque[ErrorRecord] = deque()
        self.last_sent = datetime.min # Last time we sent an alert
        self._flush_task: asyncio.Task | None = None
        self._lock = threading.Lock() # Thread-safe for multi-threaded logging

        _LOGGER.info(
            "UnifiedAlertHandler initialized: threshold=%d, buffer=%dmin, cooldown=%dmin",
            self.error_threshold,
            self.buffer_minutes,
            self.cooldown_minutes,
        )

    def emit(self, record: logging.LogRecord) -> None:
        """Handle a log record.

        This is called automatically by the logging system when an error is logged.
        It's important that this method is fast and doesn't block.

        Args:
            record: The log record to handle

        """
        # Only handle configured log levels
        if record.levelname not in self.log_levels:
            return

        try:
            # Ensure we have an event loop
            if self.loop is None:
                try:
                    self.loop = asyncio.get_running_loop()
                except RuntimeError:
                    # No running loop, we'll need to handle this differently
                    _LOGGER.warning("No asyncio event loop available for alerts")
                    return

            # Add error to buffer (thread-safe)
            with self._lock:
                error_record = ErrorRecord(record)
                self.error_buffer.append(error_record)
                buffer_size = len(self.error_buffer)

            # Determine if we should send immediately
            should_send_immediately = buffer_size >= self.error_threshold

            if should_send_immediately:
                # Cancel any pending flush task
                if self._flush_task and not self._flush_task.done():
                    self._flush_task.cancel()

                # Schedule immediate flush
                self._flush_task = asyncio.run_coroutine_threadsafe(
                    self._flush_buffer(immediate=True),
                    self.loop,
                )
            # Schedule delayed flush if not already scheduled
            elif not self._flush_task or self._flush_task.done():
                self._flush_task = asyncio.run_coroutine_threadsafe(
                    self._schedule_delayed_flush(),
                    self.loop,
                )

        except Exception:
            # Never let the handler crash - just log and continue
            _LOGGER.exception("Error in UnifiedAlertHandler.emit")

    async def _schedule_delayed_flush(self) -> None:
        """Schedule a delayed buffer flush after buffer duration."""
        await asyncio.sleep(self.buffer_minutes * 60)
        await self._flush_buffer(immediate=False)

    async def _flush_buffer(self, *, immediate: bool) -> None:
        """Flush the error buffer and send alert.

        Args:
            immediate: Whether this is an immediate flush (threshold hit)

        """
        # Check cooldown period
        now = datetime.now()
        time_since_last = (now - self.last_sent).total_seconds() / 60

        if time_since_last < self.cooldown_minutes:
            _LOGGER.info(
                "Alert cooldown active (%.1f min remaining), buffering errors",
                self.cooldown_minutes - time_since_last,
            )
            # Don't clear buffer - let errors accumulate until cooldown expires
            return

        # Get all buffered errors (thread-safe)
        with self._lock:
            if not self.error_buffer:
                return

            errors = list(self.error_buffer)
            self.error_buffer.clear()

        # Update last sent time
        self.last_sent = now

        # Format alert
        error_count = len(errors)
        time_range = (
            f"{errors[0].timestamp.strftime('%H:%M:%S')} to "
            f"{errors[-1].timestamp.strftime('%H:%M:%S')}"
        )

        # Determine alert type
        alert_type = "Immediate Alert" if immediate else "Scheduled Alert"
        if immediate:
            reason = f"(threshold of {self.error_threshold} exceeded)"
        else:
            reason = f"({self.buffer_minutes} minute buffer)"

        title = f"AlpineBits Error {alert_type}: {error_count} errors {reason}"

        # Build message
        message = f"Error Alert - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
        message += "=" * 70 + "\n\n"
        message += f"Alert Type: {alert_type}\n"
        message += f"Error Count: {error_count}\n"
        message += f"Time Range: {time_range}\n"
        message += f"Reason: {reason}\n"
        message += "\n" + "=" * 70 + "\n\n"

        # Add individual errors
        message += "Errors:\n"
        message += "-" * 70 + "\n\n"
        for error in errors:
            message += error.format_plain_text()
            message += "\n"

        message += "-" * 70 + "\n"
        message += f"Generated by AlpineBits Monitoring at {now.strftime('%Y-%m-%d %H:%M:%S')}\n"

        # Send through unified notification service
        try:
            results = await self.notification_service.send_alert(
                title=title,
                message=message,
                backends=None, # Send to all backends
            )

            success_count = sum(1 for success in results.values() if success)
            if success_count > 0:
                _LOGGER.info(
                    "Alert sent successfully through %d/%d backend(s): %d errors",
                    success_count,
                    len(results),
                    error_count,
                )
            else:
                _LOGGER.error("Failed to send alert through any backend: %d errors", error_count)

        except Exception:
            _LOGGER.exception("Exception while sending alert")

    def close(self) -> None:
        """Close the handler and flush any remaining errors.

        This is called when the logging system shuts down.
        """
        # Cancel any pending flush tasks
        if self._flush_task and not self._flush_task.done():
            self._flush_task.cancel()

        # Flush any remaining errors immediately
        if self.error_buffer and self.loop:
            try:
                # Check if the loop is still running
                if not self.loop.is_closed():
                    future = asyncio.run_coroutine_threadsafe(
                        self._flush_buffer(immediate=False),
                        self.loop,
                    )
                    future.result(timeout=5)
                else:
                    _LOGGER.warning(
                        "Event loop closed, cannot flush %d remaining errors",
                        len(self.error_buffer),
                    )
            except Exception:
                _LOGGER.exception("Error flushing buffer on close")

        super().close()


class UnifiedDailyReportScheduler:
    """Scheduler for sending daily reports through unified notification service.

    This runs as a background task and sends daily reports containing
    statistics and error summaries through all configured notification backends.
    """

    def __init__(
        self,
        notification_service: NotificationService,
        config: dict[str, Any],
    ):
        """Initialize the unified daily report scheduler.

        Args:
            notification_service: Unified notification service
            config: Configuration for daily reports

        """
        self.notification_service = notification_service
        self.config = config
        self.send_time = config.get("send_time", "08:00") # Default 8 AM
        self.include_stats = config.get("include_stats", True)
        self.include_errors = config.get("include_errors", True)

        self._task: asyncio.Task | None = None
        self._stats_collector = None # Will be set by application
        self._error_log: list[dict[str, Any]] = []

        _LOGGER.info(
            "UnifiedDailyReportScheduler initialized: send_time=%s",
            self.send_time,
        )

    def start(self) -> None:
        """Start the daily report scheduler."""
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())
            _LOGGER.info("Daily report scheduler started")

    def stop(self) -> None:
        """Stop the daily report scheduler."""
        if self._task and not self._task.done():
            self._task.cancel()
            _LOGGER.info("Daily report scheduler stopped")

    def log_error(self, error: dict[str, Any]) -> None:
        """Log an error for inclusion in daily report.

        Args:
            error: Error information dictionary

        """
        self._error_log.append(error)

    async def _run(self) -> None:
        """Run the daily report scheduler loop."""
        while True:
            try:
                # Calculate time until next report
                now = datetime.now()
                target_hour, target_minute = map(int, self.send_time.split(":"))

                # Calculate next send time
                next_send = now.replace(
                    hour=target_hour,
                    minute=target_minute,
                    second=0,
                    microsecond=0,
                )

                # If time has passed today, schedule for tomorrow
                if next_send <= now:
                    next_send += timedelta(days=1)

                # Calculate sleep duration
                sleep_seconds = (next_send - now).total_seconds()

                _LOGGER.info(
                    "Next daily report scheduled for %s (in %.1f hours)",
                    next_send.strftime("%Y-%m-%d %H:%M:%S"),
                    sleep_seconds / 3600,
                )

                # Wait until send time
                await asyncio.sleep(sleep_seconds)

                # Send report
                await self._send_report()

            except asyncio.CancelledError:
                _LOGGER.info("Daily report scheduler cancelled")
                break
            except Exception:
                _LOGGER.exception("Error in daily report scheduler")
                # Sleep a bit before retrying
                await asyncio.sleep(60)

    async def _send_report(self) -> None:
        """Send the daily report."""
        stats = {}

        # Collect statistics if enabled
        if self.include_stats and self._stats_collector:
            try:
                stats = await self._stats_collector()
            except Exception:
                _LOGGER.exception("Error collecting statistics for daily report")

        # Get errors if enabled
        errors = self._error_log.copy() if self.include_errors else None

        # Send report through unified notification service
        try:
            results = await self.notification_service.send_daily_report(
                stats=stats,
                errors=errors,
                backends=None, # Send to all backends
            )

            success_count = sum(1 for success in results.values() if success)
            if success_count > 0:
                _LOGGER.info(
                    "Daily report sent successfully through %d/%d backend(s)",
                    success_count,
                    len(results),
                )
                # Clear error log after successful send
                self._error_log.clear()
            else:
                _LOGGER.error("Failed to send daily report through any backend")

        except Exception:
            _LOGGER.exception("Exception while sending daily report")

    def set_stats_collector(self, collector) -> None:
        """Set the statistics collector function.

        Args:
            collector: Async function that returns statistics dictionary

        """
        self._stats_collector = collector
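Both new modules lean on NotificationService from notification_service.py, which is not included in this diff. Inferred purely from the calls made above (register_backend, has_backend, backends, send_alert, send_daily_report returning per-backend success flags), the service is assumed to look roughly like the sketch below; the real implementation and the adapter method names may differ.

# Sketch only, not part of the commit: assumed shape of the notification service.
from typing import Any


class NotificationServiceSketch:
    """Fan out alerts and daily reports to registered backend adapters."""

    def __init__(self) -> None:
        self.backends: dict[str, Any] = {}

    def register_backend(self, name: str, backend: Any) -> None:
        self.backends[name] = backend

    def has_backend(self, name: str) -> bool:
        return name in self.backends

    def _select(self, backends: list[str] | None) -> dict[str, Any]:
        # None means "all registered backends", matching the callers above.
        if backends is None:
            return dict(self.backends)
        return {n: b for n, b in self.backends.items() if n in backends}

    async def send_alert(
        self, title: str, message: str, backends: list[str] | None = None
    ) -> dict[str, bool]:
        results: dict[str, bool] = {}
        for name, backend in self._select(backends).items():
            try:
                # Adapter method names are assumptions; the real adapters live in
                # notification_adapters.py, which is not shown in this commit.
                results[name] = await backend.send_alert(title, message)
            except Exception:
                results[name] = False
        return results

    async def send_daily_report(
        self,
        stats: dict[str, Any],
        errors: list[dict[str, Any]] | None = None,
        backends: list[str] | None = None,
    ) -> dict[str, bool]:
        results: dict[str, bool] = {}
        for name, backend in self._select(backends).items():
            try:
                results[name] = await backend.send_daily_report(stats, errors)
            except Exception:
                results[name] = False
        return results

With an interface like this, the wiring in logging_config.py attaches UnifiedAlertHandler to the root logger and lets UnifiedDailyReportScheduler send its report through every registered backend, which matches how the two classes above consume the per-backend result dictionaries.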