"""Configuration loading and validation for the AlpineBits server.

Defines the voluptuous schemas describing the YAML configuration file and a
``Config`` class that loads the file (with secrets support via
``annotatedyaml``) and validates it against ``config_schema``.
"""

import os
from pathlib import Path

from annotatedyaml.loader import Secrets
from annotatedyaml.loader import load_yaml as load_annotated_yaml
from voluptuous import (
    PREVENT_EXTRA,
    All,
    Boolean,
    In,
    Length,
    MultipleInvalid,
    Optional,
    Range,
    Required,
    Schema,
)

from alpine_bits_python.const import (
    CONF_ALPINE_BITS_AUTH,
    CONF_DATABASE,
    CONF_HOTEL_ID,
    CONF_HOTEL_NAME,
    CONF_LOGGING,
    CONF_LOGGING_FILE,
    CONF_LOGGING_LEVEL,
    CONF_PASSWORD,
    CONF_PUSH_ENDPOINT,
    CONF_PUSH_TOKEN,
    CONF_PUSH_URL,
    CONF_PUSH_USERNAME,
    CONF_SERVER,
    CONF_SERVER_CODE,
    CONF_SERVER_CODECONTEXT,
    CONF_SERVER_COMPANYNAME,
    CONF_SERVER_RES_ID_SOURCE_CONTEXT,
    CONF_USERNAME,
    ENV_ALPINE_BITS_CONFIG_PATH,
)

# --- Voluptuous schemas ---

# Log level names accepted by the alert ``log_levels`` options below.
_ALERT_LOG_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]

database_schema = Schema({Required("url"): str}, extra=PREVENT_EXTRA)

logger_schema = Schema(
    {
        Required(CONF_LOGGING_LEVEL, default="INFO"): str,
        Optional(CONF_LOGGING_FILE): str,  # If not provided, log to console
    },
    extra=PREVENT_EXTRA,
)


def ensure_string(value):
    """Return ``value`` unchanged if it is a ``str``, otherwise ``str(value)``.

    Used as a schema validator so that YAML scalars parsed as non-strings
    (e.g. a numeric hotel id) are accepted and normalised to text.
    """
    if isinstance(value, str):
        return value
    return str(value)


server_info = Schema(
    {
        Required(CONF_SERVER_CODECONTEXT, default="ADVERTISING"): ensure_string,
        Required(CONF_SERVER_CODE, default="70597314"): ensure_string,
        Required(CONF_SERVER_COMPANYNAME, default="99tales Gmbh"): ensure_string,
        Required(CONF_SERVER_RES_ID_SOURCE_CONTEXT, default="99tales"): ensure_string,
    }
)

hotel_auth_schema = Schema(
    {
        Required(CONF_HOTEL_ID): ensure_string,
        Required(CONF_HOTEL_NAME): str,
        Required(CONF_USERNAME): str,
        Required(CONF_PASSWORD): str,
        Optional(CONF_PUSH_ENDPOINT): {
            Required(CONF_PUSH_URL): str,
            Required(CONF_PUSH_TOKEN): str,
            Optional(CONF_PUSH_USERNAME): str,
        },
    },
    extra=PREVENT_EXTRA,
)

# A non-empty list of hotel auth entries.
basic_auth_schema = Schema(All([hotel_auth_schema], Length(min=1)))

# Email SMTP configuration schema
smtp_schema = Schema(
    {
        Required("host", default="localhost"): str,
        Required("port", default=587): Range(min=1, max=65535),
        Optional("username"): str,
        Optional("password"): str,
        Required("use_tls", default=True): Boolean(),
        Required("use_ssl", default=False): Boolean(),
    },
    extra=PREVENT_EXTRA,
)

# Email daily report configuration schema
daily_report_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Optional("recipients", default=[]): [str],
        Required("send_time", default="08:00"): str,
        Required("include_stats", default=True): Boolean(),
        Required("include_errors", default=True): Boolean(),
    },
    extra=PREVENT_EXTRA,
)

# Email error alerts configuration schema
error_alerts_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Optional("recipients", default=[]): [str],
        Required("error_threshold", default=5): Range(min=1),
        Required("buffer_minutes", default=15): Range(min=1),
        Required("cooldown_minutes", default=15): Range(min=0),
        Required("log_levels", default=["ERROR", "CRITICAL"]): [
            In(_ALERT_LOG_LEVELS)
        ],
    },
    extra=PREVENT_EXTRA,
)

# Email monitoring configuration schema
monitoring_schema = Schema(
    {
        Optional("daily_report", default={}): daily_report_schema,
        Optional("error_alerts", default={}): error_alerts_schema,
    },
    extra=PREVENT_EXTRA,
)

# Complete email configuration schema
email_schema = Schema(
    {
        Optional("smtp", default={}): smtp_schema,
        Required("from_address", default="noreply@example.com"): str,
        Required("from_name", default="AlpineBits Server"): str,
        Optional("timeout", default=10): Range(min=1, max=300),
        Optional("monitoring", default={}): monitoring_schema,
    },
    extra=PREVENT_EXTRA,
)

