377 lines
12 KiB
Python
377 lines
12 KiB
Python
import os
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from annotatedyaml.loader import Secrets
|
|
from annotatedyaml.loader import load_yaml as load_annotated_yaml
|
|
from voluptuous import (
|
|
PREVENT_EXTRA,
|
|
All,
|
|
Boolean,
|
|
In,
|
|
Length,
|
|
MultipleInvalid,
|
|
Optional,
|
|
Range,
|
|
Required,
|
|
Schema,
|
|
)
|
|
|
|
from alpine_bits_python.const import (
|
|
CONF_ALPINE_BITS_AUTH,
|
|
CONF_DATABASE,
|
|
CONF_GOOGLE_ACCOUNT,
|
|
CONF_HOTEL_ID,
|
|
CONF_HOTEL_NAME,
|
|
CONF_LOGGING,
|
|
CONF_LOGGING_FILE,
|
|
CONF_LOGGING_LEVEL,
|
|
CONF_META_ACCOUNT,
|
|
CONF_PASSWORD,
|
|
CONF_PUSH_ENDPOINT,
|
|
CONF_PUSH_TOKEN,
|
|
CONF_PUSH_URL,
|
|
CONF_PUSH_USERNAME,
|
|
CONF_SERVER,
|
|
CONF_SERVER_CODE,
|
|
CONF_SERVER_CODECONTEXT,
|
|
CONF_SERVER_COMPANYNAME,
|
|
CONF_SERVER_RES_ID_SOURCE_CONTEXT,
|
|
CONF_USERNAME,
|
|
ENV_ALPINE_BITS_CONFIG_PATH,
|
|
)
|
|
|
|
# --- Voluptuous schemas ---

# Database section: a mandatory connection URL and an optional schema name.
# Unknown keys are rejected.
database_schema = Schema(
    {
        Required("url"): str,
        Optional("schema"): str,
    },
    extra=PREVENT_EXTRA,
)


# Logging section: the level is filled in as "INFO" when it is left out.
logger_schema = Schema(
    {
        Required(CONF_LOGGING_LEVEL, default="INFO"): str,
        # If not provided, log to console
        Optional(CONF_LOGGING_FILE): str,
    },
    extra=PREVENT_EXTRA,
)
|
|
|
|
|
|
def ensure_string(value: object) -> str:
    """Return *value* as a string.

    Strings are handed back unchanged; any other value is converted with
    ``str()``. Used below as a coercing validator inside the schemas.
    """
    return value if isinstance(value, str) else str(value)
|
|
|
|
|
|
# Server identification block. Every key has a default and is coerced to a
# string via ensure_string. No explicit ``extra=`` is passed here, so
# voluptuous' own default for extra keys applies.
server_info = Schema(
    {
        Required(CONF_SERVER_CODECONTEXT, default="ADVERTISING"): ensure_string,
        Required(CONF_SERVER_CODE, default="70597314"): ensure_string,
        Required(CONF_SERVER_COMPANYNAME, default="99tales Gmbh"): ensure_string,
        Required(CONF_SERVER_RES_ID_SOURCE_CONTEXT, default="99tales"): ensure_string,
    }
)


# A single hotel entry: identity, credentials, optional advertising accounts
# and an optional push endpoint description.
hotel_auth_schema = Schema(
    {
        # The hotel id is coerced to a string, so a numeric YAML value is accepted.
        Required(CONF_HOTEL_ID): ensure_string,
        Required(CONF_HOTEL_NAME): str,
        Required(CONF_USERNAME): str,
        Required(CONF_PASSWORD): str,
        Optional(CONF_META_ACCOUNT): str,
        Optional(CONF_GOOGLE_ACCOUNT): str,
        # When a push endpoint is given, URL and token are mandatory.
        Optional(CONF_PUSH_ENDPOINT): {
            Required(CONF_PUSH_URL): str,
            Required(CONF_PUSH_TOKEN): str,
            Optional(CONF_PUSH_USERNAME): str,
        },
    },
    extra=PREVENT_EXTRA,
)

# The auth section is a list of hotel entries containing at least one item.
basic_auth_schema = Schema(All([hotel_auth_schema], Length(min=1)))
|
|
|
|
# Email SMTP configuration schema
# Host, port and the TLS/SSL switches all carry defaults; credentials are optional.
smtp_schema = Schema(
    {
        Required("host", default="localhost"): str,
        Required("port", default=587): Range(min=1, max=65535),
        Optional("username"): str,
        Optional("password"): str,
        Required("use_tls", default=True): Boolean(),
        Required("use_ssl", default=False): Boolean(),
    },
    extra=PREVENT_EXTRA,
)

# Email daily report configuration schema
# Disabled by default. ``send_time`` is only checked to be a string here.
daily_report_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Optional("recipients", default=[]): [str],
        Required("send_time", default="08:00"): str,
        Required("include_stats", default=True): Boolean(),
        Required("include_errors", default=True): Boolean(),
    },
    extra=PREVENT_EXTRA,
)

