Files
alpinebits_python/src/alpine_bits_python/notification_adapters.py
2025-10-16 11:08:39 +02:00

128 lines
3.4 KiB
Python

"""Adapters for notification backends.
This module provides adapters that wrap email and Pushover services
to work with the unified notification service interface.
"""
from typing import Any
from .email_service import EmailService
from .logging_config import get_logger
from .pushover_service import PushoverService
_LOGGER = get_logger(__name__)
class EmailNotificationAdapter:
    """Bridge an ``EmailService`` onto the notification-backend call shape.

    The adapter remembers a fixed recipient list and forwards each
    notification to the wrapped email service, translating the generic
    ``title``/``message`` arguments into the ``subject``/``body`` keywords
    the email service is called with.
    """

    def __init__(self, email_service: EmailService, recipients: list[str]):
        """Store the wrapped service and the addresses to deliver to.

        Args:
            email_service: EmailService instance that performs the delivery.
            recipients: Email addresses used for every outgoing message.
                The list is stored as given (no copy is made).
        """
        self.email_service = email_service
        self.recipients = recipients

    async def send_alert(self, title: str, message: str, **kwargs) -> bool:
        """Forward an alert to the email service.

        Args:
            title: Used as the email subject.
            message: Used as the email body.
            **kwargs: Accepted so all adapters share one signature; not
                forwarded to the email service.

        Returns:
            The value returned by ``email_service.send_alert``.
        """
        outcome = await self.email_service.send_alert(
            recipients=self.recipients,
            subject=title,
            body=message,
        )
        return outcome

    async def send_daily_report(
        self,
        stats: dict[str, Any],
        errors: list[dict[str, Any]] | None = None,
        **kwargs,
    ) -> bool:
        """Forward a daily report to the email service.

        Args:
            stats: Statistics dictionary, passed through unchanged.
            errors: Optional list of error records, passed through unchanged
                (``None`` when the caller supplies nothing).
            **kwargs: Accepted so all adapters share one signature; not
                forwarded to the email service.

        Returns:
            The value returned by ``email_service.send_daily_report``.
        """
        outcome = await self.email_service.send_daily_report(
            recipients=self.recipients,
            stats=stats,
            errors=errors,
        )
        return outcome
class PushoverNotificationAdapter:
    """Bridge a ``PushoverService`` onto the notification-backend call shape.

    The adapter holds a default priority and forwards each notification to
    the wrapped Pushover service, letting callers override the priority per
    call through a ``priority`` keyword argument.
    """

    def __init__(self, pushover_service: PushoverService, priority: int = 0):
        """Store the wrapped service and the default priority.

        Args:
            pushover_service: PushoverService instance that performs the
                delivery.
            priority: Priority used when a call does not supply its own.
        """
        self.pushover_service = pushover_service
        self.priority = priority

    def _priority_from(self, options: dict[str, Any]) -> int:
        """Return the per-call ``priority`` if present, else the default.

        Only the presence of the key matters: an explicit ``priority=None``
        is returned as ``None`` rather than replaced by the default.
        """
        return options.get("priority", self.priority)

    async def send_alert(self, title: str, message: str, **kwargs) -> bool:
        """Forward an alert to the Pushover service.

        Args:
            title: Notification title.
            message: Notification message.
            **kwargs: May carry ``priority`` to override the adapter's
                default; any other keys are not forwarded.

        Returns:
            The value returned by ``pushover_service.send_alert``.
        """
        chosen_priority = self._priority_from(kwargs)
        outcome = await self.pushover_service.send_alert(
            title=title,
            message=message,
            priority=chosen_priority,
        )
        return outcome

    async def send_daily_report(
        self,
        stats: dict[str, Any],
        errors: list[dict[str, Any]] | None = None,
        **kwargs,
    ) -> bool:
        """Forward a daily report to the Pushover service.

        Args:
            stats: Statistics dictionary, passed through unchanged.
            errors: Optional list of error records, passed through unchanged
                (``None`` when the caller supplies nothing).
            **kwargs: May carry ``priority`` to override the adapter's
                default; any other keys are not forwarded.

        Returns:
            The value returned by ``pushover_service.send_daily_report``.
        """
        chosen_priority = self._priority_from(kwargs)
        outcome = await self.pushover_service.send_daily_report(
            stats=stats,
            errors=errors,
            priority=chosen_priority,
        )
        return outcome