# Pushover daily report configuration schema
pushover_daily_report_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Required("send_time", default="08:00"): str,
        Required("include_stats", default=True): Boolean(),
        Required("include_errors", default=True): Boolean(),
        Required("priority", default=0): Range(min=-2, max=2),  # Pushover priority levels
    },
    extra=PREVENT_EXTRA,
)

# Pushover error alerts configuration schema
pushover_error_alerts_schema = Schema(
    {
        Required("enabled", default=False): Boolean(),
        Required("error_threshold", default=5): Range(min=1),
        Required("buffer_minutes", default=15): Range(min=1),
        Required("cooldown_minutes", default=15): Range(min=0),
        Required("log_levels", default=["ERROR", "CRITICAL"]): [
            In(_ALERT_LOG_LEVELS)
        ],
        Required("priority", default=1): Range(min=-2, max=2),  # Pushover priority levels
    },
    extra=PREVENT_EXTRA,
)

# Pushover monitoring configuration schema
pushover_monitoring_schema = Schema(
    {
        Optional("daily_report", default={}): pushover_daily_report_schema,
        Optional("error_alerts", default={}): pushover_error_alerts_schema,
    },
    extra=PREVENT_EXTRA,
)

# Complete pushover configuration schema
pushover_schema = Schema(
    {
        Optional("user_key"): str,  # Optional but required for pushover to work
        Optional("api_token"): str,  # Optional but required for pushover to work
        Optional("monitoring", default={}): pushover_monitoring_schema,
    },
    extra=PREVENT_EXTRA,
)

config_schema = Schema(
    {
        Required(CONF_DATABASE): database_schema,
        Required(CONF_ALPINE_BITS_AUTH): basic_auth_schema,
        Required(CONF_SERVER): server_info,
        Required(CONF_LOGGING): logger_schema,
        Optional("email"): email_schema,  # Email is optional
        Optional("pushover"): pushover_schema,  # Pushover is optional
        Optional("api_tokens", default=[]): [str],  # API tokens for bearer auth
    },
    extra=PREVENT_EXTRA,
)

DEFAULT_CONFIG_FILE = "config.yaml"


class Config:
    """Load the YAML configuration file and hold the validated result."""

    def __init__(
        self,
        config_folder: str | Path | None = None,
        config_name: str = DEFAULT_CONFIG_FILE,
        testing_mode: bool = False,
    ):
        """Resolve the configuration location and load it immediately.

        Args:
            config_folder: Folder containing the config file. When ``None``,
                the ``ENV_ALPINE_BITS_CONFIG_PATH`` environment variable is
                consulted; if that is unset or empty, ``../../config``
                relative to this module is used.
            config_name: File name of the configuration inside the folder.
            testing_mode: Stored on the instance as ``testing_mode``; not
                otherwise used in this module.

        Raises:
            ValueError: If the loaded configuration fails schema validation.
        """
        if config_folder is None:
            config_folder = os.environ.get(ENV_ALPINE_BITS_CONFIG_PATH)
        if not config_folder:
            # Covers both a missing and an empty environment variable.
            config_folder = Path(__file__).parent.joinpath("../../config").resolve()
        if isinstance(config_folder, str):
            config_folder = Path(config_folder)

        self.config_folder = config_folder
        self.config_path = config_folder / config_name
        self.secrets = Secrets(config_folder)
        self.testing_mode = testing_mode
        self._load_config()

    def _load_config(self):
        """Read the YAML file, validate it and populate the instance.

        Raises:
            ValueError: If validation against ``config_schema`` fails; the
                original ``MultipleInvalid`` is attached as ``__cause__``.
        """
        raw_config = load_annotated_yaml(self.config_path, secrets=self.secrets)
        try:
            validated = config_schema(raw_config)
        except MultipleInvalid as e:
            raise ValueError(f"Config validation error: {e}") from e

        # Look sections up with the same constants the schema is keyed by.
        self.database = validated[CONF_DATABASE]
        self.basic_auth = validated[CONF_ALPINE_BITS_AUTH]
        self.config = validated

    def get(self, key, default=None):
        """Return the top-level config value for ``key``, or ``default``."""
        return self.config.get(key, default)

    @property
    def db_url(self) -> str:
        """Database URL from the database section."""
        return self.database["url"]

    @property
    def hotel_id(self) -> str:
        """Hotel id of the first configured hotel auth entry.

        ``basic_auth`` is a non-empty list of hotel entries (enforced by
        ``basic_auth_schema``), so it cannot be indexed by key directly.
        """
        return self.basic_auth[0][CONF_HOTEL_ID]

    @property
    def hotel_name(self) -> str:
        """Hotel name of the first configured hotel auth entry."""
        return self.basic_auth[0][CONF_HOTEL_NAME]

    @property
    def users(self) -> list[dict[str, str]]:
        """All validated hotel auth entries.

        NOTE(review): the schema defines no ``"users"`` key and the previous
        ``self.basic_auth["users"]`` lookup on a list always raised
        ``TypeError``. Returning the list of auth entries matches the
        declared return type; confirm against callers.
        """
        return self.basic_auth


# For backward compatibility
def load_config():
    """Load the configuration from the default location and return the validated dict."""
    return Config().config