Merge remote-tracking branch 'origin/main' into schema_extension

This commit is contained in:
Jonas Linter
2025-10-16 16:03:24 +02:00
27 changed files with 5352 additions and 107 deletions

View File

@@ -1,6 +1,7 @@
import asyncio
import gzip
import json
import multiprocessing
import os
import traceback
import urllib.parse
@@ -11,10 +12,17 @@ from pathlib import Path
from typing import Any
import httpx
from fast_langdetect import detect
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.security import (
HTTPAuthorizationCredentials,
HTTPBasic,
HTTPBasicCredentials,
HTTPBearer,
)
from pydantic import BaseModel
from slowapi.errors import RateLimitExceeded
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
@@ -32,7 +40,12 @@ from .customer_service import CustomerService
from .db import Base, get_database_url
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .email_monitoring import ReservationStatsCollector
from .email_service import create_email_service
from .logging_config import get_logger, setup_logging
from .notification_adapters import EmailNotificationAdapter, PushoverNotificationAdapter
from .notification_service import NotificationService
from .pushover_service import create_pushover_service
from .rate_limit import (
BURST_RATE_LIMIT,
DEFAULT_RATE_LIMIT,
@@ -42,12 +55,28 @@ from .rate_limit import (
webhook_limiter,
)
from .reservation_service import ReservationService
from .worker_coordination import is_primary_worker
# Configure logging - will be reconfigured during lifespan with actual config
_LOGGER = get_logger(__name__)
# HTTP Basic auth for AlpineBits
security_basic = HTTPBasic()
# HTTP Bearer auth for API endpoints
security_bearer = HTTPBearer()
# Constants for token sanitization
TOKEN_LOG_LENGTH = 10
# Pydantic models for language detection
class LanguageDetectionRequest(BaseModel):
text: str
class LanguageDetectionResponse(BaseModel):
language_code: str
score: float
# --- Enhanced event dispatcher with hotel-specific routing ---
@@ -179,15 +208,39 @@ async def push_listener(customer: DBCustomer, reservation: DBReservation, hotel)
async def lifespan(app: FastAPI):
# Setup DB
# Determine if this is the primary worker using file-based locking
# Only primary runs schedulers/background tasks
# In multi-worker setups, only one worker should run singleton services
is_primary, worker_lock = is_primary_worker()
_LOGGER.info(
"Worker startup: process=%s, pid=%d, primary=%s",
multiprocessing.current_process().name,
os.getpid(),
is_primary,
)
try:
config = load_config()
except Exception:
_LOGGER.exception("Failed to load config: ")
config = {}
# Setup logging from config
setup_logging(config)
_LOGGER.info("Application startup initiated")
# Get event loop for email monitoring
loop = asyncio.get_running_loop()
# Initialize email service (before logging setup so it can be used by handlers)
email_service = create_email_service(config)
# Initialize pushover service
pushover_service = create_pushover_service(config)
# Setup logging from config with email and pushover monitoring
# Only primary worker should have the report scheduler running
email_handler, report_scheduler = setup_logging(
config, email_service, pushover_service, loop, enable_scheduler=is_primary
)
_LOGGER.info("Application startup initiated (primary_worker=%s)", is_primary)
DATABASE_URL = get_database_url(config)
engine = create_async_engine(DATABASE_URL, echo=False)
@@ -198,6 +251,10 @@ async def lifespan(app: FastAPI):
app.state.config = config
app.state.alpine_bits_server = AlpineBitsServer(config)
app.state.event_dispatcher = event_dispatcher
app.state.email_service = email_service
app.state.pushover_service = pushover_service
app.state.email_handler = email_handler
app.state.report_scheduler = report_scheduler
# Register push listeners for hotels with push_endpoint
for hotel in config.get("alpine_bits_auth", []):
@@ -224,21 +281,102 @@ async def lifespan(app: FastAPI):
await conn.run_sync(Base.metadata.create_all)
_LOGGER.info("Database tables checked/created at startup.")
# Hash any existing customers that don't have hashed versions yet
async with AsyncSessionLocal() as session:
customer_service = CustomerService(session)
hashed_count = await customer_service.hash_existing_customers()
if hashed_count > 0:
_LOGGER.info(
"Backfilled hashed data for %d existing customers", hashed_count
)
else:
_LOGGER.info("All existing customers already have hashed data")
# Hash any existing customers (only in primary worker to avoid race conditions)
if is_primary:
async with AsyncSessionLocal() as session:
customer_service = CustomerService(session)
hashed_count = await customer_service.hash_existing_customers()
if hashed_count > 0:
_LOGGER.info(
"Backfilled hashed data for %d existing customers", hashed_count
)
else:
_LOGGER.info("All existing customers already have hashed data")
else:
_LOGGER.info("Skipping customer hashing (non-primary worker)")
# Initialize and hook up stats collector for daily reports
# Note: report_scheduler will only exist on the primary worker
if report_scheduler:
stats_collector = ReservationStatsCollector(
async_sessionmaker=AsyncSessionLocal,
config=config,
)
# Hook up the stats collector to the report scheduler
report_scheduler.set_stats_collector(stats_collector.collect_stats)
_LOGGER.info("Stats collector initialized and hooked up to report scheduler")
# Send a test daily report on startup for testing (with 24-hour lookback)
_LOGGER.info("Sending test daily report on startup (last 24 hours)")
try:
# Use lookback_hours=24 to get stats from last 24 hours
stats = await stats_collector.collect_stats(lookback_hours=24)
# Send via email (if configured)
if email_service:
success = await email_service.send_daily_report(
recipients=report_scheduler.recipients,
stats=stats,
errors=None,
)
if success:
_LOGGER.info("Test daily report sent via email successfully on startup")
else:
_LOGGER.error("Failed to send test daily report via email on startup")
# Send via Pushover (if configured)
if pushover_service:
pushover_config = config.get("pushover", {})
pushover_monitoring = pushover_config.get("monitoring", {})
pushover_daily_report = pushover_monitoring.get("daily_report", {})
priority = pushover_daily_report.get("priority", 0)
success = await pushover_service.send_daily_report(
stats=stats,
errors=None,
priority=priority,
)
if success:
_LOGGER.info("Test daily report sent via Pushover successfully on startup")
else:
_LOGGER.error("Failed to send test daily report via Pushover on startup")
except Exception:
_LOGGER.exception("Error sending test daily report on startup")
# Start daily report scheduler
report_scheduler.start()
_LOGGER.info("Daily report scheduler started")
_LOGGER.info("Application startup complete")
yield
# Optional: Dispose engine on shutdown
# Cleanup on shutdown
_LOGGER.info("Application shutdown initiated")
# Stop daily report scheduler
if report_scheduler:
report_scheduler.stop()
_LOGGER.info("Daily report scheduler stopped")
# Close email alert handler (flush any remaining errors)
if email_handler:
email_handler.close()
_LOGGER.info("Email alert handler closed")
# Shutdown email service thread pool
if email_service:
email_service.shutdown()
_LOGGER.info("Email service shut down")
# Dispose engine
await engine.dispose()
_LOGGER.info("Application shutdown complete")
# Release worker lock if this was the primary worker
if worker_lock:
worker_lock.release()
async def get_async_session(request: Request):
@@ -307,6 +445,85 @@ async def health_check(request: Request):
}
@api_router.post("/detect-language", response_model=LanguageDetectionResponse)
@limiter.limit(DEFAULT_RATE_LIMIT)
async def detect_language(
request: Request,
data: LanguageDetectionRequest,
credentials: HTTPAuthorizationCredentials = Depends(security_bearer),
):
"""Detect language of text, restricted to Italian or German.
Requires Bearer token authentication.
Returns the most likely language (it or de) with confidence score.
"""
# Validate bearer token
token = credentials.credentials
config = request.app.state.config
# Check if token is valid
valid_tokens = config.get("api_tokens", [])
# If no tokens configured, reject authentication
if not valid_tokens:
_LOGGER.error("No api_tokens configured in config.yaml")
raise HTTPException(
status_code=401,
detail="Authentication token not configured on server",
)
if token not in valid_tokens:
# Log sanitized token (first TOKEN_LOG_LENGTH chars) for security
sanitized_token = (
token[:TOKEN_LOG_LENGTH] + "..." if len(token) > TOKEN_LOG_LENGTH else token
)
_LOGGER.warning("Invalid token attempt: %s", sanitized_token)
raise HTTPException(
status_code=401,
detail="Invalid authentication token",
)
try:
# Detect language with k=2 to get top 2 candidates
results = detect(data.text, k=2)
_LOGGER.info("Language detection results: %s", results)
# Filter for Italian (it) or German (de)
italian_german_results = [r for r in results if r.get("lang") in ["it", "de"]]
if italian_german_results:
# Return the best match between Italian and German
best_match = italian_german_results[0]
return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch"
return LanguageDetectionResponse(
language_code=return_value, score=best_match.get("score", 0.0)
)
# If neither Italian nor German detected in top 2, check all results
all_results = detect(data.text, k=10)
italian_german_all = [r for r in all_results if r.get("lang") in ["it", "de"]]
if italian_german_all:
best_match = italian_german_all[0]
return_value = "Italienisch" if best_match["lang"] == "it" else "Deutsch"
return LanguageDetectionResponse(
language_code=return_value, score=best_match.get("score", 0.0)
)
# Default to German if no clear detection
_LOGGER.warning(
"Could not detect Italian or German in text: %s, defaulting to 'de'",
data.text[:100],
)
return LanguageDetectionResponse(language_code="Deutsch", score=0.0)
except Exception as e:
_LOGGER.exception("Error detecting language: %s", e)
raise HTTPException(status_code=500, detail=f"Error detecting language: {e!s}")
# Extracted business logic for handling Wix form submissions
async def process_wix_form_submission(request: Request, data: dict[str, Any], db):
"""Shared business logic for handling Wix form submissions (test and production)."""
@@ -544,6 +761,9 @@ async def process_generic_webhook_submission(
name_prefix = form_data.get("anrede")
language = form_data.get("sprache", "de")[:2]
user_comment = form_data.get("nachricht", "")
plz = form_data.get("plz", "")
city = form_data.get("stadt", "")
country = form_data.get("land", "")
# Parse dates - handle DD.MM.YYYY format
start_date_str = form_data.get("anreise")
@@ -623,9 +843,9 @@ async def process_generic_webhook_submission(
"phone": phone_number if phone_number else None,
"email_newsletter": False,
"address_line": None,
"city_name": None,
"postal_code": None,
"country_code": None,
"city_name": city if city else None,
"postal_code": plz if plz else None,
"country_code": country if country else None,
"gender": None,
"birth_date": None,
"language": language,

View File

@@ -6,9 +6,12 @@ from annotatedyaml.loader import load_yaml as load_annotated_yaml
from voluptuous import (
PREVENT_EXTRA,
All,
Boolean,
In,
Length,
MultipleInvalid,
Optional,
Range,
Required,
Schema,
)
@@ -82,12 +85,122 @@ hotel_auth_schema = Schema(
basic_auth_schema = Schema(All([hotel_auth_schema], Length(min=1)))
# Email SMTP configuration schema
smtp_schema = Schema(
{
Required("host", default="localhost"): str,
Required("port", default=587): Range(min=1, max=65535),
Optional("username"): str,
Optional("password"): str,
Required("use_tls", default=True): Boolean(),
Required("use_ssl", default=False): Boolean(),
},
extra=PREVENT_EXTRA,
)
# Email daily report configuration schema
daily_report_schema = Schema(
{
Required("enabled", default=False): Boolean(),
Optional("recipients", default=[]): [str],
Required("send_time", default="08:00"): str,
Required("include_stats", default=True): Boolean(),
Required("include_errors", default=True): Boolean(),
},
extra=PREVENT_EXTRA,
)
# Email error alerts configuration schema
error_alerts_schema = Schema(
{
Required("enabled", default=False): Boolean(),
Optional("recipients", default=[]): [str],
Required("error_threshold", default=5): Range(min=1),
Required("buffer_minutes", default=15): Range(min=1),
Required("cooldown_minutes", default=15): Range(min=0),
Required("log_levels", default=["ERROR", "CRITICAL"]): [
In(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])
],
},
extra=PREVENT_EXTRA,
)
# Email monitoring configuration schema
monitoring_schema = Schema(
{
Optional("daily_report", default={}): daily_report_schema,
Optional("error_alerts", default={}): error_alerts_schema,
},
extra=PREVENT_EXTRA,
)
# Complete email configuration schema
email_schema = Schema(
{
Optional("smtp", default={}): smtp_schema,
Required("from_address", default="noreply@example.com"): str,
Required("from_name", default="AlpineBits Server"): str,
Optional("timeout", default=10): Range(min=1, max=300),
Optional("monitoring", default={}): monitoring_schema,
},
extra=PREVENT_EXTRA,
)
# Pushover daily report configuration schema
pushover_daily_report_schema = Schema(
{
Required("enabled", default=False): Boolean(),
Required("send_time", default="08:00"): str,
Required("include_stats", default=True): Boolean(),
Required("include_errors", default=True): Boolean(),
Required("priority", default=0): Range(min=-2, max=2), # Pushover priority levels
},
extra=PREVENT_EXTRA,
)
# Pushover error alerts configuration schema
pushover_error_alerts_schema = Schema(
{
Required("enabled", default=False): Boolean(),
Required("error_threshold", default=5): Range(min=1),
Required("buffer_minutes", default=15): Range(min=1),
Required("cooldown_minutes", default=15): Range(min=0),
Required("log_levels", default=["ERROR", "CRITICAL"]): [
In(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])
],
Required("priority", default=1): Range(min=-2, max=2), # Pushover priority levels
},
extra=PREVENT_EXTRA,
)
# Pushover monitoring configuration schema
pushover_monitoring_schema = Schema(
{
Optional("daily_report", default={}): pushover_daily_report_schema,
Optional("error_alerts", default={}): pushover_error_alerts_schema,
},
extra=PREVENT_EXTRA,
)
# Complete pushover configuration schema
pushover_schema = Schema(
{
Optional("user_key"): str, # Optional but required for pushover to work
Optional("api_token"): str, # Optional but required for pushover to work
Optional("monitoring", default={}): pushover_monitoring_schema,
},
extra=PREVENT_EXTRA,
)
config_schema = Schema(
{
Required(CONF_DATABASE): database_schema,
Required(CONF_ALPINE_BITS_AUTH): basic_auth_schema,
Required(CONF_SERVER): server_info,
Required(CONF_LOGGING): logger_schema,
Optional("email"): email_schema, # Email is optional
Optional("pushover"): pushover_schema, # Pushover is optional
Optional("api_tokens", default=[]): [str], # API tokens for bearer auth
},
extra=PREVENT_EXTRA,
)

View File

@@ -0,0 +1,571 @@
"""Email monitoring and alerting through logging integration.
This module provides a custom logging handler that accumulates errors and sends
email alerts based on configurable thresholds and time windows.
"""
import asyncio
import logging
import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta
from typing import Any
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import async_sessionmaker
from .db import Reservation
from .email_service import EmailService
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
class ErrorRecord:
"""Represents a single error log record for monitoring.
Attributes:
timestamp: When the error occurred
level: Log level (ERROR, CRITICAL, etc.)
logger_name: Name of the logger that generated the error
message: The error message
exception: Exception info if available
module: Module where error occurred
line_no: Line number where error occurred
"""
def __init__(self, record: logging.LogRecord):
"""Initialize from a logging.LogRecord.
Args:
record: The logging record to wrap
"""
self.timestamp = datetime.fromtimestamp(record.created)
self.level = record.levelname
self.logger_name = record.name
self.message = record.getMessage()
self.exception = record.exc_text if record.exc_info else None
self.module = record.module
self.line_no = record.lineno
self.pathname = record.pathname
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary format.
Returns:
Dictionary representation of the error
"""
return {
"timestamp": self.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
"level": self.level,
"logger_name": self.logger_name,
"message": self.message,
"exception": self.exception,
"module": self.module,
"line_no": self.line_no,
"pathname": self.pathname,
}
def format_plain_text(self) -> str:
"""Format error as plain text for email.
Returns:
Formatted plain text string
"""
text = f"[{self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {self.level}: {self.message}\n"
text += f" Module: {self.module}:{self.line_no} ({self.logger_name})\n"
if self.exception:
text += f" Exception:\n{self.exception}\n"
return text
class EmailAlertHandler(logging.Handler):
"""Custom logging handler that sends email alerts for errors.
This handler uses a hybrid approach:
- Accumulates errors in a buffer
- Sends immediately if error threshold is reached
- Otherwise sends after buffer duration expires
- Always sends buffered errors (no minimum threshold for time-based flush)
- Implements cooldown to prevent alert spam
The handler is thread-safe and works with asyncio event loops.
"""
def __init__(
self,
email_service: EmailService,
config: dict[str, Any],
loop: asyncio.AbstractEventLoop | None = None,
):
"""Initialize the email alert handler.
Args:
email_service: Email service instance for sending alerts
config: Configuration dictionary for error alerts
loop: Asyncio event loop (will use current loop if not provided)
"""
super().__init__()
self.email_service = email_service
self.config = config
self.loop = loop # Will be set when first error occurs if not provided
# Configuration
self.recipients = config.get("recipients", [])
self.error_threshold = config.get("error_threshold", 5)
self.buffer_minutes = config.get("buffer_minutes", 15)
self.cooldown_minutes = config.get("cooldown_minutes", 15)
self.log_levels = config.get("log_levels", ["ERROR", "CRITICAL"])
# State
self.error_buffer: deque[ErrorRecord] = deque()
self.last_sent = datetime.min # Last time we sent an alert
self._flush_task: asyncio.Task | None = None
self._lock = threading.Lock() # Thread-safe for multi-threaded logging
_LOGGER.info(
"EmailAlertHandler initialized: threshold=%d, buffer=%dmin, cooldown=%dmin",
self.error_threshold,
self.buffer_minutes,
self.cooldown_minutes,
)
def emit(self, record: logging.LogRecord) -> None:
"""Handle a log record.
This is called automatically by the logging system when an error is logged.
It's important that this method is fast and doesn't block.
Args:
record: The log record to handle
"""
# Only handle configured log levels
if record.levelname not in self.log_levels:
return
try:
# Ensure we have an event loop
if self.loop is None:
try:
self.loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop, we'll need to handle this differently
_LOGGER.warning("No asyncio event loop available for email alerts")
return
# Add error to buffer (thread-safe)
with self._lock:
error_record = ErrorRecord(record)
self.error_buffer.append(error_record)
buffer_size = len(self.error_buffer)
# Determine if we should send immediately
should_send_immediately = buffer_size >= self.error_threshold
if should_send_immediately:
# Cancel any pending flush task
if self._flush_task and not self._flush_task.done():
self._flush_task.cancel()
# Schedule immediate flush
self._flush_task = asyncio.run_coroutine_threadsafe(
self._flush_buffer(immediate=True),
self.loop,
)
# Schedule delayed flush if not already scheduled
elif not self._flush_task or self._flush_task.done():
self._flush_task = asyncio.run_coroutine_threadsafe(
self._schedule_delayed_flush(),
self.loop,
)
except Exception:
# Never let the handler crash - just log and continue
_LOGGER.exception("Error in EmailAlertHandler.emit")
async def _schedule_delayed_flush(self) -> None:
"""Schedule a delayed buffer flush after buffer duration."""
await asyncio.sleep(self.buffer_minutes * 60)
await self._flush_buffer(immediate=False)
async def _flush_buffer(self, *, immediate: bool) -> None:
"""Flush the error buffer and send email alert.
Args:
immediate: Whether this is an immediate flush (threshold hit)
"""
# Check cooldown period
now = datetime.now()
time_since_last = (now - self.last_sent).total_seconds() / 60
if time_since_last < self.cooldown_minutes:
_LOGGER.info(
"Alert cooldown active (%.1f min remaining), buffering errors",
self.cooldown_minutes - time_since_last,
)
# Don't clear buffer - let errors accumulate until cooldown expires
return
# Get all buffered errors (thread-safe)
with self._lock:
if not self.error_buffer:
return
errors = list(self.error_buffer)
self.error_buffer.clear()
# Update last sent time
self.last_sent = now
# Format email
error_count = len(errors)
time_range = (
f"{errors[0].timestamp.strftime('%H:%M:%S')} to "
f"{errors[-1].timestamp.strftime('%H:%M:%S')}"
)
# Determine alert type for subject
alert_type = "Immediate Alert" if immediate else "Scheduled Alert"
if immediate:
emoji = "🚨"
reason = f"(threshold of {self.error_threshold} exceeded)"
else:
emoji = "⚠️"
reason = f"({self.buffer_minutes} minute buffer)"
subject = (
f"{emoji} AlpineBits Error {alert_type}: {error_count} errors {reason}"
)
# Build plain text body
body = f"Error Alert - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
body += "=" * 70 + "\n\n"
body += f"Alert Type: {alert_type}\n"
body += f"Error Count: {error_count}\n"
body += f"Time Range: {time_range}\n"
body += f"Reason: {reason}\n"
body += "\n" + "=" * 70 + "\n\n"
# Add individual errors
body += "Errors:\n"
body += "-" * 70 + "\n\n"
for error in errors:
body += error.format_plain_text()
body += "\n"
body += "-" * 70 + "\n"
body += f"Generated by AlpineBits Email Monitoring at {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
# Send email
try:
success = await self.email_service.send_alert(
recipients=self.recipients,
subject=subject,
body=body,
)
if success:
_LOGGER.info(
"Email alert sent successfully: %d errors to %s",
error_count,
self.recipients,
)
else:
_LOGGER.error("Failed to send email alert for %d errors", error_count)
except Exception:
_LOGGER.exception("Exception while sending email alert")
def close(self) -> None:
"""Close the handler and flush any remaining errors.
This is called when the logging system shuts down.
"""
# Cancel any pending flush tasks
if self._flush_task and not self._flush_task.done():
self._flush_task.cancel()
# Flush any remaining errors immediately
if self.error_buffer and self.loop:
try:
# Check if the loop is still running
if not self.loop.is_closed():
future = asyncio.run_coroutine_threadsafe(
self._flush_buffer(immediate=False),
self.loop,
)
future.result(timeout=5)
else:
_LOGGER.warning(
"Event loop closed, cannot flush %d remaining errors",
len(self.error_buffer),
)
except Exception:
_LOGGER.exception("Error flushing buffer on close")
super().close()
class DailyReportScheduler:
"""Scheduler for sending daily reports at configured times.
This runs as a background task and sends daily reports containing
statistics and error summaries.
"""
def __init__(
self,
email_service: EmailService,
config: dict[str, Any],
):
"""Initialize the daily report scheduler.
Args:
email_service: Email service for sending reports
config: Configuration for daily reports
"""
self.email_service = email_service
self.config = config
self.recipients = config.get("recipients", [])
self.send_time = config.get("send_time", "08:00") # Default 8 AM
self.include_stats = config.get("include_stats", True)
self.include_errors = config.get("include_errors", True)
self._task: asyncio.Task | None = None
self._stats_collector = None # Will be set by application
self._error_log: list[dict[str, Any]] = []
_LOGGER.info(
"DailyReportScheduler initialized: send_time=%s, recipients=%s",
self.send_time,
self.recipients,
)
def start(self) -> None:
"""Start the daily report scheduler."""
if self._task is None or self._task.done():
self._task = asyncio.create_task(self._run())
_LOGGER.info("Daily report scheduler started")
def stop(self) -> None:
"""Stop the daily report scheduler."""
if self._task and not self._task.done():
self._task.cancel()
_LOGGER.info("Daily report scheduler stopped")
def log_error(self, error: dict[str, Any]) -> None:
"""Log an error for inclusion in daily report.
Args:
error: Error information dictionary
"""
self._error_log.append(error)
async def _run(self) -> None:
"""Run the daily report scheduler loop."""
while True:
try:
# Calculate time until next report
now = datetime.now()
target_hour, target_minute = map(int, self.send_time.split(":"))
# Calculate next send time
next_send = now.replace(
hour=target_hour,
minute=target_minute,
second=0,
microsecond=0,
)
# If time has passed today, schedule for tomorrow
if next_send <= now:
next_send += timedelta(days=1)
# Calculate sleep duration
sleep_seconds = (next_send - now).total_seconds()
_LOGGER.info(
"Next daily report scheduled for %s (in %.1f hours)",
next_send.strftime("%Y-%m-%d %H:%M:%S"),
sleep_seconds / 3600,
)
# Wait until send time
await asyncio.sleep(sleep_seconds)
# Send report
await self._send_report()
except asyncio.CancelledError:
_LOGGER.info("Daily report scheduler cancelled")
break
except Exception:
_LOGGER.exception("Error in daily report scheduler")
# Sleep a bit before retrying
await asyncio.sleep(60)
async def _send_report(self) -> None:
"""Send the daily report."""
stats = {}
# Collect statistics if enabled
if self.include_stats and self._stats_collector:
try:
stats = await self._stats_collector()
except Exception:
_LOGGER.exception("Error collecting statistics for daily report")
# Get errors if enabled
errors = self._error_log.copy() if self.include_errors else None
# Send report
try:
success = await self.email_service.send_daily_report(
recipients=self.recipients,
stats=stats,
errors=errors,
)
if success:
_LOGGER.info("Daily report sent successfully to %s", self.recipients)
# Clear error log after successful send
self._error_log.clear()
else:
_LOGGER.error("Failed to send daily report")
except Exception:
_LOGGER.exception("Exception while sending daily report")
def set_stats_collector(self, collector) -> None:
"""Set the statistics collector function.
Args:
collector: Async function that returns statistics dictionary
"""
self._stats_collector = collector
class ReservationStatsCollector:
"""Collects reservation statistics per hotel for daily reports.
This collector queries the database for reservations created since the last
report and aggregates them by hotel. It includes hotel_code and hotel_name
from the configuration.
"""
def __init__(
self,
async_sessionmaker: async_sessionmaker,
config: dict[str, Any],
):
"""Initialize the stats collector.
Args:
async_sessionmaker: SQLAlchemy async session maker
config: Application configuration containing hotel information
"""
self.async_sessionmaker = async_sessionmaker
self.config = config
self._last_report_time = datetime.now()
# Build hotel mapping from config
self._hotel_map = {}
for hotel in config.get("alpine_bits_auth", []):
hotel_id = hotel.get("hotel_id")
hotel_name = hotel.get("hotel_name")
if hotel_id:
self._hotel_map[hotel_id] = hotel_name or "Unknown Hotel"
_LOGGER.info(
"ReservationStatsCollector initialized with %d hotels",
len(self._hotel_map),
)
async def collect_stats(self, lookback_hours: int | None = None) -> dict[str, Any]:
"""Collect reservation statistics for the reporting period.
Args:
lookback_hours: Optional override to look back N hours from now.
If None, uses time since last report.
Returns:
Dictionary with statistics including reservations per hotel
"""
now = datetime.now()
if lookback_hours is not None:
# Override mode: look back N hours from now
period_start = now - timedelta(hours=lookback_hours)
period_end = now
else:
# Normal mode: since last report
period_start = self._last_report_time
period_end = now
_LOGGER.info(
"Collecting reservation stats from %s to %s",
period_start.strftime("%Y-%m-%d %H:%M:%S"),
period_end.strftime("%Y-%m-%d %H:%M:%S"),
)
async with self.async_sessionmaker() as session:
# Query reservations created in the reporting period
result = await session.execute(
select(Reservation.hotel_code, func.count(Reservation.id))
.where(Reservation.created_at >= period_start)
.where(Reservation.created_at < period_end)
.group_by(Reservation.hotel_code)
)
hotel_counts = dict(result.all())
# Build stats with hotel names from config
hotels_stats = []
total_reservations = 0
for hotel_code, count in hotel_counts.items():
hotel_name = self._hotel_map.get(hotel_code, "Unknown Hotel")
hotels_stats.append(
{
"hotel_code": hotel_code,
"hotel_name": hotel_name,
"reservations": count,
}
)
total_reservations += count
# Sort by reservation count descending
hotels_stats.sort(key=lambda x: x["reservations"], reverse=True)
# Update last report time only in normal mode (not lookback mode)
if lookback_hours is None:
self._last_report_time = now
stats = {
"reporting_period": {
"start": period_start.strftime("%Y-%m-%d %H:%M:%S"),
"end": period_end.strftime("%Y-%m-%d %H:%M:%S"),
},
"total_reservations": total_reservations,
"hotels": hotels_stats,
}
_LOGGER.info(
"Collected stats: %d total reservations across %d hotels",
total_reservations,
len(hotels_stats),
)
return stats

View File

@@ -0,0 +1,373 @@
"""Email service for sending alerts and reports.
This module provides email functionality for the AlpineBits application,
including error alerts and daily reports.
"""
import asyncio
import smtplib
import ssl
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any
from pydantic import EmailStr, Field, field_validator
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
class EmailConfig:
"""Configuration for email service.
Attributes:
smtp_host: SMTP server hostname
smtp_port: SMTP server port
smtp_username: SMTP authentication username
smtp_password: SMTP authentication password
use_tls: Use STARTTLS for encryption
use_ssl: Use SSL/TLS from the start
from_address: Sender email address
from_name: Sender display name
timeout: Connection timeout in seconds
"""
def __init__(self, config: dict[str, Any]):
"""Initialize email configuration from config dict.
Args:
config: Email configuration dictionary
"""
smtp_config = config.get("smtp", {})
self.smtp_host: str = smtp_config.get("host", "localhost")
self.smtp_port: int = smtp_config.get("port", 587)
self.smtp_username: str | None = smtp_config.get("username")
self.smtp_password: str | None = smtp_config.get("password")
self.use_tls: bool = smtp_config.get("use_tls", True)
self.use_ssl: bool = smtp_config.get("use_ssl", False)
self.from_address: str = config.get("from_address", "noreply@example.com")
self.from_name: str = config.get("from_name", "AlpineBits Server")
self.timeout: int = config.get("timeout", 10)
# Validate configuration
if self.use_tls and self.use_ssl:
msg = "Cannot use both TLS and SSL"
raise ValueError(msg)
class EmailService:
"""Service for sending emails via SMTP.
This service handles sending both plain text and HTML emails,
with support for TLS/SSL encryption and authentication.
"""
def __init__(self, config: EmailConfig):
"""Initialize email service.
Args:
config: Email configuration
"""
self.config = config
# Create dedicated thread pool for SMTP operations (max 2 threads is enough for email)
# This prevents issues with default executor in multi-process environments
self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="smtp-")
async def send_email(
self,
recipients: list[str],
subject: str,
body: str,
html_body: str | None = None,
) -> bool:
"""Send an email to recipients.
Args:
recipients: List of recipient email addresses
subject: Email subject line
body: Plain text email body
html_body: Optional HTML email body
Returns:
True if email was sent successfully, False otherwise
"""
if not recipients:
_LOGGER.warning("No recipients specified for email: %s", subject)
return False
try:
# Build message
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = f"{self.config.from_name} <{self.config.from_address}>"
msg["To"] = ", ".join(recipients)
msg["Date"] = datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")
# Attach plain text body
msg.attach(MIMEText(body, "plain"))
# Attach HTML body if provided
if html_body:
msg.attach(MIMEText(html_body, "html"))
# Send email in dedicated thread pool (SMTP is blocking)
loop = asyncio.get_event_loop()
await loop.run_in_executor(self._executor, self._send_smtp, msg, recipients)
_LOGGER.info("Email sent successfully to %s: %s", recipients, subject)
return True
except Exception:
_LOGGER.exception("Failed to send email to %s: %s", recipients, subject)
return False
def _send_smtp(self, msg: MIMEMultipart, recipients: list[str]) -> None:
"""Send email via SMTP (blocking operation).
Args:
msg: Email message to send
recipients: List of recipient addresses
Raises:
Exception: If email sending fails
"""
if self.config.use_ssl:
# Connect with SSL from the start
context = ssl.create_default_context()
with smtplib.SMTP_SSL(
self.config.smtp_host,
self.config.smtp_port,
timeout=self.config.timeout,
context=context,
) as server:
if self.config.smtp_username and self.config.smtp_password:
server.login(self.config.smtp_username, self.config.smtp_password)
server.send_message(msg, self.config.from_address, recipients)
else:
# Connect and optionally upgrade to TLS
with smtplib.SMTP(
self.config.smtp_host,
self.config.smtp_port,
timeout=self.config.timeout,
) as server:
if self.config.use_tls:
context = ssl.create_default_context()
server.starttls(context=context)
if self.config.smtp_username and self.config.smtp_password:
server.login(self.config.smtp_username, self.config.smtp_password)
server.send_message(msg, self.config.from_address, recipients)
async def send_alert(
self,
recipients: list[str],
subject: str,
body: str,
) -> bool:
"""Send an alert email (convenience method).
Args:
recipients: List of recipient email addresses
subject: Email subject line
body: Email body text
Returns:
True if email was sent successfully, False otherwise
"""
return await self.send_email(recipients, subject, body)
async def send_daily_report(
self,
recipients: list[str],
stats: dict[str, Any],
errors: list[dict[str, Any]] | None = None,
) -> bool:
"""Send a daily report email.
Args:
recipients: List of recipient email addresses
stats: Dictionary containing statistics to include in report
errors: Optional list of errors to include
Returns:
True if email was sent successfully, False otherwise
"""
date_str = datetime.now().strftime("%Y-%m-%d")
subject = f"AlpineBits Daily Report - {date_str}"
# Build plain text body
body = f"AlpineBits Daily Report for {date_str}\n"
body += "=" * 60 + "\n\n"
# Add statistics
if stats:
body += "Statistics:\n"
body += "-" * 60 + "\n"
for key, value in stats.items():
body += f" {key}: {value}\n"
body += "\n"
# Add errors if present
if errors:
body += f"Errors ({len(errors)}):\n"
body += "-" * 60 + "\n"
for error in errors[:20]: # Limit to 20 most recent errors
timestamp = error.get("timestamp", "Unknown")
level = error.get("level", "ERROR")
message = error.get("message", "No message")
body += f" [{timestamp}] {level}: {message}\n"
if len(errors) > 20:
body += f" ... and {len(errors) - 20} more errors\n"
body += "\n"
body += "-" * 60 + "\n"
body += "Generated by AlpineBits Server\n"
# Build HTML body for better formatting
html_body = self._build_daily_report_html(date_str, stats, errors)
return await self.send_email(recipients, subject, body, html_body)
def _build_daily_report_html(
self,
date_str: str,
stats: dict[str, Any],
errors: list[dict[str, Any]] | None,
) -> str:
"""Build HTML version of daily report.
Args:
date_str: Date string for the report
stats: Statistics dictionary
errors: Optional list of errors
Returns:
HTML string for the email body
"""
html = f"""
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; }}
h1 {{ color: #333; }}
h2 {{ color: #666; margin-top: 20px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ text-align: left; padding: 8px; border-bottom: 1px solid #ddd; }}
th {{ background-color: #f2f2f2; }}
.error {{ color: #d32f2f; }}
.warning {{ color: #f57c00; }}
.footer {{ margin-top: 30px; color: #999; font-size: 12px; }}
</style>
</head>
<body>
<h1>AlpineBits Daily Report</h1>
<p><strong>Date:</strong> {date_str}</p>
"""
# Add statistics table
if stats:
html += """
<h2>Statistics</h2>
<table>
<tr>
<th>Metric</th>
<th>Value</th>
</tr>
"""
for key, value in stats.items():
html += f"""
<tr>
<td>{key}</td>
<td>{value}</td>
</tr>
"""
html += "</table>"
# Add errors table
if errors:
html += f"""
<h2>Errors ({len(errors)})</h2>
<table>
<tr>
<th>Time</th>
<th>Level</th>
<th>Message</th>
</tr>
"""
for error in errors[:20]: # Limit to 20 most recent
timestamp = error.get("timestamp", "Unknown")
level = error.get("level", "ERROR")
message = error.get("message", "No message")
css_class = "error" if level == "ERROR" or level == "CRITICAL" else "warning"
html += f"""
<tr>
<td>{timestamp}</td>
<td class="{css_class}">{level}</td>
<td>{message}</td>
</tr>
"""
if len(errors) > 20:
html += f"""
<tr>
<td colspan="3"><em>... and {len(errors) - 20} more errors</em></td>
</tr>
"""
html += "</table>"
html += """
<div class="footer">
<p>Generated by AlpineBits Server</p>
</div>
</body>
</html>
"""
return html
def shutdown(self) -> None:
"""Shutdown the email service and clean up thread pool.
This should be called during application shutdown to ensure
proper cleanup of the thread pool executor.
"""
if self._executor:
_LOGGER.info("Shutting down email service thread pool")
self._executor.shutdown(wait=True, cancel_futures=False)
_LOGGER.info("Email service thread pool shut down complete")
def create_email_service(config: dict[str, Any]) -> EmailService | None:
"""Create an email service from configuration.
Args:
config: Full application configuration dictionary
Returns:
EmailService instance if email is configured, None otherwise
"""
email_config = config.get("email")
if not email_config:
_LOGGER.info("Email not configured, email service disabled")
return None
try:
email_cfg = EmailConfig(email_config)
service = EmailService(email_cfg)
_LOGGER.info("Email service initialized: %s:%s", email_cfg.smtp_host, email_cfg.smtp_port)
return service
except Exception:
_LOGGER.exception("Failed to initialize email service")
return None

View File

@@ -4,16 +4,41 @@ This module sets up logging based on config and provides a function to get
loggers from anywhere in the application.
"""
import asyncio
import logging
import sys
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from alpine_bits_python.email_monitoring import (
DailyReportScheduler,
EmailAlertHandler,
)
from alpine_bits_python.email_service import EmailService
from alpine_bits_python.pushover_service import PushoverService
def setup_logging(config: dict | None = None):
def setup_logging(
config: dict | None = None,
email_service: "EmailService | None" = None,
pushover_service: "PushoverService | None" = None,
loop: asyncio.AbstractEventLoop | None = None,
enable_scheduler: bool = True,
) -> tuple["EmailAlertHandler | None", "DailyReportScheduler | None"]:
"""Configure logging based on application config.
Args:
config: Application configuration dict with optional 'logger' section
email_service: Optional email service for email alerts
pushover_service: Optional pushover service for push notifications
loop: Optional asyncio event loop for email alerts
enable_scheduler: Whether to enable the daily report scheduler
(should be False for non-primary workers)
Returns:
Tuple of (email_alert_handler, daily_report_scheduler) if monitoring
is enabled, otherwise (None, None)
Logger config format:
logger:
@@ -67,6 +92,89 @@ def setup_logging(config: dict | None = None):
root_logger.info("Logging configured at %s level", level)
# Setup notification monitoring if configured
email_handler = None
report_scheduler = None
# Setup email monitoring if configured
if email_service:
email_config = config.get("email", {})
monitoring_config = email_config.get("monitoring", {})
# Setup error alert handler
error_alerts_config = monitoring_config.get("error_alerts", {})
if error_alerts_config.get("enabled", False):
try:
# Import here to avoid circular dependencies
from alpine_bits_python.email_monitoring import EmailAlertHandler
email_handler = EmailAlertHandler(
email_service=email_service,
config=error_alerts_config,
loop=loop,
)
email_handler.setLevel(logging.ERROR)
root_logger.addHandler(email_handler)
root_logger.info("Email alert handler enabled for error monitoring")
except Exception:
root_logger.exception("Failed to setup email alert handler")
# Setup daily report scheduler (only if enabled and this is primary worker)
daily_report_config = monitoring_config.get("daily_report", {})
if daily_report_config.get("enabled", False) and enable_scheduler:
try:
# Import here to avoid circular dependencies
from alpine_bits_python.email_monitoring import DailyReportScheduler
report_scheduler = DailyReportScheduler(
email_service=email_service,
config=daily_report_config,
)
root_logger.info("Daily report scheduler configured (primary worker)")
except Exception:
root_logger.exception("Failed to setup daily report scheduler")
elif daily_report_config.get("enabled", False) and not enable_scheduler:
root_logger.info(
"Daily report scheduler disabled (non-primary worker)"
)
# Check if Pushover daily reports are enabled
# If so and no report_scheduler exists yet, create one
if pushover_service and not report_scheduler:
pushover_config = config.get("pushover", {})
pushover_monitoring = pushover_config.get("monitoring", {})
pushover_daily_report = pushover_monitoring.get("daily_report", {})
if pushover_daily_report.get("enabled", False) and enable_scheduler:
try:
# Import here to avoid circular dependencies
from alpine_bits_python.email_monitoring import DailyReportScheduler
# Create a dummy config for the scheduler
# (it doesn't need email-specific fields if email is disabled)
scheduler_config = {
"send_time": pushover_daily_report.get("send_time", "08:00"),
"include_stats": pushover_daily_report.get("include_stats", True),
"include_errors": pushover_daily_report.get("include_errors", True),
"recipients": [], # Not used for Pushover
}
report_scheduler = DailyReportScheduler(
email_service=email_service, # Can be None
config=scheduler_config,
)
root_logger.info(
"Daily report scheduler configured for Pushover (primary worker)"
)
except Exception:
root_logger.exception("Failed to setup Pushover daily report scheduler")
elif pushover_daily_report.get("enabled", False) and not enable_scheduler:
root_logger.info(
"Pushover daily report scheduler disabled (non-primary worker)"
)
return email_handler, report_scheduler
def get_logger(name: str) -> logging.Logger:
"""Get a logger instance for the given module name.

View File

@@ -0,0 +1,127 @@
"""Adapters for notification backends.
This module provides adapters that wrap email and Pushover services
to work with the unified notification service interface.
"""
from typing import Any
from .email_service import EmailService
from .logging_config import get_logger
from .pushover_service import PushoverService
_LOGGER = get_logger(__name__)
class EmailNotificationAdapter:
"""Adapter for EmailService to work with NotificationService."""
def __init__(self, email_service: EmailService, recipients: list[str]):
"""Initialize the email notification adapter.
Args:
email_service: EmailService instance
recipients: List of recipient email addresses
"""
self.email_service = email_service
self.recipients = recipients
async def send_alert(self, title: str, message: str, **kwargs) -> bool:
"""Send an alert via email.
Args:
title: Email subject
message: Email body
**kwargs: Ignored for email
Returns:
True if sent successfully
"""
return await self.email_service.send_alert(
recipients=self.recipients,
subject=title,
body=message,
)
async def send_daily_report(
self,
stats: dict[str, Any],
errors: list[dict[str, Any]] | None = None,
**kwargs,
) -> bool:
"""Send a daily report via email.
Args:
stats: Statistics dictionary
errors: Optional list of errors
**kwargs: Ignored for email
Returns:
True if sent successfully
"""
return await self.email_service.send_daily_report(
recipients=self.recipients,
stats=stats,
errors=errors,
)
class PushoverNotificationAdapter:
"""Adapter for PushoverService to work with NotificationService."""
def __init__(self, pushover_service: PushoverService, priority: int = 0):
"""Initialize the Pushover notification adapter.
Args:
pushover_service: PushoverService instance
priority: Default priority level for notifications
"""
self.pushover_service = pushover_service
self.priority = priority
async def send_alert(self, title: str, message: str, **kwargs) -> bool:
"""Send an alert via Pushover.
Args:
title: Notification title
message: Notification message
**kwargs: Can include 'priority' to override default
Returns:
True if sent successfully
"""
priority = kwargs.get("priority", self.priority)
return await self.pushover_service.send_alert(
title=title,
message=message,
priority=priority,
)
async def send_daily_report(
self,
stats: dict[str, Any],
errors: list[dict[str, Any]] | None = None,
**kwargs,
) -> bool:
"""Send a daily report via Pushover.
Args:
stats: Statistics dictionary
errors: Optional list of errors
**kwargs: Can include 'priority' to override default
Returns:
True if sent successfully
"""
priority = kwargs.get("priority", self.priority)
return await self.pushover_service.send_daily_report(
stats=stats,
errors=errors,
priority=priority,
)

View File

@@ -0,0 +1,177 @@
"""Unified notification service supporting multiple backends.
This module provides a unified interface for sending notifications through
different channels (email, Pushover, etc.) for alerts and daily reports.
"""
from typing import Any, Protocol
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
class NotificationBackend(Protocol):
"""Protocol for notification backends."""
async def send_alert(self, title: str, message: str, **kwargs) -> bool:
"""Send an alert notification.
Args:
title: Alert title/subject
message: Alert message/body
**kwargs: Backend-specific parameters
Returns:
True if sent successfully, False otherwise
"""
...
async def send_daily_report(
self,
stats: dict[str, Any],
errors: list[dict[str, Any]] | None = None,
**kwargs,
) -> bool:
"""Send a daily report notification.
Args:
stats: Statistics dictionary
errors: Optional list of errors
**kwargs: Backend-specific parameters
Returns:
True if sent successfully, False otherwise
"""
...
class NotificationService:
"""Unified notification service that supports multiple backends.
This service can send notifications through multiple channels simultaneously
(email, Pushover, etc.) based on configuration.
"""
def __init__(self):
"""Initialize the notification service."""
self.backends: dict[str, NotificationBackend] = {}
def register_backend(self, name: str, backend: NotificationBackend) -> None:
"""Register a notification backend.
Args:
name: Backend name (e.g., "email", "pushover")
backend: Backend instance implementing NotificationBackend protocol
"""
self.backends[name] = backend
_LOGGER.info("Registered notification backend: %s", name)
async def send_alert(
self,
title: str,
message: str,
backends: list[str] | None = None,
**kwargs,
) -> dict[str, bool]:
"""Send an alert through specified backends.
Args:
title: Alert title/subject
message: Alert message/body
backends: List of backend names to use (None = all registered)
**kwargs: Backend-specific parameters
Returns:
Dictionary mapping backend names to success status
"""
if backends is None:
backends = list(self.backends.keys())
results = {}
for backend_name in backends:
backend = self.backends.get(backend_name)
if backend is None:
_LOGGER.warning("Backend not found: %s", backend_name)
results[backend_name] = False
continue
try:
success = await backend.send_alert(title, message, **kwargs)
results[backend_name] = success
except Exception:
_LOGGER.exception(
"Error sending alert through backend %s", backend_name
)
results[backend_name] = False
return results
async def send_daily_report(
self,
stats: dict[str, Any],
errors: list[dict[str, Any]] | None = None,
backends: list[str] | None = None,
**kwargs,
) -> dict[str, bool]:
"""Send a daily report through specified backends.
Args:
stats: Statistics dictionary
errors: Optional list of errors
backends: List of backend names to use (None = all registered)
**kwargs: Backend-specific parameters
Returns:
Dictionary mapping backend names to success status
"""
if backends is None:
backends = list(self.backends.keys())
results = {}
for backend_name in backends:
backend = self.backends.get(backend_name)
if backend is None:
_LOGGER.warning("Backend not found: %s", backend_name)
results[backend_name] = False
continue
try:
success = await backend.send_daily_report(stats, errors, **kwargs)
results[backend_name] = success
except Exception:
_LOGGER.exception(
"Error sending daily report through backend %s", backend_name
)
results[backend_name] = False
return results
def get_backend(self, name: str) -> NotificationBackend | None:
"""Get a specific notification backend.
Args:
name: Backend name
Returns:
Backend instance or None if not found
"""
return self.backends.get(name)
def has_backend(self, name: str) -> bool:
"""Check if a backend is registered.
Args:
name: Backend name
Returns:
True if backend is registered
"""
return name in self.backends

View File

@@ -0,0 +1,281 @@
"""Pushover service for sending push notifications.
This module provides push notification functionality for the AlpineBits application,
including error alerts and daily reports via Pushover.
"""
import asyncio
from datetime import datetime
from typing import Any
from pushover_complete import PushoverAPI
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
class PushoverConfig:
"""Configuration for Pushover service.
Attributes:
user_key: Pushover user/group key
api_token: Pushover application API token
"""
def __init__(self, config: dict[str, Any]):
"""Initialize Pushover configuration from config dict.
Args:
config: Pushover configuration dictionary
"""
self.user_key: str | None = config.get("user_key")
self.api_token: str | None = config.get("api_token")
# Validate configuration
if not self.user_key or not self.api_token:
msg = "Both user_key and api_token are required for Pushover"
raise ValueError(msg)
class PushoverService:
"""Service for sending push notifications via Pushover.
This service handles sending notifications through the Pushover API,
including alerts and daily reports.
"""
def __init__(self, config: PushoverConfig):
"""Initialize Pushover service.
Args:
config: Pushover configuration
"""
self.config = config
self.api = PushoverAPI(config.api_token)
async def send_notification(
self,
title: str,
message: str,
priority: int = 0,
url: str | None = None,
url_title: str | None = None,
) -> bool:
"""Send a push notification via Pushover.
Args:
title: Notification title
message: Notification message
priority: Priority level (-2 to 2, default 0)
url: Optional supplementary URL
url_title: Optional title for the URL
Returns:
True if notification was sent successfully, False otherwise
"""
try:
# Send notification in thread pool (API is blocking)
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
self._send_pushover,
title,
message,
priority,
url,
url_title,
)
_LOGGER.info("Pushover notification sent successfully: %s", title)
return True
except Exception:
_LOGGER.exception("Failed to send Pushover notification: %s", title)
return False
def _send_pushover(
self,
title: str,
message: str,
priority: int,
url: str | None,
url_title: str | None,
) -> None:
"""Send notification via Pushover (blocking operation).
Args:
title: Notification title
message: Notification message
priority: Priority level
url: Optional URL
url_title: Optional URL title
Raises:
Exception: If notification sending fails
"""
kwargs = {
"user": self.config.user_key,
"title": title,
"message": message,
"priority": priority,
}
if url:
kwargs["url"] = url
if url_title:
kwargs["url_title"] = url_title
self.api.send_message(**kwargs)
async def send_alert(
self,
title: str,
message: str,
priority: int = 1,
) -> bool:
"""Send an alert notification (convenience method).
Args:
title: Alert title
message: Alert message
priority: Priority level (default 1 for high priority)
Returns:
True if notification was sent successfully, False otherwise
"""
return await self.send_notification(title, message, priority=priority)
async def send_daily_report(
self,
stats: dict[str, Any],
errors: list[dict[str, Any]] | None = None,
priority: int = 0,
) -> bool:
"""Send a daily report notification.
Args:
stats: Dictionary containing statistics to include in report
errors: Optional list of errors to include
priority: Priority level (default 0 for normal)
Returns:
True if notification was sent successfully, False otherwise
"""
date_str = datetime.now().strftime("%Y-%m-%d")
title = f"AlpineBits Daily Report - {date_str}"
# Build message body (Pushover has a 1024 character limit)
message = self._build_daily_report_message(date_str, stats, errors)
return await self.send_notification(title, message, priority=priority)
def _build_daily_report_message(
self,
date_str: str,
stats: dict[str, Any],
errors: list[dict[str, Any]] | None,
) -> str:
"""Build daily report message for Pushover.
Args:
date_str: Date string for the report
stats: Statistics dictionary
errors: Optional list of errors
Returns:
Formatted message string (max 1024 chars for Pushover)
"""
lines = [f"Report for {date_str}", ""]
# Add statistics (simplified for push notification)
if stats:
# Handle reporting period
period = stats.get("reporting_period", {})
if period:
start = period.get("start", "")
end = period.get("end", "")
if start and end:
# Parse the datetime strings to check if they're on different days
if " " in start and " " in end:
start_date, start_time = start.split(" ")
end_date, end_time = end.split(" ")
# If same day, just show times
if start_date == end_date:
lines.append(f"Period: {start_time} - {end_time}")
else:
# Different days, show date + time in compact format
# Format: "MM-DD HH:MM - MM-DD HH:MM"
start_compact = f"{start_date[5:]} {start_time[:5]}"
end_compact = f"{end_date[5:]} {end_time[:5]}"
lines.append(f"Period: {start_compact} - {end_compact}")
else:
# Fallback if format is unexpected
lines.append(f"Period: {start} - {end}")
# Total reservations
total = stats.get("total_reservations", 0)
lines.append(f"Total Reservations: {total}")
# Per-hotel breakdown (top 5 only to save space)
hotels = stats.get("hotels", [])
if hotels:
lines.append("")
lines.append("By Hotel:")
for hotel in hotels[:5]: # Top 5 hotels
hotel_name = hotel.get("hotel_name", "Unknown")
count = hotel.get("reservations", 0)
# Truncate long hotel names
if len(hotel_name) > 20:
hotel_name = hotel_name[:17] + "..."
lines.append(f"{hotel_name}: {count}")
if len(hotels) > 5:
lines.append(f" • ... and {len(hotels) - 5} more")
# Add error summary if present
if errors:
lines.append("")
lines.append(f"Errors: {len(errors)} (see logs)")
message = "\n".join(lines)
# Truncate if too long (Pushover limit is 1024 chars)
if len(message) > 1020:
message = message[:1017] + "..."
return message
def create_pushover_service(config: dict[str, Any]) -> PushoverService | None:
"""Create a Pushover service from configuration.
Args:
config: Full application configuration dictionary
Returns:
PushoverService instance if Pushover is configured, None otherwise
"""
pushover_config = config.get("pushover")
if not pushover_config:
_LOGGER.info("Pushover not configured, push notification service disabled")
return None
try:
pushover_cfg = PushoverConfig(pushover_config)
service = PushoverService(pushover_cfg)
_LOGGER.info("Pushover service initialized successfully")
return service
except Exception:
_LOGGER.exception("Failed to initialize Pushover service")
return None

View File

@@ -0,0 +1,119 @@
"""Worker coordination utilities for multi-worker FastAPI deployments.
This module provides utilities to ensure singleton services (schedulers, background tasks)
run on only one worker when using uvicorn --workers N.
"""
import fcntl
import os
from pathlib import Path
from typing import ContextManager
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
class WorkerLock:
"""File-based lock to coordinate worker processes.
Only one worker can hold the lock at a time. This ensures singleton
services like schedulers only run on one worker.
"""
def __init__(self, lock_file: str = "/tmp/alpinebits_primary_worker.lock"):
"""Initialize the worker lock.
Args:
lock_file: Path to the lock file
"""
self.lock_file = Path(lock_file)
self.lock_fd = None
self.is_primary = False
def acquire(self) -> bool:
"""Try to acquire the primary worker lock.
Returns:
True if lock was acquired (this is the primary worker)
False if lock is held by another worker
"""
try:
# Create lock file if it doesn't exist
self.lock_file.parent.mkdir(parents=True, exist_ok=True)
# Open lock file
self.lock_fd = open(self.lock_file, "w")
# Try to acquire exclusive lock (non-blocking)
fcntl.flock(self.lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
# Write PID to lock file for debugging
self.lock_fd.write(f"{os.getpid()}\n")
self.lock_fd.flush()
self.is_primary = True
_LOGGER.info(
"Acquired primary worker lock (pid=%d, lock_file=%s)",
os.getpid(),
self.lock_file,
)
return True
except (IOError, OSError) as e:
# Lock is held by another process
if self.lock_fd:
self.lock_fd.close()
self.lock_fd = None
self.is_primary = False
_LOGGER.info(
"Could not acquire primary worker lock - another worker is primary (pid=%d)",
os.getpid(),
)
return False
def release(self) -> None:
"""Release the primary worker lock."""
if self.lock_fd and self.is_primary:
try:
fcntl.flock(self.lock_fd.fileno(), fcntl.LOCK_UN)
self.lock_fd.close()
# Try to remove lock file (best effort)
try:
self.lock_file.unlink()
except Exception:
pass
_LOGGER.info("Released primary worker lock (pid=%d)", os.getpid())
except Exception:
_LOGGER.exception("Error releasing primary worker lock")
finally:
self.lock_fd = None
self.is_primary = False
def __enter__(self) -> "WorkerLock":
"""Context manager entry."""
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit."""
self.release()
def is_primary_worker() -> tuple[bool, WorkerLock | None]:
"""Determine if this worker should run singleton services.
Uses file-based locking to coordinate between workers.
Returns:
Tuple of (is_primary, lock_object)
- is_primary: True if this is the primary worker
- lock_object: WorkerLock instance (must be kept alive)
"""
lock = WorkerLock()
is_primary = lock.acquire()
return is_primary, lock