# Email error alerts configuration schema
# Disabled by default. ``log_levels`` entries must come from the fixed list
# of level names below.
error_alerts_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Optional("recipients", default=[]): [str],
        Required("error_threshold", default=5): Range(min=1),
        Required("buffer_minutes", default=15): Range(min=1),
        Required("cooldown_minutes", default=15): Range(min=0),
        Required("log_levels", default=["ERROR", "CRITICAL"]): [
            In(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])
        ],
    },
    extra=PREVENT_EXTRA,
)

# Email monitoring configuration schema
# Both sub-sections default to an empty mapping, which the nested schema
# then fills with its own defaults.
monitoring_schema = Schema(
    {
        Optional("daily_report", default={}): daily_report_schema,
        Optional("error_alerts", default={}): error_alerts_schema,
    },
    extra=PREVENT_EXTRA,
)

# Complete email configuration schema
email_schema = Schema(
    {
        Optional("smtp", default={}): smtp_schema,
        Required("from_address", default="noreply@example.com"): str,
        Required("from_name", default="AlpineBits Server"): str,
        Optional("timeout", default=10): Range(min=1, max=300),
        Optional("monitoring", default={}): monitoring_schema,
    },
    extra=PREVENT_EXTRA,
)
|
|
|
|
# Pushover daily report configuration schema
# Same switches as the email daily report, without recipients and with a
# priority field.
pushover_daily_report_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Required("send_time", default="08:00"): str,
        Required("include_stats", default=True): Boolean(),
        Required("include_errors", default=True): Boolean(),
        # Pushover priority levels
        Required("priority", default=0): Range(min=-2, max=2),
    },
    extra=PREVENT_EXTRA,
)

# Pushover error alerts configuration schema
# Thresholds and log levels as for email alerts; priority defaults to 1 here.
pushover_error_alerts_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Required("error_threshold", default=5): Range(min=1),
        Required("buffer_minutes", default=15): Range(min=1),
        Required("cooldown_minutes", default=15): Range(min=0),
        Required("log_levels", default=["ERROR", "CRITICAL"]): [
            In(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])
        ],
        # Pushover priority levels
        Required("priority", default=1): Range(min=-2, max=2),
    },
    extra=PREVENT_EXTRA,
)

# Pushover monitoring configuration schema
pushover_monitoring_schema = Schema(
    {
        Optional("daily_report", default={}): pushover_daily_report_schema,
        Optional("error_alerts", default={}): pushover_error_alerts_schema,
    },
    extra=PREVENT_EXTRA,
)

# Complete pushover configuration schema
pushover_schema = Schema(
    {
        # Optional but required for pushover to work
        Optional("user_key"): str,
        # Optional but required for pushover to work
        Optional("api_token"): str,
        Optional("monitoring", default={}): pushover_monitoring_schema,
    },
    extra=PREVENT_EXTRA,
)
|
|
|
|
# Unified notification method schema
# ``type`` selects the channel; the remaining keys are channel-specific and
# optional at schema level.
notification_method_schema = Schema(
    {
        Required("type"): In(["email", "pushover"]),
        # For email
        Optional("address"): str,
        # For pushover
        Optional("priority"): Range(min=-2, max=2),
    },
    extra=PREVENT_EXTRA,
)

# Unified notification recipient schema
# A named recipient with a list of notification methods.
notification_recipient_schema = Schema(
    {
        Required("name"): str,
        Required("methods"): [notification_method_schema],
    },
    extra=PREVENT_EXTRA,
)

# Unified daily report configuration schema (without recipients)
unified_daily_report_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Required("send_time", default="08:00"): str,
        Required("include_stats", default=True): Boolean(),
        Required("include_errors", default=True): Boolean(),
    },
    extra=PREVENT_EXTRA,
)

# Unified error alerts configuration schema (without recipients)
unified_error_alerts_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Required("error_threshold", default=5): Range(min=1),
        Required("buffer_minutes", default=15): Range(min=1),
        Required("cooldown_minutes", default=15): Range(min=0),
        Required("log_levels", default=["ERROR", "CRITICAL"]): [
            In(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])
        ],
    },
    extra=PREVENT_EXTRA,
)

