Source code for WDL.runtime.config

# Portions copied from AirflowConfigParser --
#  https://github.com/apache/airflow/blob/master/airflow/configuration.py
# Exposition --
#  https://medium.com/@tszumowski/delightful-designs-airflows-configuration-parser-1ef1a6b3d03c


import os
import configparser
import logging
import json
from fnmatch import fnmatchcase
from typing import Optional, List, Dict, Any, Callable, TypeVar, Set, Tuple, Iterable
from xdg import XDG_CONFIG_DIRS, XDG_CONFIG_HOME
from .._util import StructuredLogMessage as _

# Generic type variable: links the return type of Loader._parse to the return type of the
# ``parse`` callable it is given (and to the type of its ``default`` argument).
_T = TypeVar("_T")


class ConfigMissing(Exception):
    """
    Raised when a requested configuration section or option cannot be found.

    Used in place of ``configparser.NoSectionError`` / ``configparser.NoOptionError``, whose
    error messages are vague and hard-coded.
    """
class Section:
    """
    Lightweight view of a single configuration section.

    Every accessor forwards to the same-named method of the parent loader, supplying the
    section name this view was created with.
    """

    _section: str
    _parent: "Loader"

    def __init__(self, parent: "Loader", section: str):
        # Remember which loader to delegate to, and which section this view is bound to.
        self._section = section
        self._parent = parent

    def get(self, key: str, default: Optional[str] = None) -> str:
        """Return the string value of ``key`` in this section (see the parent's ``get``)."""
        loader = self._parent
        return loader.get(self._section, key, default)

    def __getitem__(self, key: str) -> str:
        """``section[key]`` is shorthand for ``section.get(key)`` with no default."""
        return self.get(key)

    def get_int(self, key: str, default: Optional[int] = None) -> int:
        """Return ``key`` via the parent's ``get_int``."""
        loader = self._parent
        return loader.get_int(self._section, key, default)

    def get_float(self, key: str, default: Optional[float] = None) -> float:
        """Return ``key`` via the parent's ``get_float``."""
        loader = self._parent
        return loader.get_float(self._section, key, default)

    def get_bool(self, key: str, default: Optional[bool] = None) -> bool:
        """Return ``key`` via the parent's ``get_bool``."""
        loader = self._parent
        return loader.get_bool(self._section, key, default)

    def get_dict(self, key: str, default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Return ``key`` via the parent's ``get_dict``."""
        loader = self._parent
        return loader.get_dict(self._section, key, default)

    def get_list(self, key: str, default: Optional[List[Any]] = None) -> List[Any]:
        """Return ``key`` via the parent's ``get_list``."""
        loader = self._parent
        return loader.get_list(self._section, key, default)
class Loader:
    """
    Runtime configuration options, identified by section & key, are sourced in the following
    priority order:

    1. Supplied ``overrides`` dict, ``{"section": {"key": value}}``
    2. Environment variables ``MINIWDL__SECTION__KEY`` (uppercased with double-underscores)
    3. Custom configuration file (mutually exclusive):
        a. filename given to ``__init__``
        b. file named by environment variable ``MINIWDL_CFG``
        c. ``miniwdl.cfg`` in XDG_CONFIG_HOME or XDG_CONFIG_DIRS, usually
           ``~/.config/miniwdl.cfg``
    4. ``WDL/runtime/config_templates/default.cfg`` from installed package

    If ``filenames`` is an empty list, then no custom configuration file is used. Or ``filenames``
    may be a prioritized list of candidate filenames instead of (a-c) above; the first extant file
    is used, if any.
    """

    _logger: logging.Logger
    _defaults: configparser.ConfigParser  # lowest priority: default.cfg + plugin defaults
    _options: configparser.ConfigParser  # custom configuration file, if any
    _overrides: configparser.ConfigParser  # highest priority: programmatic overrides
    _used: Set[Tuple[str, str]]  # (section, key) pairs successfully read through get()
    _used_env: Set[str]  # names of MINIWDL_* environment variables that were consulted
    cfg_filename: Optional[str] = None  # path of the custom configuration file actually read

    def __init__(
        self,
        logger: logging.Logger,
        filenames: Optional[List[str]] = None,
        overrides: Optional[Dict[str, Dict[str, str]]] = None,
    ):
        """
        :param logger: destination for the loader's log messages
        :param filenames: candidate configuration files in priority order; ``None`` selects the
                          ``MINIWDL_CFG`` / XDG search, ``[]`` disables custom files
        :param overrides: ``{"section": {"key": value}}`` applied with top priority
        """
        self._logger = logger
        self._used = set()
        # MINIWDL_CFG is consumed here, so it must never be reported as an unused variable
        self._used_env = {"MINIWDL_CFG"}
        self._defaults = configparser.ConfigParser()
        self._options = configparser.ConfigParser()
        self._overrides = configparser.ConfigParser()

        # load default.cfg
        default_cfg = os.path.join(os.path.dirname(__file__), "config_templates", "default.cfg")
        self._logger.debug(_("read configuration defaults", filename=default_cfg))
        self._defaults.read(default_cfg)

        # load miniwdl.cfg, if any
        if filenames is None:
            if "MINIWDL_CFG" in os.environ:
                filenames = os.environ["MINIWDL_CFG"].split(":")
            else:
                filenames = [
                    str(dn.joinpath("miniwdl.cfg"))
                    for dn in reversed(XDG_CONFIG_DIRS + [XDG_CONFIG_HOME])
                ]
        if filenames:
            self._logger.debug(_("searching for configuration files", filenames=filenames))
            # only the first candidate that actually exists is used
            filenames = [fn for fn in filenames if os.path.isfile(fn)]
            filenames = filenames[:1]
            # NOTE(review): the nesting of this if/else was reconstructed from
            # whitespace-collapsed source; confirm whether "no configuration file found" should
            # also be logged when ``filenames`` is given as an empty list.
            if filenames:
                self._logger.info(_("read configuration file", path=filenames[0]))
                self._options.read(filenames)
                self.cfg_filename = filenames[0]
            else:
                self._logger.info("no configuration file found")

        # load overrides
        if overrides:
            self.override(overrides)

    @staticmethod
    def _encode_value(v: Any) -> str:
        """Render an option value as the string stored in a ConfigParser."""
        # lists, dicts, bools and None go through JSON so that get_list/get_dict can parse them
        # back (and so that bools/None take their JSON spellings); anything else uses str()
        if isinstance(v, (list, dict, bool)) or v is None:
            return json.dumps(v)
        return str(v)

    def override(self, options: Dict[str, Dict[str, Any]]) -> None:
        """
        Apply top-priority option values.

        :param options: ``{"section": {"key": value}}``; empty sections are skipped
        """
        options2 = {
            section: {key: self._encode_value(value) for key, value in entries.items()}
            for section, entries in options.items()
            if entries
        }
        if options2:
            self._logger.debug(_("applying configuration overrides", **options2))
            self._overrides.read_dict(options2)

    def plugin_defaults(self, options: Dict[str, Dict[str, Any]]) -> None:
        """
        Last-priority default settings; typically added by plugins (whose options won't be
        represented in default.cfg). (DEPRECATED)
        """
        options2 = {}
        for section in options:
            for key in options[section]:
                if not self._defaults.has_option(section, key):
                    options2.setdefault(section, {})[key] = self._encode_value(
                        options[section][key]
                    )
                else:
                    # existing defaults are never replaced by this method
                    self._logger.warning(
                        _(
                            "ignored plugin's attempt to change existing default configuration option",
                            section=section,
                            option=key,
                        )
                    )
        if options2:
            self._logger.debug(_("applying plugin configuration defaults", **options2))
            self._defaults.read_dict(options2)

    def get(self, section: str, key: str, default: Optional[str] = None) -> str:
        """
        Return the string value of an option, resolved in priority order (overrides,
        environment, configuration file, defaults) and recorded as used.

        Surrounding whitespace and one pair of matching quotes are stripped, and environment
        variables / ``~`` are expanded (except for a hard-coded list of exempt options).

        :param default: returned as-is if the option isn't set anywhere
        :raises ConfigMissing: if the option isn't set anywhere and ``default`` is ``None``
        """
        return self._lookup(section, key, default, track=True)

    def _lookup(self, section: str, key: str, default: Optional[str], track: bool) -> str:
        """Resolve an option like get(); usage bookkeeping is performed only if ``track``."""
        section = str(section).lower()
        key = str(key).lower()
        ans = None
        env_key = None
        # first check overrides
        if self._overrides.has_option(section, key):
            ans = self._overrides.get(section, key)
        else:
            # ...then environment
            candidate = _env_var_name(section, key)
            if candidate in os.environ:
                ans = os.environ[candidate]
                env_key = candidate
            # ...then the config file
            elif self._options.has_option(section, key):
                ans = self._options.get(section, key)
            # ...then the default config
            elif self._defaults.has_option(section, key):
                ans = self._defaults.get(section, key)
        if ans is None:
            if default is not None:
                return default
            if not self.has_section(section):
                raise ConfigMissing(f"missing config section [{section}]")
            raise ConfigMissing(f"missing config option [{section}] {key}")
        if track:
            if env_key is not None:
                self._used_env.add(env_key)
            self._used.add((section, key))
        ans = _strip(ans)
        if (section, key) not in [("task_runtime", "placeholder_regex")]:
            # expand environment variables unless in a hard-coded list of exceptions
            ans = _expand_env_var(ans)
        return ans

    def has_section(self, section: str) -> bool:
        """Whether the defaults, configuration file, or overrides contain ``section``."""
        return (
            self._defaults.has_section(section)
            or self._options.has_section(section)
            or self._overrides.has_section(section)
        )

    def has_option(self, section: str, option: str) -> bool:
        """Whether get() would succeed for the option; a hit is recorded as used by get()."""
        try:
            self.get(section, option)
            return True
        except ConfigMissing:
            return False

    def __getitem__(self, section: str) -> Section:
        """``loader[section]`` returns a Section view bound to this loader."""
        return Section(self, section)

    def _parse(
        self,
        section: str,
        key: str,
        ty: str,
        parse: Callable[[str], _T],
        default: Optional[_T] = None,
    ) -> _T:
        """
        Fetch an option and convert it with ``parse``.

        :param ty: human-readable name of the expected type, used in messages
        :param default: returned if the option is missing
        :raises ConfigMissing: if the option is missing and ``default`` is ``None``
        :raises ValueError: if ``parse`` fails on the option's value
        """
        try:
            ans = self.get(section, key)
        except ConfigMissing:
            if default is not None:
                return default
            raise
        try:
            return parse(ans)
        except Exception as exn:
            # Exception rather than a bare except: KeyboardInterrupt/SystemExit must propagate
            self._logger.debug(
                _(
                    "failed to parse configuration option",
                    section=section,
                    key=key,
                    value=ans,
                    expected_type=ty,
                )
            )
            raise ValueError(f"configuration option [{section}] {key} should be {ty}") from exn

    def get_int(self, section: str, key: str, default: Optional[int] = None) -> int:
        """Return the option converted with ``int``; ValueError if that fails."""
        return self._parse(section, key, "int", int, default)

    def get_float(self, section: str, key: str, default: Optional[float] = None) -> float:
        """Return the option converted with ``float``; ValueError if that fails."""
        return self._parse(section, key, "float", float, default)

    def get_bool(self, section: str, key: str, default: Optional[bool] = None) -> bool:
        """Return the option interpreted as a boolean; ValueError if that fails."""
        return self._parse(section, key, "bool", _parse_bool, default)

    def get_dict(
        self, section: str, key: str, default: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Return the option parsed as a JSON dict; ValueError if that fails."""
        return self._parse(section, key, "JSON dict", _parse_dict, default)

    def get_list(self, section: str, key: str, default: Optional[List[Any]] = None) -> List[Any]:
        """Return the option parsed as a JSON list; ValueError if that fails."""
        return self._parse(section, key, "JSON list", _parse_list, default)

    def get_all(self, defaults=True):
        """
        Return ``{section: {key: value}}`` for every option present in the overrides,
        configuration file, or defaults.

        :param defaults: if false, omit options whose effective value equals the default value

        Collecting this snapshot does not mark any option (or environment variable) as used,
        so it doesn't interfere with log_unused_options().
        """
        options = set()
        for parser in (self._overrides, self._options, self._defaults):
            for section in parser.sections():
                options.update((section, key) for key in parser.options(section))
        ans = {}
        for section, key in options:
            value = self._lookup(section, key, None, track=False)
            default_value = None
            if not defaults and self._defaults.has_option(section, key):
                default_value = _expand_env_var(_strip(self._defaults.get(section, key)))
            if defaults or value != default_value:
                ans.setdefault(section, {})[key] = value
        return ans

    def log_all(self):
        """
        Write a debug log message with all options
        """
        self._logger.debug(_("configuration", **self.get_all()))

    def log_unused_options(self):
        """
        After a run, log warnings about any options that were set, but were never accessed and
        don't correspond to any default option.
        """
        options = set()
        for parser in (self._overrides, self._options):
            for section in parser.sections():
                options.update((section, key) for key in parser.options(section))
        known_options = set()
        for section in self._defaults.sections():
            known_options.update((section, key) for key in self._defaults.options(section))
        unused = options - self._used - known_options
        if unused:
            self._logger.warning(
                _("unused configuration", options=[f"{section}:{key}" for (section, key) in unused])
            )
        # likewise for MINIWDL_* environment variables that nothing ever looked up
        evs = set(k for k in os.environ if k.upper().startswith("MINIWDL_"))
        ev_unused = evs - self._used_env
        if ev_unused:
            self._logger.warning(_("unused environment", variables=list(ev_unused)))
def _strip(value):
    """
    Strip surrounding whitespace, then one pair of matching single or double quotes if the
    value is enclosed in them. Falsy values (``None``, ``""``) are returned unchanged.
    """
    ans = value
    if ans:
        ans = value.strip()
        if len(ans) >= 2 and (
            (ans.startswith("'") and ans.endswith("'"))
            or (ans.startswith('"') and ans.endswith('"'))
        ):
            ans = ans[1:-1]
    return ans


def _expand_env_var(env_var: str) -> str:
    """
    Expands (potentially nested) env vars by repeatedly applying `expandvars` and `expanduser`
    until interpolation stops having any effect.
    """
    interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
    if interpolated == env_var:
        return interpolated
    # NOTE(review): a variable whose value refers to itself never reaches a fixed point, so
    # this recursion would end in RecursionError.
    return _expand_env_var(interpolated)


def _env_var_name(section: str, key: str) -> str:
    """Name of the environment variable that can set option ``key`` of ``section``."""
    return f"MINIWDL__{section.upper()}__{key.upper()}"


def _parse_bool(v: str) -> bool:
    """
    Interpret a case-insensitive true/false word (t/y/1/true/yes, f/n/0/false/no).

    :raises ValueError: for any other value (an explicit raise, so that the check survives
                        ``python -O``, which strips assert statements)
    """
    lowered = v.lower()
    if lowered in ("t", "y", "1", "true", "yes"):
        return True
    if lowered in ("f", "n", "0", "false", "no"):
        return False
    raise ValueError(f"not a boolean value: {v!r}")


def _parse_dict(v: str) -> Dict[str, Any]:
    """
    Parse a JSON object.

    :raises ValueError: if ``v`` isn't valid JSON or its top-level value isn't an object
    """
    ans = json.loads(v)
    if not isinstance(ans, dict):
        raise ValueError("expected a JSON object")
    return ans


def _parse_list(v: str) -> List[Any]:
    """
    Parse a JSON array; a value not starting with ``[`` is treated as a one-item list
    containing that string.

    :raises ValueError: if ``v`` starts with ``[`` but isn't a valid JSON array
    """
    if not v.startswith("["):
        return [v]
    ans = json.loads(v)
    if not isinstance(ans, list):
        raise ValueError("expected a JSON array")
    return ans


def default_plugins() -> "Dict[str,List[importlib_metadata.EntryPoint]]":
    """
    Built-in plugin entry points, keyed by plugin group (the group name without its
    ``miniwdl.plugin.`` prefix).
    """
    import importlib_metadata  # delayed heavy import

    def entry(group: str, name: str, value: str) -> "importlib_metadata.EntryPoint":
        return importlib_metadata.EntryPoint(
            group=f"miniwdl.plugin.{group}", name=name, value=value
        )

    return {
        "file_download": [
            entry("file_download", "s3", "WDL.runtime.download:awscli_downloader"),
            entry("file_download", "gs", "WDL.runtime.download:gsutil_downloader"),
        ],
        "directory_download": [
            entry("directory_download", "s3", "WDL.runtime.download:awscli_directory_downloader"),
            entry("directory_download", "gs", "WDL.runtime.download:gsutil_directory_downloader"),
        ],
        "task": [],
        "workflow": [],
        "container_backend": [
            entry(
                "container_backend", "docker_swarm", "WDL.runtime.backend.docker_swarm:SwarmContainer"
            ),
            entry(
                "container_backend",
                "singularity",
                "WDL.runtime.backend.singularity:SingularityContainer",
            ),
            entry("container_backend", "podman", "WDL.runtime.backend.podman:PodmanContainer"),
            entry("container_backend", "udocker", "WDL.runtime.backend.udocker:UdockerContainer"),
        ],
        "cache_backend": [entry("cache_backend", "dir", "WDL.runtime.cache:CallCache")],
    }


def load_all_plugins(cfg: "Loader", group: str) -> Iterable[Tuple[bool, Any]]:
    """
    Yield ``(enabled, entry_point)`` for each built-in and installed plugin in ``group``.

    A plugin is enabled if its entry point value matches at least one of the
    ``[plugins] enable_patterns`` and none of the ``[plugins] disable_patterns``
    (case-sensitive fnmatch).
    """
    import importlib_metadata  # delayed heavy import

    defaults = default_plugins()
    # invariant check on the caller-supplied group name; an unknown group would still fail
    # with KeyError below were this assertion stripped by ``python -O``
    assert group in defaults, group
    enable_patterns = cfg["plugins"].get_list("enable_patterns")
    disable_patterns = cfg["plugins"].get_list("disable_patterns")
    installed = list(importlib_metadata.entry_points(group=f"miniwdl.plugin.{group}"))
    for plugin in defaults[group] + installed:
        enabled = any(fnmatchcase(plugin.value, pat) for pat in enable_patterns) and not any(
            fnmatchcase(plugin.value, pat) for pat in disable_patterns
        )
        yield (enabled, plugin)


def load_plugins(cfg: "Loader", group: str) -> Iterable[Tuple[str, Callable[..., Any]]]:
    """Yield ``(name, loaded object)`` for each enabled plugin in ``group``."""
    for enabled, plugin in load_all_plugins(cfg, group):
        if enabled:
            yield (plugin.name, plugin.load())