Added email monitoring

This commit is contained in:
Jonas Linter
2025-10-15 08:46:25 +02:00
parent bb900ab1ee
commit f22684d592
11 changed files with 2279 additions and 4 deletions

View File

@@ -1 +1,3 @@
This python project is managed by uv. Use uv run to execute app and tests.
The Configuration is handled in a config.yaml file. The annotatedyaml library is used to load secrets. !secret SOME_SECRET in the yaml file refers to a secret definition in a secrets.yaml file

View File

@@ -39,3 +39,47 @@ alpine_bits_auth:
hotel_name: "Residence Erika"
username: "erika"
password: !secret ERIKA_PASSWORD
# Email configuration for monitoring and alerts
email:
# SMTP server configuration
smtp:
host: "smtp.gmail.com" # Your SMTP server
port: 587 # Usually 587 for TLS, 465 for SSL
username: !secret EMAIL_USERNAME # SMTP username
password: !secret EMAIL_PASSWORD # SMTP password
use_tls: true # Use STARTTLS
use_ssl: false # Use SSL/TLS from start
# Email addresses
from_address: "noreply@99tales.com" # Sender address
from_name: "AlpineBits Monitor" # Sender display name
# Monitoring and alerting
monitoring:
# Daily report configuration
daily_report:
enabled: false # Set to true to enable daily reports
recipients:
- "admin@99tales.com"
- "dev@99tales.com"
send_time: "08:00" # Time to send daily report (24h format, local time)
include_stats: true # Include reservation/customer stats
include_errors: true # Include error summary
# Error alert configuration (hybrid approach)
error_alerts:
enabled: false # Set to true to enable error alerts
recipients:
- "alerts@99tales.com"
- "oncall@99tales.com"
# Alert is sent immediately if threshold is reached
error_threshold: 5 # Send immediate alert after N errors
# Otherwise, alert is sent after buffer time expires
buffer_minutes: 15 # Wait N minutes before sending buffered errors
# Cooldown period to prevent alert spam
cooldown_minutes: 15 # Wait N min before sending another alert
# Error severity levels to monitor
log_levels:
- "ERROR"
- "CRITICAL"

423
docs/EMAIL_MONITORING.md Normal file
View File