# Unified notifications configuration schema
# Recipients are declared once here and shared by both report types.
notifications_schema = Schema(
    {
        Required("recipients", default=[]): [notification_recipient_schema],
        Optional("daily_report", default={}): unified_daily_report_schema,
        Optional("error_alerts", default={}): unified_error_alerts_schema,
    },
    extra=PREVENT_EXTRA,
)
|
|
|
|
# Top-level schema for the whole configuration file. Database, auth, server
# and logging sections are mandatory; the notification-related sections are
# optional. Unknown top-level keys are rejected.
config_schema = Schema(
    {
        Required(CONF_DATABASE): database_schema,
        Required(CONF_ALPINE_BITS_AUTH): basic_auth_schema,
        Required(CONF_SERVER): server_info,
        Required(CONF_LOGGING): logger_schema,
        # Email is optional (service config only)
        Optional("email"): email_schema,
        # Pushover is optional (service config only)
        Optional("pushover"): pushover_schema,
        # Unified notification config
        Optional("notifications"): notifications_schema,
        # API tokens for bearer auth
        Optional("api_tokens", default=[]): [str],
    },
    extra=PREVENT_EXTRA,
)
|
|
|
|
DEFAULT_CONFIG_FILE = "config.yaml"
|
|
|
|
|
|
class Config:
|
|
"""Class to load and hold the configuration."""
|
|
|
|
def __init__(
|
|
self,
|
|
config_folder: str | Path | None = None,
|
|
config_name: str = DEFAULT_CONFIG_FILE,
|
|
testing_mode: bool = False,
|
|
):
|
|
if config_folder is None:
|
|
config_folder = os.environ.get(ENV_ALPINE_BITS_CONFIG_PATH)
|
|
if not config_folder:
|
|
config_folder = Path(__file__).parent.joinpath("../../config").resolve()
|
|
if isinstance(config_folder, str):
|
|
config_folder = Path(config_folder)
|
|
self.config_folder = config_folder
|
|
self.config_path = config_folder / config_name
|
|
self.secrets = Secrets(config_folder)
|
|
self.testing_mode = testing_mode
|
|
self._load_config()
|
|
|
|
def _load_config(self):
|
|
stuff = load_annotated_yaml(self.config_path, secrets=self.secrets)
|
|
try:
|
|
validated = config_schema(stuff)
|
|
except MultipleInvalid as e:
|
|
raise ValueError(f"Config validation error: {e}")
|
|
self.database = validated["database"]
|
|
self.basic_auth = validated["alpine_bits_auth"]
|
|
self.config = validated
|
|
|
|
def get(self, key, default=None):
|
|
return self.config.get(key, default)
|
|
|
|
@property
|
|
def db_url(self) -> str:
|
|
return self.database["url"]
|
|
|
|
@property
|
|
def hotel_id(self) -> str:
|
|
return self.basic_auth["hotel_id"]
|
|
|
|
@property
|
|
def hotel_name(self) -> str:
|
|
return self.basic_auth["hotel_name"]
|
|
|
|
@property
|
|
def users(self) -> list[dict[str, str]]:
|
|
return self.basic_auth["users"]
|
|
|
|
|
|
# For backward compatibility
def load_config():
    """Build a ``Config`` with default arguments and return its validated config."""
    loader = Config()
    return loader.config
|
|
|
|
|
|
def get_username_for_hotel(config: dict, hotel_code: str) -> str:
    """Get the username associated with a hotel_code from config.

    Args:
        config: Application configuration dict; hotel entries are read from
            its ``"alpine_bits_auth"`` list.
        hotel_code: Value compared against each entry's ``"hotel_id"``.

    Returns:
        The ``"username"`` of the first entry whose ``"hotel_id"`` matches.

    Raises:
        ValueError: If no entry matches ``hotel_code``. Previously a bare
            ``StopIteration`` escaped from ``next()``, which gives callers no
            context and turns into ``RuntimeError`` inside generators.

    """
    for hotel in config.get("alpine_bits_auth", []):
        if hotel.get("hotel_id") == hotel_code:
            return hotel.get("username")
    msg = f"No hotel with hotel_id {hotel_code!r} found in config"
    raise ValueError(msg)
|
|
|
|
|
|
def get_advertising_account_ids(
    config: dict[str, Any], hotel_code: str, fbclid: str | None, gclid: str | None
) -> tuple[str | None, str | None]:
    """Resolve the advertising account IDs that apply to a click.

    The first hotel entry in ``config["alpine_bits_auth"]`` whose hotel id
    equals ``hotel_code`` is used; later duplicates are ignored.

    Args:
        config: Application configuration dict
        hotel_code: Hotel identifier to look up in config
        fbclid: Facebook click ID; the meta account is only reported when
            this is truthy
        gclid: Google click ID; the google account is only reported when
            this is truthy

    Returns:
        ``(meta_account_id, google_account_id)``. Each element is ``None``
        unless the matching click ID is present and the hotel entry has the
        corresponding account configured. ``(None, None)`` when no hotel
        entry matches.

    """
    # Look up hotel in config: first entry with a matching id, if any.
    matching_hotel = next(
        (
            entry
            for entry in config.get("alpine_bits_auth", [])
            if entry.get(CONF_HOTEL_ID) == hotel_code
        ),
        None,
    )
    if matching_hotel is None:
        return None, None

    # An account id is only looked up when its click id was supplied.
    meta_account_id = matching_hotel.get(CONF_META_ACCOUNT) if fbclid else None
    google_account_id = matching_hotel.get(CONF_GOOGLE_ACCOUNT) if gclid else None
    return meta_account_id, google_account_id
|