Source code for pulsar.client.config_util

""" Generic interface for reading YAML/INI/JSON config files into nested dictionaries.
"""

import codecs

try:
    import yaml
except ImportError:
    yaml = None  # type: ignore
import json
from configparser import ConfigParser

# Identifiers for the supported configuration formats. They are the keys of
# EXT_MAP below and of the EXT_READERS dispatch table.
CONFIG_TYPE_JSON = "json"
CONFIG_TYPE_YAML = "yaml"
CONFIG_TYPE_INI = "ini"

# Default for read_file's ``default_type``: the format assumed when no explicit
# type is given and the file extension is not recognised.
DEFAULT_CONFIG_TYPE = CONFIG_TYPE_YAML

# File-name suffixes (leading dot included) associated with each format.
JSON_EXTS = [".json"]
YAML_EXTS = [".yaml", ".yml"]
INI_EXTS = [".ini"]

# Format identifier -> list of suffixes; scanned by __find_type to infer the
# format of a path from its extension.
EXT_MAP = {
    CONFIG_TYPE_JSON: JSON_EXTS,
    CONFIG_TYPE_YAML: YAML_EXTS,
    CONFIG_TYPE_INI: INI_EXTS,
}


def read_file(path, type=None, default_type=DEFAULT_CONFIG_TYPE):
    """Read the configuration file at ``path`` using a format-specific reader.

    The format is resolved by ``__find_type``: an explicit ``type`` takes
    precedence, otherwise the extension of ``path`` is matched against
    ``EXT_MAP``, and ``default_type`` is used when neither applies.

    :param path: location of the configuration file; must not be ``None``.
    :param type: optional format identifier (one of the ``CONFIG_TYPE_*``
        constants) that overrides extension-based detection.  The name
        shadows the builtin but is kept because it is part of the public
        keyword interface.
    :param default_type: format to assume when it cannot be inferred.
    :returns: whatever the selected reader in ``EXT_READERS`` returns for
        ``path``.
    :raises ValueError: if ``path`` is ``None``.
    :raises KeyError: if the resolved format has no entry in ``EXT_READERS``.
    """
    if path is None:
        raise ValueError("Undefined path supplied.")

    config_type = __find_type(path, type, default_type)
    reader = EXT_READERS[config_type]
    return reader(path)
def __find_type(path, explicit_type, default_type):
    """Decide which configuration format ``path`` should be parsed as.

    :param path: file name whose suffix is inspected (case-sensitively).
    :param explicit_type: caller-supplied format; returned as-is when truthy.
    :param default_type: returned when no suffix in ``EXT_MAP`` matches.
    :returns: a format identifier.
    """
    if explicit_type:
        return explicit_type

    for config_type, config_exts in EXT_MAP.items():
        # str.endswith accepts a tuple, so every suffix of one format is
        # checked in a single call.
        if path.endswith(tuple(config_exts)):
            return config_type

    return default_type


def __read_yaml(path):
    """Parse the YAML file at ``path`` with ``yaml.safe_load``.

    :raises ImportError: if PyYAML could not be imported by this module.
    """
    if yaml is None:
        raise ImportError("Attempting to read YAML configuration file - but PyYAML dependency unavailable.")

    with open(path, "rb") as f:
        return yaml.safe_load(f)


def __read_ini(path):
    """Parse the INI file at ``path`` and return its sections as a mapping.

    The file is opened explicitly instead of going through
    ``ConfigParser.read``: that method silently skips files it cannot open,
    which would turn a missing or unreadable file into an empty result while
    the JSON and YAML readers raise ``OSError`` for the same situation.

    :returns: mapping of section name to a mapping of that section's options.
    :raises OSError: if the file cannot be opened.
    """
    config = ConfigParser()
    # No explicit encoding: keeps the platform default that
    # ``ConfigParser.read(path)`` would have used.
    with open(path) as f:
        config.read_file(f)

    # NOTE(review): ``_sections`` is a private ConfigParser attribute. It
    # appears to hold the raw per-section values (no interpolation, DEFAULT
    # section not merged in) - confirm before replacing it with the public
    # ``items()`` API, which would change the returned data.
    return config._sections


def __read_json(path):
    """Parse the JSON file at ``path``, decoding it as UTF-8."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


# Dispatch table: format identifier -> function that loads a path of that
# format.
EXT_READERS = {
    CONFIG_TYPE_JSON: __read_json,
    CONFIG_TYPE_YAML: __read_yaml,
    CONFIG_TYPE_INI: __read_ini,
}