@@ -0,0 +1,423 @@
# Email Monitoring and Alerting
This document describes the email monitoring and alerting system for the AlpineBits Python server.
## Overview
The email monitoring system provides two main features:
1. **Error Alerts**: Automatic email notifications when errors occur in the application
2. **Daily Reports**: Scheduled daily summary emails with statistics and error logs
## Architecture
### Components
- **EmailService** ([email_service.py](../src/alpine_bits_python/email_service.py)): Core SMTP email sending functionality
- **EmailAlertHandler** ([email_monitoring.py](../src/alpine_bits_python/email_monitoring.py)): Custom logging handler that captures errors and sends alerts
- **DailyReportScheduler** ([email_monitoring.py](../src/alpine_bits_python/email_monitoring.py)): Background task that sends daily reports
### How It Works
#### Error Alerts (Hybrid Approach)
The `EmailAlertHandler` uses a **hybrid threshold + time-based** approach:
1. **Immediate Alerts**: If the error threshold is reached (e.g., 5 errors), an alert email is sent immediately
2. **Buffered Alerts**: Otherwise, errors accumulate in a buffer and are sent after the buffer duration (e.g., 15 minutes)
3. **Cooldown Period**: After sending an alert, the system waits for a cooldown period before sending another alert to prevent spam
**Flow Diagram:**
```
Error occurs
Add to buffer
Buffer >= threshold? ──Yes──> Send immediate alert
↓ No ↓
Wait for buffer time Reset buffer
↓ ↓
Send buffered alert Enter cooldown
Reset buffer
```
#### Daily Reports
The `DailyReportScheduler` runs as a background task that:
1. Waits until the configured send time (e.g., 8:00 AM)
2. Collects statistics from the application
3. Gathers errors that occurred during the day
4. Formats and sends an email report
5. Clears the error log
6. Schedules the next report for the following day
## Configuration
### Email Configuration Keys
Add the following to your [config.yaml](../config/config.yaml):
```yaml
email:
# SMTP server configuration
smtp:
host: "smtp.gmail.com" # Your SMTP server hostname
port: 587 # SMTP port (587 for TLS, 465 for SSL)
username: !secret EMAIL_USERNAME # SMTP username (use !secret for env vars)
password: !secret EMAIL_PASSWORD # SMTP password (use !secret for env vars)
use_tls: true # Use STARTTLS encryption
use_ssl: false # Use SSL/TLS from start (mutually exclusive with use_tls)
# Sender information
from_address: "noreply@99tales.com"
from_name: "AlpineBits Monitor"
# Monitoring and alerting
monitoring:
# Daily report configuration
daily_report:
enabled: true # Enable/disable daily reports
recipients:
- "admin@99tales.com"
- "dev@99tales.com"
send_time: "08:00" # Time to send (24h format, local time)
include_stats: true # Include application statistics
include_errors: true # Include error summary
# Error alert configuration
error_alerts:
enabled: true # Enable/disable error alerts
recipients:
- "alerts@99tales.com"
- "oncall@99tales.com"
error_threshold: 5 # Send immediate alert after N errors
buffer_minutes: 15 # Wait N minutes before sending buffered errors
cooldown_minutes: 15 # Wait N minutes before sending another alert
log_levels: # Log levels to monitor
- "ERROR"
- "CRITICAL"
```
### Environment Variables
For security, store sensitive credentials in environment variables:
```bash
# Create a .env file (never commit this!)
EMAIL_USERNAME=your-smtp-username@gmail.com
EMAIL_PASSWORD=your-smtp-app-password
```
The `annotatedyaml` library automatically loads values marked with `!secret` from environment variables.
### Gmail Configuration
If using Gmail, you need to:
1. Enable 2-factor authentication on your Google account
2. Generate an "App Password" for SMTP access
3. Use the app password as `EMAIL_PASSWORD`
**Gmail Settings:**
```yaml
smtp:
host: "smtp.gmail.com"
port: 587
use_tls: true
use_ssl: false
```
### Other SMTP Providers
**SendGrid:**
```yaml
smtp:
host: "smtp.sendgrid.net"
port: 587
username: "apikey"
password: !secret SENDGRID_API_KEY
use_tls: true
```
**AWS SES:**
```yaml
smtp:
host: "email-smtp.us-east-1.amazonaws.com"
port: 587
username: !secret AWS_SES_USERNAME
password: !secret AWS_SES_PASSWORD
use_tls: true
```
## Usage
### Automatic Error Monitoring
Once configured, the system automatically captures all `ERROR` and `CRITICAL` log messages:
```python
from alpine_bits_python.logging_config import get_logger
_LOGGER = get_logger(__name__)
# This error will be captured and sent via email
_LOGGER.error("Database connection failed")
# This will also be captured
try:
risky_operation()
except Exception:
_LOGGER.exception("Operation failed") # Includes stack trace
```
### Triggering Test Alerts
To test your email configuration, you can manually trigger errors:
```python
import logging
_LOGGER = logging.getLogger(__name__)
# Generate multiple errors to trigger immediate alert (if threshold = 5)
for i in range(5):
_LOGGER.error(f"Test error {i + 1}")
```
### Daily Report Statistics
To include custom statistics in daily reports, set a stats collector function:
```python
async def collect_stats():
"""Collect application statistics for daily report."""
return {
"total_reservations": await count_reservations(),
"new_customers": await count_new_customers(),
"active_hotels": await count_active_hotels(),
"api_requests": get_request_count(),
}
# Register the collector
report_scheduler = app.state.report_scheduler
if report_scheduler:
report_scheduler.set_stats_collector(collect_stats)
```
## Email Templates
### Error Alert Email
**Subject:** 🚨 AlpineBits Error Alert: 5 errors (threshold exceeded)
**Body:**
```
Error Alert - 2025-10-15 14:30:45
======================================================================
Alert Type: Immediate Alert
Error Count: 5
Time Range: 14:25:00 to 14:30:00
Reason: (threshold of 5 exceeded)
======================================================================
Errors:
----------------------------------------------------------------------
[2025-10-15 14:25:12] ERROR: Database connection timeout
Module: db:245 (alpine_bits_python.db)
[2025-10-15 14:26:34] ERROR: Failed to process reservation
Module: api:567 (alpine_bits_python.api)
Exception:
Traceback (most recent call last):
...
----------------------------------------------------------------------
Generated by AlpineBits Email Monitoring at 2025-10-15 14:30:45
```
### Daily Report Email
**Subject:** AlpineBits Daily Report - 2025-10-15
**Body (HTML):**
```html
AlpineBits Daily Report
Date: 2025-10-15
Statistics
┌────────────────────────┬────────┐
│ Metric │ Value │
├────────────────────────┼────────┤
│ total_reservations │ 42 │
│ new_customers │ 15 │
│ active_hotels │ 4 │
│ api_requests │ 1,234 │
└────────────────────────┴────────┘
Errors (3)
┌──────────────┬──────────┬─────────────────────────┐
│ Time │ Level │ Message │
├──────────────┼──────────┼─────────────────────────┤
│ 08:15:23 │ ERROR │ Connection timeout │
│ 12:45:10 │ ERROR │ Invalid form data │
│ 18:30:00 │ CRITICAL │ Database unavailable │
└──────────────┴──────────┴─────────────────────────┘
Generated by AlpineBits Server
```
## Monitoring and Troubleshooting
### Check Email Configuration
```python
from alpine_bits_python.email_service import create_email_service
from alpine_bits_python.config_loader import load_config
config = load_config()
email_service = create_email_service(config)
if email_service:
print("✓ Email service configured")
else:
print("✗ Email service not configured")
```
### Test Email Sending
```python
import asyncio
from alpine_bits_python.email_service import EmailService, EmailConfig
async def test_email():
config = EmailConfig({
"smtp": {
"host": "smtp.gmail.com",
"port": 587,
"username": "your-email@gmail.com",
"password": "your-app-password",
"use_tls": True,
},
"from_address": "sender@example.com",
"from_name": "Test",
})
service = EmailService(config)
result = await service.send_email(
recipients=["recipient@example.com"],
subject="Test Email",
body="This is a test email from AlpineBits server.",
)
if result:
print("✓ Email sent successfully")
else:
print("✗ Email sending failed")
asyncio.run(test_email())
```
### Common Issues
**Issue: "Authentication failed"**
- Verify SMTP username and password are correct
- For Gmail, ensure you're using an App Password, not your regular password
- Check that 2FA is enabled on Gmail
**Issue: "Connection timeout"**
- Verify SMTP host and port are correct
- Check firewall rules allow outbound SMTP connections
- Try using port 465 with SSL instead of 587 with TLS
**Issue: "No email alerts received"**
- Check that `enabled: true` in config
- Verify recipient email addresses are correct
- Check application logs for email sending errors
- Ensure errors are being logged at ERROR or CRITICAL level
**Issue: "Too many emails being sent"**
- Increase `cooldown_minutes` to reduce alert frequency
- Increase `buffer_minutes` to batch more errors together
- Increase `error_threshold` to only alert on serious issues
## Performance Considerations
### SMTP is Blocking
Email sending uses the standard Python `smtplib`, which performs blocking I/O. To prevent blocking the async event loop:
- Email operations are automatically run in a thread pool executor
- This happens transparently via `loop.run_in_executor()`
- No performance impact on request handling
### Memory Usage
- Error buffer size is limited by `buffer_minutes` duration
- Old errors are automatically cleared after sending
- Daily report error log is cleared after each report
- Typical memory usage: <1 MB for error buffering
### Error Handling
- Email sending failures are logged but never crash the application
- If SMTP is unavailable, errors are logged to console/file as normal
- The logging handler has exception safety - it will never cause application failures
## Security Considerations
1. **Never commit credentials to git**
- Use `!secret` annotation in YAML
- Store credentials in environment variables
- Add `.env` to `.gitignore`
2. **Use TLS/SSL encryption**
- Always set `use_tls: true` or `use_ssl: true`
- Never send credentials in plaintext
3. **Limit email recipients**
- Only send alerts to authorized personnel
- Use dedicated monitoring email addresses
- Consider using distribution lists
4. **Sensitive data in logs**
- Be careful not to log passwords, API keys, or PII
- Error messages in emails may contain sensitive context
- Review log messages before enabling email alerts
## Testing
Run the test suite:
```bash
# Test email service only
uv run pytest tests/test_email_service.py -v
# Test with coverage
uv run pytest tests/test_email_service.py --cov=alpine_bits_python.email_service --cov=alpine_bits_python.email_monitoring
```
## Future Enhancements
Potential improvements for future versions:
- [ ] Support for email templates (Jinja2)
- [ ] Configurable retry logic for failed sends
- [ ] Email queuing for high-volume scenarios
- [ ] Integration with external monitoring services (PagerDuty, Slack)
- [ ] Weekly/monthly report options
- [ ] Custom alert rules based on error patterns
- [ ] Email attachments for detailed logs
- [ ] HTML email styling improvements
## References
- [Python smtplib Documentation](https://docs.python.org/3/library/smtplib.html)
- [Python logging Documentation](https://docs.python.org/3/library/logging.html)
- [Gmail SMTP Settings](https://support.google.com/mail/answer/7126229)
- [annotatedyaml Documentation](https://github.com/yourusername/annotatedyaml)

View File

@@ -0,0 +1,172 @@
# Email Monitoring Quick Start
Get email notifications for errors and daily reports in 5 minutes.
## 1. Configure SMTP Settings
Edit `config/config.yaml` and add:
```yaml
email:
smtp:
host: "smtp.gmail.com"
port: 587
username: !secret EMAIL_USERNAME
password: !secret EMAIL_PASSWORD
use_tls: true
from_address: "noreply@yourdomain.com"
from_name: "AlpineBits Monitor"
```
## 2. Set Environment Variables
Create a `.env` file in the project root:
```bash
EMAIL_USERNAME=your-email@gmail.com
EMAIL_PASSWORD=your-app-password
```
> **Note:** For Gmail, use an [App Password](https://support.google.com/accounts/answer/185833), not your regular password.
## 3. Enable Error Alerts
In `config/config.yaml`:
```yaml
email:
monitoring:
error_alerts:
enabled: true
recipients:
- "alerts@yourdomain.com"
error_threshold: 5
buffer_minutes: 15
cooldown_minutes: 15
```
**How it works:**
- Sends immediate alert after 5 errors
- Otherwise sends after 15 minutes
- Waits 15 minutes between alerts (cooldown)
## 4. Enable Daily Reports (Optional)
In `config/config.yaml`:
```yaml
email:
monitoring:
daily_report:
enabled: true
recipients:
- "admin@yourdomain.com"
send_time: "08:00"
include_stats: true
include_errors: true
```
## 5. Test Your Configuration
Run the test script:
```bash
uv run python examples/test_email_monitoring.py
```
This will:
- ✅ Send a test email
- ✅ Trigger an error alert
- ✅ Send a test daily report
## What You Get
### Error Alert Email
When errors occur, you'll receive:
```
🚨 AlpineBits Error Alert: 5 errors (threshold exceeded)
Error Count: 5
Time Range: 14:25:00 to 14:30:00
Errors:
----------------------------------------------------------------------
[2025-10-15 14:25:12] ERROR: Database connection timeout
Module: db:245
[2025-10-15 14:26:34] ERROR: Failed to process reservation
Module: api:567
Exception: ValueError: Invalid hotel code
```
### Daily Report Email
Every day at 8 AM, you'll receive:
```
📊 AlpineBits Daily Report - 2025-10-15
Statistics:
total_reservations: 42
new_customers: 15
active_hotels: 4
Errors (3):
[08:15:23] ERROR: Connection timeout
[12:45:10] ERROR: Invalid form data
[18:30:00] CRITICAL: Database unavailable
```
## Troubleshooting
### No emails received?
1. Check your SMTP credentials:
```bash
echo $EMAIL_USERNAME
echo $EMAIL_PASSWORD
```
2. Check application logs for errors:
```bash
tail -f alpinebits.log | grep -i email
```
3. Test SMTP connection manually:
```bash
uv run python -c "
import smtplib
with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
smtp.starttls()
smtp.login('$EMAIL_USERNAME', '$EMAIL_PASSWORD')
print('✅ SMTP connection successful')
"
```
### Gmail authentication failed?
- Enable 2-factor authentication on your Google account
- Generate an App Password at https://myaccount.google.com/apppasswords
- Use the App Password (not your regular password)
### Too many emails?
- Increase `error_threshold` to only alert on serious issues
- Increase `buffer_minutes` to batch more errors together
- Increase `cooldown_minutes` to reduce alert frequency
## Next Steps
- Read the full [Email Monitoring Documentation](./EMAIL_MONITORING.md)
- Configure custom statistics for daily reports
- Set up multiple recipient groups
- Integrate with Slack or PagerDuty (coming soon)
## Support
For issues or questions:
- Check the [documentation](./EMAIL_MONITORING.md)
- Review [test examples](../examples/test_email_monitoring.py)
- Open an issue on GitHub

View File

@@ -0,0 +1,305 @@
"""Example script to test email monitoring functionality.
This script demonstrates how to:
1. Configure the email service
2. Send test emails
3. Trigger error alerts
4. Test daily report generation
Usage:
uv run python examples/test_email_monitoring.py
"""
import asyncio
import logging
from datetime import datetime
from alpine_bits_python.config_loader import load_config
from alpine_bits_python.email_monitoring import (
DailyReportScheduler,
EmailAlertHandler,
)
from alpine_bits_python.email_service import create_email_service
from alpine_bits_python.logging_config import get_logger, setup_logging
_LOGGER = get_logger(__name__)
async def test_basic_email():
"""Test 1: Send a basic test email."""
print("\n" + "=" * 60)
print("Test 1: Basic Email Sending")
print("=" * 60)
config = load_config()
email_service = create_email_service(config)
if not email_service:
print("❌ Email service not configured. Check your config.yaml")
return False
print("✓ Email service initialized")
# Get the first recipient from error_alerts config
email_config = config.get("email", {})
monitoring_config = email_config.get("monitoring", {})
error_alerts_config = monitoring_config.get("error_alerts", {})
recipients = error_alerts_config.get("recipients", [])
if not recipients:
print("❌ No recipients configured in error_alerts")
return False
print(f"✓ Sending test email to: {recipients[0]}")
success = await email_service.send_email(
recipients=[recipients[0]],
subject="AlpineBits Email Test - Basic",
body=f"""This is a test email from the AlpineBits server.
Timestamp: {datetime.now().isoformat()}
Test: Basic email sending
If you received this email, your SMTP configuration is working correctly!
---
AlpineBits Python Server
Email Monitoring System
""",
)
if success:
print("✅ Test email sent successfully!")
return True
else:
print("❌ Failed to send test email. Check logs for details.")
return False
async def test_error_alert_threshold():
"""Test 2: Trigger immediate error alert by exceeding threshold."""
print("\n" + "=" * 60)
print("Test 2: Error Alert - Threshold Trigger")
print("=" * 60)
config = load_config()
email_service = create_email_service(config)
if not email_service:
print("❌ Email service not configured")
return False
# Setup logging with email monitoring
loop = asyncio.get_running_loop()
email_handler, _ = setup_logging(config, email_service, loop)
if not email_handler:
print("❌ Error alert handler not configured")
return False
print(f"✓ Error alert handler configured (threshold: {email_handler.error_threshold})")
print(f" Recipients: {email_handler.recipients}")
# Generate errors to exceed threshold
threshold = email_handler.error_threshold
print(f"\n📨 Generating {threshold} errors to trigger immediate alert...")
logger = logging.getLogger("test.error.threshold")
for i in range(threshold):
logger.error(f"Test error #{i + 1} - Threshold test at {datetime.now().isoformat()}")
print(f" → Error {i + 1}/{threshold} logged")
await asyncio.sleep(0.1) # Small delay between errors
# Wait a bit for email to be sent
print("\n⏳ Waiting for alert email to be sent...")
await asyncio.sleep(3)
print("✅ Threshold test complete! Check your email for the alert.")
return True
async def test_error_alert_buffer():
"""Test 3: Trigger buffered error alert by waiting for buffer time."""
print("\n" + "=" * 60)
print("Test 3: Error Alert - Buffer Time Trigger")
print("=" * 60)
config = load_config()
email_service = create_email_service(config)
if not email_service:
print("❌ Email service not configured")
return False
# Setup logging with email monitoring
loop = asyncio.get_running_loop()
email_handler, _ = setup_logging(config, email_service, loop)
if not email_handler:
print("❌ Error alert handler not configured")
return False
print(f"✓ Error alert handler configured (buffer: {email_handler.buffer_minutes} minutes)")
# Generate fewer errors than threshold
num_errors = max(1, email_handler.error_threshold - 2)
print(f"\n📨 Generating {num_errors} errors (below threshold)...")
logger = logging.getLogger("test.error.buffer")
for i in range(num_errors):
logger.error(f"Test error #{i + 1} - Buffer test at {datetime.now().isoformat()}")
print(f" → Error {i + 1}/{num_errors} logged")
buffer_seconds = email_handler.buffer_minutes * 60
print(f"\n⏳ Waiting {email_handler.buffer_minutes} minute(s) for buffer to flush...")
print(" (This will send an email with all buffered errors)")
# Wait for buffer time + a bit extra
await asyncio.sleep(buffer_seconds + 2)
print("✅ Buffer test complete! Check your email for the alert.")
return True
async def test_daily_report():
"""Test 4: Generate and send a test daily report."""
print("\n" + "=" * 60)
print("Test 4: Daily Report")
print("=" * 60)
config = load_config()
email_service = create_email_service(config)
if not email_service:
print("❌ Email service not configured")
return False
# Create a daily report scheduler
daily_report_config = (
config.get("email", {})
.get("monitoring", {})
.get("daily_report", {})
)
if not daily_report_config.get("enabled"):
print("⚠️ Daily reports not enabled in config")
print(" Set email.monitoring.daily_report.enabled = true")
return False
scheduler = DailyReportScheduler(email_service, daily_report_config)
print(f"✓ Daily report scheduler configured")
print(f" Recipients: {scheduler.recipients}")
print(f" Send time: {scheduler.send_time}")
# Add some test statistics
test_stats = {
"total_reservations": 42,
"new_customers": 15,
"active_hotels": 4,
"api_requests_today": 1234,
"average_response_time_ms": 45,
"success_rate": "99.2%",
}
# Add some test errors
test_errors = [
{
"timestamp": "2025-10-15 08:15:23",
"level": "ERROR",
"message": "Connection timeout to external API",
},
{
"timestamp": "2025-10-15 12:45:10",
"level": "ERROR",
"message": "Invalid form data submitted",
},
{
"timestamp": "2025-10-15 18:30:00",
"level": "CRITICAL",
"message": "Database connection pool exhausted",
},
]
print("\n📊 Sending test daily report...")
print(f" Stats: {len(test_stats)} metrics")
print(f" Errors: {len(test_errors)} entries")
success = await email_service.send_daily_report(
recipients=scheduler.recipients,
stats=test_stats,
errors=test_errors,
)
if success:
print("✅ Daily report sent successfully!")
return True
else:
print("❌ Failed to send daily report. Check logs for details.")
return False
async def run_all_tests():
"""Run all email monitoring tests."""
print("\n" + "=" * 60)
print("AlpineBits Email Monitoring Test Suite")
print("=" * 60)
tests = [
("Basic Email", test_basic_email),
("Error Alert (Threshold)", test_error_alert_threshold),
("Error Alert (Buffer)", test_error_alert_buffer),
("Daily Report", test_daily_report),
]
results = []
for test_name, test_func in tests:
try:
result = await test_func()
results.append((test_name, result))
except Exception as e:
print(f"\n❌ Test '{test_name}' failed with exception: {e}")
results.append((test_name, False))
# Wait between tests to avoid rate limiting
await asyncio.sleep(2)
# Print summary
print("\n" + "=" * 60)
print("Test Summary")
print("=" * 60)
passed = sum(1 for _, result in results if result)
total = len(results)
for test_name, result in results:
status = "✅ PASS" if result else "❌ FAIL"
print(f"{status}: {test_name}")
print(f"\nTotal: {passed}/{total} tests passed")
if passed == total:
print("\n🎉 All tests passed!")
else:
print(f"\n⚠️ {total - passed} test(s) failed")
def main():
"""Main entry point."""
print("Starting email monitoring tests...")
print("Make sure you have configured email settings in config.yaml")
print("and set EMAIL_USERNAME and EMAIL_PASSWORD environment variables.")
# Run the tests
try:
asyncio.run(run_all_tests())
except KeyboardInterrupt:
print("\n\n⚠️ Tests interrupted by user")
except Exception as e:
print(f"\n\n❌ Fatal error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@@ -32,6 +32,7 @@ from .customer_service import CustomerService
from .db import Base, get_database_url
from .db import Customer as DBCustomer
from .db import Reservation as DBReservation
from .email_service import create_email_service
from .logging_config import get_logger, setup_logging
from .rate_limit import (
BURST_RATE_LIMIT,
@@ -185,8 +186,14 @@ async def lifespan(app: FastAPI):
_LOGGER.exception("Failed to load config: ")
config = {}
# Setup logging from config
setup_logging(config)
# Get event loop for email monitoring
loop = asyncio.get_running_loop()
# Initialize email service (before logging setup so it can be used by handlers)
email_service = create_email_service(config)
# Setup logging from config with email monitoring
email_handler, report_scheduler = setup_logging(config, email_service, loop)
_LOGGER.info("Application startup initiated")
DATABASE_URL = get_database_url(config)
@@ -198,6 +205,9 @@ async def lifespan(app: FastAPI):
app.state.config = config
app.state.alpine_bits_server = AlpineBitsServer(config)
app.state.event_dispatcher = event_dispatcher
app.state.email_service = email_service
app.state.email_handler = email_handler
app.state.report_scheduler = report_scheduler
# Register push listeners for hotels with push_endpoint
for hotel in config.get("alpine_bits_auth", []):
@@ -235,10 +245,31 @@ async def lifespan(app: FastAPI):
else:
_LOGGER.info("All existing customers already have hashed data")
# Start daily report scheduler if enabled
if report_scheduler:
report_scheduler.start()
_LOGGER.info("Daily report scheduler started")
_LOGGER.info("Application startup complete")
yield
# Optional: Dispose engine on shutdown
# Cleanup on shutdown
_LOGGER.info("Application shutdown initiated")
# Stop daily report scheduler
if report_scheduler:
report_scheduler.stop()
_LOGGER.info("Daily report scheduler stopped")
# Close email alert handler (flush any remaining errors)
if email_handler:
email_handler.close()
_LOGGER.info("Email alert handler closed")
# Dispose engine
await engine.dispose()
_LOGGER.info("Application shutdown complete")
async def get_async_session(request: Request):

View File

@@ -6,9 +6,12 @@ from annotatedyaml.loader import load_yaml as load_annotated_yaml
from voluptuous import (
PREVENT_EXTRA,
All,
Boolean,
In,
Length,
MultipleInvalid,
Optional,
Range,
Required,
Schema,
)
@@ -82,12 +85,74 @@ hotel_auth_schema = Schema(
basic_auth_schema = Schema(All([hotel_auth_schema], Length(min=1)))
# Email SMTP configuration schema
smtp_schema = Schema(
{
Required("host", default="localhost"): str,
Required("port", default=587): Range(min=1, max=65535),
Optional("username"): str,
Optional("password"): str,
Required("use_tls", default=True): Boolean(),
Required("use_ssl", default=False): Boolean(),
},
extra=PREVENT_EXTRA,
)
# Email daily report configuration schema
daily_report_schema = Schema(
{
Required("enabled", default=False): Boolean(),
Optional("recipients", default=[]): [str],
Required("send_time", default="08:00"): str,
Required("include_stats", default=True): Boolean(),
Required("include_errors", default=True): Boolean(),
},
extra=PREVENT_EXTRA,
)
# Email error alerts configuration schema
error_alerts_schema = Schema(
{
Required("enabled", default=False): Boolean(),
Optional("recipients", default=[]): [str],
Required("error_threshold", default=5): Range(min=1),
Required("buffer_minutes", default=15): Range(min=1),
Required("cooldown_minutes", default=15): Range(min=0),
Required("log_levels", default=["ERROR", "CRITICAL"]): [
In(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])
],
},
extra=PREVENT_EXTRA,
)
# Email monitoring configuration schema
monitoring_schema = Schema(
{
Optional("daily_report", default={}): daily_report_schema,
Optional("error_alerts", default={}): error_alerts_schema,
},
extra=PREVENT_EXTRA,
)
# Complete email configuration schema
email_schema = Schema(
{
Optional("smtp", default={}): smtp_schema,
Required("from_address", default="noreply@example.com"): str,
Required("from_name", default="AlpineBits Server"): str,
Optional("timeout", default=10): Range(min=1, max=300),
Optional("monitoring", default={}): monitoring_schema,
},
extra=PREVENT_EXTRA,
)
config_schema = Schema(
{
Required(CONF_DATABASE): database_schema,
Required(CONF_ALPINE_BITS_AUTH): basic_auth_schema,
Required(CONF_SERVER): server_info,
Required(CONF_LOGGING): logger_schema,
Optional("email"): email_schema, # Email is optional
},
extra=PREVENT_EXTRA,
)

View File

@@ -0,0 +1,442 @@
"""Email monitoring and alerting through logging integration.
This module provides a custom logging handler that accumulates errors and sends
email alerts based on configurable thresholds and time windows.
"""
import asyncio
import logging
import threading
from collections import deque
from datetime import datetime, timedelta
from typing import Any
from .email_service import EmailService
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
class ErrorRecord:
"""Represents a single error log record for monitoring.
Attributes:
timestamp: When the error occurred
level: Log level (ERROR, CRITICAL, etc.)
logger_name: Name of the logger that generated the error
message: The error message
exception: Exception info if available
module: Module where error occurred
line_no: Line number where error occurred
"""
def __init__(self, record: logging.LogRecord):
"""Initialize from a logging.LogRecord.
Args:
record: The logging record to wrap
"""
self.timestamp = datetime.fromtimestamp(record.created)
self.level = record.levelname
self.logger_name = record.name
self.message = record.getMessage()
self.exception = record.exc_text if record.exc_info else None
self.module = record.module
self.line_no = record.lineno
self.pathname = record.pathname
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary format.
Returns:
Dictionary representation of the error
"""
return {
"timestamp": self.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
"level": self.level,
"logger_name": self.logger_name,
"message": self.message,
"exception": self.exception,
"module": self.module,
"line_no": self.line_no,
"pathname": self.pathname,
}
def format_plain_text(self) -> str:
"""Format error as plain text for email.
Returns:
Formatted plain text string
"""
text = f"[{self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {self.level}: {self.message}\n"
text += f" Module: {self.module}:{self.line_no} ({self.logger_name})\n"
if self.exception:
text += f" Exception:\n{self.exception}\n"
return text
class EmailAlertHandler(logging.Handler):
"""Custom logging handler that sends email alerts for errors.
This handler uses a hybrid approach:
- Accumulates errors in a buffer
- Sends immediately if error threshold is reached
- Otherwise sends after buffer duration expires
- Always sends buffered errors (no minimum threshold for time-based flush)
- Implements cooldown to prevent alert spam
The handler is thread-safe and works with asyncio event loops.
"""
def __init__(
self,
email_service: EmailService,
config: dict[str, Any],
loop: asyncio.AbstractEventLoop | None = None,
):
"""Initialize the email alert handler.
Args:
email_service: Email service instance for sending alerts
config: Configuration dictionary for error alerts
loop: Asyncio event loop (will use current loop if not provided)
"""
super().__init__()
self.email_service = email_service
self.config = config
self.loop = loop # Will be set when first error occurs if not provided
# Configuration
self.recipients = config.get("recipients", [])
self.error_threshold = config.get("error_threshold", 5)
self.buffer_minutes = config.get("buffer_minutes", 15)
self.cooldown_minutes = config.get("cooldown_minutes", 15)
self.log_levels = config.get("log_levels", ["ERROR", "CRITICAL"])
# State
self.error_buffer: deque[ErrorRecord] = deque()
self.last_sent = datetime.min # Last time we sent an alert
self._flush_task: asyncio.Task | None = None
self._lock = threading.Lock() # Thread-safe for multi-threaded logging
_LOGGER.info(
"EmailAlertHandler initialized: threshold=%d, buffer=%dmin, cooldown=%dmin",
self.error_threshold,
self.buffer_minutes,
self.cooldown_minutes,
)
def emit(self, record: logging.LogRecord) -> None:
"""Handle a log record.
This is called automatically by the logging system when an error is logged.
It's important that this method is fast and doesn't block.
Args:
record: The log record to handle
"""
# Only handle configured log levels
if record.levelname not in self.log_levels:
return
try:
# Ensure we have an event loop
if self.loop is None:
try:
self.loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop, we'll need to handle this differently
_LOGGER.warning("No asyncio event loop available for email alerts")
return
# Add error to buffer (thread-safe)
with self._lock:
error_record = ErrorRecord(record)
self.error_buffer.append(error_record)
buffer_size = len(self.error_buffer)
# Determine if we should send immediately
should_send_immediately = buffer_size >= self.error_threshold
if should_send_immediately:
# Cancel any pending flush task
if self._flush_task and not self._flush_task.done():
self._flush_task.cancel()
# Schedule immediate flush
self._flush_task = asyncio.run_coroutine_threadsafe(
self._flush_buffer(immediate=True),
self.loop,
)
else:
# Schedule delayed flush if not already scheduled
if not self._flush_task or self._flush_task.done():
self._flush_task = asyncio.run_coroutine_threadsafe(
self._schedule_delayed_flush(),
self.loop,
)
except Exception:
# Never let the handler crash - just log and continue
_LOGGER.exception("Error in EmailAlertHandler.emit")
async def _schedule_delayed_flush(self) -> None:
"""Schedule a delayed buffer flush after buffer duration."""
await asyncio.sleep(self.buffer_minutes * 60)
await self._flush_buffer(immediate=False)
async def _flush_buffer(self, *, immediate: bool) -> None:
"""Flush the error buffer and send email alert.
Args:
immediate: Whether this is an immediate flush (threshold hit)
"""
# Check cooldown period
now = datetime.now()
time_since_last = (now - self.last_sent).total_seconds() / 60
if time_since_last < self.cooldown_minutes:
_LOGGER.info(
"Alert cooldown active (%.1f min remaining), buffering errors",
self.cooldown_minutes - time_since_last,
)
# Don't clear buffer - let errors accumulate until cooldown expires
return
# Get all buffered errors (thread-safe)
with self._lock:
if not self.error_buffer:
return
errors = list(self.error_buffer)
self.error_buffer.clear()
# Update last sent time
self.last_sent = now
# Format email
error_count = len(errors)
time_range = (
f"{errors[0].timestamp.strftime('%H:%M:%S')} to "
f"{errors[-1].timestamp.strftime('%H:%M:%S')}"
)
# Determine alert type for subject
alert_type = "Immediate Alert" if immediate else "Scheduled Alert"
if immediate:
emoji = "🚨"
reason = f"(threshold of {self.error_threshold} exceeded)"
else:
emoji = "⚠️"
reason = f"({self.buffer_minutes} minute buffer)"
subject = f"{emoji} AlpineBits Error {alert_type}: {error_count} errors {reason}"
# Build plain text body
body = f"Error Alert - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
body += "=" * 70 + "\n\n"
body += f"Alert Type: {alert_type}\n"
body += f"Error Count: {error_count}\n"
body += f"Time Range: {time_range}\n"
body += f"Reason: {reason}\n"
body += "\n" + "=" * 70 + "\n\n"
# Add individual errors
body += "Errors:\n"
body += "-" * 70 + "\n\n"
for error in errors:
body += error.format_plain_text()
body += "\n"
body += "-" * 70 + "\n"
body += f"Generated by AlpineBits Email Monitoring at {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
# Send email
try:
success = await self.email_service.send_alert(
recipients=self.recipients,
subject=subject,
body=body,
)
if success:
_LOGGER.info(
"Email alert sent successfully: %d errors to %s",
error_count,
self.recipients,
)
else:
_LOGGER.error("Failed to send email alert for %d errors", error_count)
except Exception:
_LOGGER.exception("Exception while sending email alert")
def close(self) -> None:
"""Close the handler and flush any remaining errors.
This is called when the logging system shuts down.
"""
# Cancel any pending flush tasks
if self._flush_task and not self._flush_task.done():
self._flush_task.cancel()
# Flush any remaining errors immediately
if self.error_buffer and self.loop:
try:
asyncio.run_coroutine_threadsafe(
self._flush_buffer(immediate=False),
self.loop,
).result(timeout=5)
except Exception:
_LOGGER.exception("Error flushing buffer on close")
super().close()
class DailyReportScheduler:
"""Scheduler for sending daily reports at configured times.
This runs as a background task and sends daily reports containing
statistics and error summaries.
"""
def __init__(
self,
email_service: EmailService,
config: dict[str, Any],
):
"""Initialize the daily report scheduler.
Args:
email_service: Email service for sending reports
config: Configuration for daily reports
"""
self.email_service = email_service
self.config = config
self.recipients = config.get("recipients", [])
self.send_time = config.get("send_time", "08:00") # Default 8 AM
self.include_stats = config.get("include_stats", True)
self.include_errors = config.get("include_errors", True)
self._task: asyncio.Task | None = None
self._stats_collector = None # Will be set by application
self._error_log: list[dict[str, Any]] = []
_LOGGER.info(
"DailyReportScheduler initialized: send_time=%s, recipients=%s",
self.send_time,
self.recipients,
)
def start(self) -> None:
"""Start the daily report scheduler."""
if self._task is None or self._task.done():
self._task = asyncio.create_task(self._run())
_LOGGER.info("Daily report scheduler started")
def stop(self) -> None:
"""Stop the daily report scheduler."""
if self._task and not self._task.done():
self._task.cancel()
_LOGGER.info("Daily report scheduler stopped")
def log_error(self, error: dict[str, Any]) -> None:
"""Log an error for inclusion in daily report.
Args:
error: Error information dictionary
"""
self._error_log.append(error)
async def _run(self) -> None:
"""Run the daily report scheduler loop."""
while True:
try:
# Calculate time until next report
now = datetime.now()
target_hour, target_minute = map(int, self.send_time.split(":"))
# Calculate next send time
next_send = now.replace(
hour=target_hour,
minute=target_minute,
second=0,
microsecond=0,
)
# If time has passed today, schedule for tomorrow
if next_send <= now:
next_send += timedelta(days=1)
# Calculate sleep duration
sleep_seconds = (next_send - now).total_seconds()
_LOGGER.info(
"Next daily report scheduled for %s (in %.1f hours)",
next_send.strftime("%Y-%m-%d %H:%M:%S"),
sleep_seconds / 3600,
)
# Wait until send time
await asyncio.sleep(sleep_seconds)
# Send report
await self._send_report()
except asyncio.CancelledError:
_LOGGER.info("Daily report scheduler cancelled")
break
except Exception:
_LOGGER.exception("Error in daily report scheduler")
# Sleep a bit before retrying
await asyncio.sleep(60)
async def _send_report(self) -> None:
"""Send the daily report."""
stats = {}
# Collect statistics if enabled
if self.include_stats and self._stats_collector:
try:
stats = await self._stats_collector()
except Exception:
_LOGGER.exception("Error collecting statistics for daily report")
# Get errors if enabled
errors = self._error_log.copy() if self.include_errors else None
# Send report
try:
success = await self.email_service.send_daily_report(
recipients=self.recipients,
stats=stats,
errors=errors,
)
if success:
_LOGGER.info("Daily report sent successfully to %s", self.recipients)
# Clear error log after successful send
self._error_log.clear()
else:
_LOGGER.error("Failed to send daily report")
except Exception:
_LOGGER.exception("Exception while sending daily report")
def set_stats_collector(self, collector) -> None:
"""Set the statistics collector function.
Args:
collector: Async function that returns statistics dictionary
"""
self._stats_collector = collector

View File

@@ -0,0 +1,359 @@
"""Email service for sending alerts and reports.
This module provides email functionality for the AlpineBits application,
including error alerts and daily reports.
"""
import asyncio
import smtplib
import ssl
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any
from pydantic import EmailStr, Field, field_validator
from .logging_config import get_logger
_LOGGER = get_logger(__name__)
class EmailConfig:
"""Configuration for email service.
Attributes:
smtp_host: SMTP server hostname
smtp_port: SMTP server port
smtp_username: SMTP authentication username
smtp_password: SMTP authentication password
use_tls: Use STARTTLS for encryption
use_ssl: Use SSL/TLS from the start
from_address: Sender email address
from_name: Sender display name
timeout: Connection timeout in seconds
"""
def __init__(self, config: dict[str, Any]):
"""Initialize email configuration from config dict.
Args:
config: Email configuration dictionary
"""
smtp_config = config.get("smtp", {})
self.smtp_host: str = smtp_config.get("host", "localhost")
self.smtp_port: int = smtp_config.get("port", 587)
self.smtp_username: str | None = smtp_config.get("username")
self.smtp_password: str | None = smtp_config.get("password")
self.use_tls: bool = smtp_config.get("use_tls", True)
self.use_ssl: bool = smtp_config.get("use_ssl", False)
self.from_address: str = config.get("from_address", "noreply@example.com")
self.from_name: str = config.get("from_name", "AlpineBits Server")
self.timeout: int = config.get("timeout", 10)
# Validate configuration
if self.use_tls and self.use_ssl:
msg = "Cannot use both TLS and SSL"
raise ValueError(msg)
class EmailService:
"""Service for sending emails via SMTP.
This service handles sending both plain text and HTML emails,
with support for TLS/SSL encryption and authentication.
"""
def __init__(self, config: EmailConfig):
"""Initialize email service.
Args:
config: Email configuration
"""
self.config = config
self._executor = None # Lazy-initialized thread pool for blocking SMTP
async def send_email(
self,
recipients: list[str],
subject: str,
body: str,
html_body: str | None = None,
) -> bool:
"""Send an email to recipients.
Args:
recipients: List of recipient email addresses
subject: Email subject line
body: Plain text email body
html_body: Optional HTML email body
Returns:
True if email was sent successfully, False otherwise
"""
if not recipients:
_LOGGER.warning("No recipients specified for email: %s", subject)
return False
try:
# Build message
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = f"{self.config.from_name} <{self.config.from_address}>"
msg["To"] = ", ".join(recipients)
msg["Date"] = datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")
# Attach plain text body
msg.attach(MIMEText(body, "plain"))
# Attach HTML body if provided
if html_body:
msg.attach(MIMEText(html_body, "html"))
# Send email in thread pool (SMTP is blocking)
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._send_smtp, msg, recipients)
_LOGGER.info("Email sent successfully to %s: %s", recipients, subject)
return True
except Exception:
_LOGGER.exception("Failed to send email to %s: %s", recipients, subject)
return False
def _send_smtp(self, msg: MIMEMultipart, recipients: list[str]) -> None:
"""Send email via SMTP (blocking operation).
Args:
msg: Email message to send
recipients: List of recipient addresses
Raises:
Exception: If email sending fails
"""
if self.config.use_ssl:
# Connect with SSL from the start
context = ssl.create_default_context()
with smtplib.SMTP_SSL(
self.config.smtp_host,
self.config.smtp_port,
timeout=self.config.timeout,
context=context,
) as server:
if self.config.smtp_username and self.config.smtp_password:
server.login(self.config.smtp_username, self.config.smtp_password)
server.send_message(msg, self.config.from_address, recipients)
else:
# Connect and optionally upgrade to TLS
with smtplib.SMTP(
self.config.smtp_host,
self.config.smtp_port,
timeout=self.config.timeout,
) as server:
if self.config.use_tls:
context = ssl.create_default_context()
server.starttls(context=context)
if self.config.smtp_username and self.config.smtp_password:
server.login(self.config.smtp_username, self.config.smtp_password)
server.send_message(msg, self.config.from_address, recipients)
async def send_alert(
self,
recipients: list[str],
subject: str,
body: str,
) -> bool:
"""Send an alert email (convenience method).
Args:
recipients: List of recipient email addresses
subject: Email subject line
body: Email body text
Returns:
True if email was sent successfully, False otherwise
"""
return await self.send_email(recipients, subject, body)
async def send_daily_report(
self,
recipients: list[str],
stats: dict[str, Any],
errors: list[dict[str, Any]] | None = None,
) -> bool:
"""Send a daily report email.
Args:
recipients: List of recipient email addresses
stats: Dictionary containing statistics to include in report
errors: Optional list of errors to include
Returns:
True if email was sent successfully, False otherwise
"""
date_str = datetime.now().strftime("%Y-%m-%d")
subject = f"AlpineBits Daily Report - {date_str}"
# Build plain text body
body = f"AlpineBits Daily Report for {date_str}\n"
body += "=" * 60 + "\n\n"
# Add statistics
if stats:
body += "Statistics:\n"
body += "-" * 60 + "\n"
for key, value in stats.items():
body += f" {key}: {value}\n"
body += "\n"
# Add errors if present
if errors:
body += f"Errors ({len(errors)}):\n"
body += "-" * 60 + "\n"
for error in errors[:20]: # Limit to 20 most recent errors
timestamp = error.get("timestamp", "Unknown")
level = error.get("level", "ERROR")
message = error.get("message", "No message")
body += f" [{timestamp}] {level}: {message}\n"
if len(errors) > 20:
body += f" ... and {len(errors) - 20} more errors\n"
body += "\n"
body += "-" * 60 + "\n"
body += "Generated by AlpineBits Server\n"
# Build HTML body for better formatting
html_body = self._build_daily_report_html(date_str, stats, errors)
return await self.send_email(recipients, subject, body, html_body)
def _build_daily_report_html(
self,
date_str: str,
stats: dict[str, Any],
errors: list[dict[str, Any]] | None,
) -> str:
"""Build HTML version of daily report.
Args:
date_str: Date string for the report
stats: Statistics dictionary
errors: Optional list of errors
Returns:
HTML string for the email body
"""
html = f"""
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; }}
h1 {{ color: #333; }}
h2 {{ color: #666; margin-top: 20px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ text-align: left; padding: 8px; border-bottom: 1px solid #ddd; }}
th {{ background-color: #f2f2f2; }}
.error {{ color: #d32f2f; }}
.warning {{ color: #f57c00; }}
.footer {{ margin-top: 30px; color: #999; font-size: 12px; }}
</style>
</head>
<body>
<h1>AlpineBits Daily Report</h1>
<p><strong>Date:</strong> {date_str}</p>
"""
# Add statistics table
if stats:
html += """
<h2>Statistics</h2>
<table>
<tr>
<th>Metric</th>
<th>Value</th>
</tr>
"""
for key, value in stats.items():
html += f"""
<tr>
<td>{key}</td>
<td>{value}</td>
</tr>
"""
html += "</table>"
# Add errors table
if errors:
html += f"""
<h2>Errors ({len(errors)})</h2>
<table>
<tr>
<th>Time</th>
<th>Level</th>
<th>Message</th>
</tr>
"""
for error in errors[:20]: # Limit to 20 most recent
timestamp = error.get("timestamp", "Unknown")
level = error.get("level", "ERROR")
message = error.get("message", "No message")
css_class = "error" if level == "ERROR" or level == "CRITICAL" else "warning"
html += f"""
<tr>
<td>{timestamp}</td>
<td class="{css_class}">{level}</td>
<td>{message}</td>
</tr>
"""
if len(errors) > 20:
html += f"""
<tr>
<td colspan="3"><em>... and {len(errors) - 20} more errors</em></td>
</tr>
"""
html += "</table>"
html += """
<div class="footer">
<p>Generated by AlpineBits Server</p>
</div>
</body>
</html>
"""
return html
def create_email_service(config: dict[str, Any]) -> EmailService | None:
"""Create an email service from configuration.
Args:
config: Full application configuration dictionary
Returns:
EmailService instance if email is configured, None otherwise
"""
email_config = config.get("email")
if not email_config:
_LOGGER.info("Email not configured, email service disabled")
return None
try:
email_cfg = EmailConfig(email_config)
service = EmailService(email_cfg)
_LOGGER.info("Email service initialized: %s:%s", email_cfg.smtp_host, email_cfg.smtp_port)
return service
except Exception:
_LOGGER.exception("Failed to initialize email service")
return None

View File

@@ -4,16 +4,32 @@ This module sets up logging based on config and provides a function to get
loggers from anywhere in the application.
"""
import asyncio
import logging
import sys
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from alpine_bits_python.email_monitoring import DailyReportScheduler, EmailAlertHandler
from alpine_bits_python.email_service import EmailService
def setup_logging(config: dict | None = None):
def setup_logging(
config: dict | None = None,
email_service: "EmailService | None" = None,
loop: asyncio.AbstractEventLoop | None = None,
) -> tuple["EmailAlertHandler | None", "DailyReportScheduler | None"]:
"""Configure logging based on application config.
Args:
config: Application configuration dict with optional 'logger' section
email_service: Optional email service for email alerts
loop: Optional asyncio event loop for email alerts
Returns:
Tuple of (email_alert_handler, daily_report_scheduler) if email monitoring
is enabled, otherwise (None, None)
Logger config format:
logger:
@@ -67,6 +83,49 @@ def setup_logging(config: dict | None = None):
root_logger.info("Logging configured at %s level", level)
# Setup email monitoring if configured
email_handler = None
report_scheduler = None
if email_service:
email_config = config.get("email", {})
monitoring_config = email_config.get("monitoring", {})
# Setup error alert handler
error_alerts_config = monitoring_config.get("error_alerts", {})
if error_alerts_config.get("enabled", False):
try:
# Import here to avoid circular dependencies
from alpine_bits_python.email_monitoring import EmailAlertHandler
email_handler = EmailAlertHandler(
email_service=email_service,
config=error_alerts_config,
loop=loop,
)
email_handler.setLevel(logging.ERROR)
root_logger.addHandler(email_handler)
root_logger.info("Email alert handler enabled for error monitoring")
except Exception:
root_logger.exception("Failed to setup email alert handler")
# Setup daily report scheduler
daily_report_config = monitoring_config.get("daily_report", {})
if daily_report_config.get("enabled", False):
try:
# Import here to avoid circular dependencies
from alpine_bits_python.email_monitoring import DailyReportScheduler
report_scheduler = DailyReportScheduler(
email_service=email_service,
config=daily_report_config,
)
root_logger.info("Daily report scheduler configured")
except Exception:
root_logger.exception("Failed to setup daily report scheduler")
return email_handler, report_scheduler
def get_logger(name: str) -> logging.Logger:
"""Get a logger instance for the given module name.

373
tests/test_email_service.py Normal file
View File

@@ -0,0 +1,373 @@
"""Tests for email service and monitoring functionality."""
import asyncio
import logging
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from alpine_bits_python.email_monitoring import (
DailyReportScheduler,
EmailAlertHandler,
ErrorRecord,
)
from alpine_bits_python.email_service import EmailConfig, EmailService
class TestEmailConfig:
"""Tests for EmailConfig class."""
def test_email_config_initialization(self):
"""Test basic email configuration initialization."""
config = {
"smtp": {
"host": "smtp.example.com",
"port": 587,
"username": "test@example.com",
"password": "password123",
"use_tls": True,
"use_ssl": False,
},
"from_address": "sender@example.com",
"from_name": "Test Sender",
}
email_config = EmailConfig(config)
assert email_config.smtp_host == "smtp.example.com"
assert email_config.smtp_port == 587
assert email_config.smtp_username == "test@example.com"
assert email_config.smtp_password == "password123"
assert email_config.use_tls is True
assert email_config.use_ssl is False
assert email_config.from_address == "sender@example.com"
assert email_config.from_name == "Test Sender"
def test_email_config_defaults(self):
"""Test email configuration with default values."""
config = {}
email_config = EmailConfig(config)
assert email_config.smtp_host == "localhost"
assert email_config.smtp_port == 587
assert email_config.use_tls is True
assert email_config.use_ssl is False
assert email_config.from_address == "noreply@example.com"
def test_email_config_tls_ssl_conflict(self):
"""Test that TLS and SSL cannot both be enabled."""
config = {
"smtp": {
"use_tls": True,
"use_ssl": True,
}
}
with pytest.raises(ValueError, match="Cannot use both TLS and SSL"):
EmailConfig(config)
class TestEmailService:
"""Tests for EmailService class."""
@pytest.fixture
def email_config(self):
"""Provide a test email configuration."""
return EmailConfig(
{
"smtp": {
"host": "smtp.example.com",
"port": 587,
"username": "test@example.com",
"password": "password123",
"use_tls": True,
},
"from_address": "sender@example.com",
"from_name": "Test Sender",
}
)
@pytest.fixture
def email_service(self, email_config):
"""Provide an EmailService instance."""
return EmailService(email_config)
@pytest.mark.asyncio
async def test_send_email_success(self, email_service):
"""Test successful email sending."""
with patch.object(email_service, "_send_smtp") as mock_smtp:
result = await email_service.send_email(
recipients=["test@example.com"],
subject="Test Subject",
body="Test body",
)
assert result is True
assert mock_smtp.called
@pytest.mark.asyncio
async def test_send_email_no_recipients(self, email_service):
"""Test email sending with no recipients."""
result = await email_service.send_email(
recipients=[],
subject="Test Subject",
body="Test body",
)
assert result is False
@pytest.mark.asyncio
async def test_send_email_with_html(self, email_service):
"""Test email sending with HTML body."""
with patch.object(email_service, "_send_smtp") as mock_smtp:
result = await email_service.send_email(
recipients=["test@example.com"],
subject="Test Subject",
body="Plain text body",
html_body="<html><body>HTML body</body></html>",
)
assert result is True
assert mock_smtp.called
@pytest.mark.asyncio
async def test_send_alert(self, email_service):
"""Test send_alert convenience method."""
with patch.object(email_service, "send_email", new_callable=AsyncMock) as mock_send:
mock_send.return_value = True
result = await email_service.send_alert(
recipients=["test@example.com"],
subject="Alert Subject",
body="Alert body",
)
assert result is True
mock_send.assert_called_once_with(
["test@example.com"], "Alert Subject", "Alert body"
)
@pytest.mark.asyncio
async def test_send_daily_report(self, email_service):
"""Test daily report email generation and sending."""
with patch.object(email_service, "send_email", new_callable=AsyncMock) as mock_send:
mock_send.return_value = True
stats = {
"total_reservations": 42,
"new_customers": 15,
}
errors = [
{
"timestamp": "2025-10-15 10:30:00",
"level": "ERROR",
"message": "Test error message",
}
]
result = await email_service.send_daily_report(
recipients=["admin@example.com"],
stats=stats,
errors=errors,
)
assert result is True
assert mock_send.called
call_args = mock_send.call_args
assert "admin@example.com" in call_args[0][0]
assert "Daily Report" in call_args[0][1]
class TestErrorRecord:
"""Tests for ErrorRecord class."""
def test_error_record_creation(self):
"""Test creating an ErrorRecord from a logging record."""
log_record = logging.LogRecord(
name="test.logger",
level=logging.ERROR,
pathname="/path/to/file.py",
lineno=42,
msg="Test error message",
args=(),
exc_info=None,
)
error_record = ErrorRecord(log_record)
assert error_record.level == "ERROR"
assert error_record.logger_name == "test.logger"
assert error_record.message == "Test error message"
assert error_record.module == "file"
assert error_record.line_no == 42
def test_error_record_to_dict(self):
"""Test converting ErrorRecord to dictionary."""
log_record = logging.LogRecord(
name="test.logger",
level=logging.ERROR,
pathname="/path/to/file.py",
lineno=42,
msg="Test error",
args=(),
exc_info=None,
)
error_record = ErrorRecord(log_record)
error_dict = error_record.to_dict()
assert error_dict["level"] == "ERROR"
assert error_dict["message"] == "Test error"
assert error_dict["line_no"] == 42
assert "timestamp" in error_dict
def test_error_record_format_plain_text(self):
"""Test formatting ErrorRecord as plain text."""
log_record = logging.LogRecord(
name="test.logger",
level=logging.ERROR,
pathname="/path/to/file.py",
lineno=42,
msg="Test error",
args=(),
exc_info=None,
)
error_record = ErrorRecord(log_record)
formatted = error_record.format_plain_text()
assert "ERROR" in formatted
assert "Test error" in formatted
assert "file:42" in formatted
class TestEmailAlertHandler:
"""Tests for EmailAlertHandler class."""
@pytest.fixture
def mock_email_service(self):
"""Provide a mock email service."""
service = MagicMock(spec=EmailService)
service.send_alert = AsyncMock(return_value=True)
return service
@pytest.fixture
def handler_config(self):
"""Provide handler configuration."""
return {
"recipients": ["alert@example.com"],
"error_threshold": 3,
"buffer_minutes": 1, # Short for testing
"cooldown_minutes": 5,
"log_levels": ["ERROR", "CRITICAL"],
}
@pytest.fixture
def alert_handler(self, mock_email_service, handler_config):
"""Provide an EmailAlertHandler instance."""
loop = asyncio.new_event_loop()
handler = EmailAlertHandler(mock_email_service, handler_config, loop)
yield handler
loop.close()
def test_handler_initialization(self, alert_handler, handler_config):
"""Test handler initialization."""
assert alert_handler.error_threshold == 3
assert alert_handler.buffer_minutes == 1
assert alert_handler.cooldown_minutes == 5
assert alert_handler.recipients == ["alert@example.com"]
def test_handler_emit_below_threshold(self, alert_handler):
"""Test emitting errors below threshold."""
log_record = logging.LogRecord(
name="test",
level=logging.ERROR,
pathname="/test.py",
lineno=1,
msg="Error 1",
args=(),
exc_info=None,
)
# Emit 2 errors (below threshold of 3)
alert_handler.emit(log_record)
alert_handler.emit(log_record)
# Should buffer, not send immediately
assert len(alert_handler.error_buffer) == 2
def test_handler_ignores_non_error_levels(self, alert_handler):
"""Test that handler ignores INFO and WARNING levels."""
log_record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="/test.py",
lineno=1,
msg="Info message",
args=(),
exc_info=None,
)
alert_handler.emit(log_record)
# Should not buffer INFO messages
assert len(alert_handler.error_buffer) == 0
class TestDailyReportScheduler:
"""Tests for DailyReportScheduler class."""
@pytest.fixture
def mock_email_service(self):
"""Provide a mock email service."""
service = MagicMock(spec=EmailService)
service.send_daily_report = AsyncMock(return_value=True)
return service
@pytest.fixture
def scheduler_config(self):
"""Provide scheduler configuration."""
return {
"recipients": ["report@example.com"],
"send_time": "08:00",
"include_stats": True,
"include_errors": True,
}
@pytest.fixture
def scheduler(self, mock_email_service, scheduler_config):
"""Provide a DailyReportScheduler instance."""
return DailyReportScheduler(mock_email_service, scheduler_config)
def test_scheduler_initialization(self, scheduler, scheduler_config):
"""Test scheduler initialization."""
assert scheduler.send_time == "08:00"
assert scheduler.recipients == ["report@example.com"]
assert scheduler.include_stats is True
assert scheduler.include_errors is True
def test_scheduler_log_error(self, scheduler):
"""Test logging errors for daily report."""
error = {
"timestamp": datetime.now().isoformat(),
"level": "ERROR",
"message": "Test error",
}
scheduler.log_error(error)
assert len(scheduler._error_log) == 1
assert scheduler._error_log[0]["message"] == "Test error"
def test_scheduler_set_stats_collector(self, scheduler):
"""Test setting stats collector function."""
async def mock_collector():
return {"test": "stats"}
scheduler.set_stats_collector(mock_collector)
assert scheduler._stats_collector is mock_collector