Source code for pulsar.manager_factory
import configparser
import inspect
import logging
import os
import pulsar.managers
from pulsar.managers import stateful
log = logging.getLogger(__name__)
MANAGER_PREFIX = 'manager:'
DEFAULT_MANAGER_NAME = '_default_'
DEFAULT_MANAGER_TYPE = 'queued_python'
[docs]def build_managers(app, conf):
"""
Takes in a config file as outlined in job_managers.ini.sample and builds
a dictionary of job manager objects from them.
"""
# Load default options from config file that apply to all
# managers.
default_options = _get_default_options(conf)
manager_descriptions = ManagerDescriptions()
if "job_managers_config" in conf:
job_managers_config = conf.get("job_managers_config", None)
_populate_manager_descriptions_from_ini(manager_descriptions, job_managers_config)
elif "managers" in conf:
for manager_name, manager_options in conf["managers"].items():
manager_description = ManagerDescription.from_dict(manager_options, manager_name)
manager_descriptions.add(manager_description)
elif "manager" in conf:
manager_description = ManagerDescription.from_dict(conf["manager"])
manager_descriptions.add(manager_description)
else:
manager_descriptions.add(ManagerDescription())
manager_classes = _get_managers_dict()
managers = {}
for manager_name, manager_description in manager_descriptions.descriptions.items():
manager_options = dict(default_options)
manager_options.update(manager_description.manager_options)
manager_class = manager_classes[manager_description.manager_type]
manager = _build_manager(manager_class, app, manager_name, manager_options)
managers[manager_name] = manager
return managers
def _populate_manager_descriptions_from_ini(manager_descriptions, job_managers_config):
config = configparser.ConfigParser()
config.readfp(open(job_managers_config))
for section in config.sections():
if not section.startswith(MANAGER_PREFIX):
continue
manager_name = section[len(MANAGER_PREFIX):]
manager_description = ManagerDescription.from_ini_config(config, manager_name)
manager_descriptions.add(manager_description)
def _get_default_options(conf):
options = {}
for simple_key in ["assign_ids", "galaxy_home"]:
if simple_key in conf:
options[simple_key] = conf[simple_key]
options["debug"] = conf.get("debug", False)
maximum_stream_size = conf.get("maximum_stream_size", 1024 * 1024)
if maximum_stream_size:
options["maximum_stream_size"] = int(maximum_stream_size)
# mode to create job directories with, if None just use
# default (usually 0777 with umask applied).
job_directory_mode = conf.get("job_directory_mode", None)
options["job_directory_mode"] = None
if job_directory_mode is not None:
options["job_directory_mode"] = int(job_directory_mode, 8)
return options
def _build_manager(manager_class, app, name=DEFAULT_MANAGER_NAME, manager_options={}):
return stateful.StatefulManagerProxy(manager_class(name, app, **manager_options), **manager_options)
def _get_manager_modules():
"""
>>> 'pulsar.managers.queued_pbs' in _get_manager_modules()
True
>>> 'pulsar.managers.queued_drmaa' in _get_manager_modules()
True
"""
managers_dir = pulsar.managers.__path__[0]
module_names = []
for fname in os.listdir(managers_dir):
if not fname.startswith("_") and fname.endswith(".py"):
manager_module_name = "pulsar.managers.%s" % fname[:-len(".py")]
module_names.append(manager_module_name)
return module_names
def _load_manager_modules():
modules = []
for manager_module_name in _get_manager_modules():
try:
module = __import__(manager_module_name)
for comp in manager_module_name.split(".")[1:]:
module = getattr(module, comp)
modules.append(module)
except BaseException as exception:
exception_str = str(exception)
message = "{} manager module could not be loaded: {}".format(manager_module_name, exception_str)
log.warn(message)
continue
return modules
def _get_managers_dict():
"""
>>> from pulsar.managers.queued_pbs import PbsQueueManager
>>> _get_managers_dict()['queued_pbs'] == PbsQueueManager
True
>>> from pulsar.managers.queued_drmaa import DrmaaQueueManager
>>> _get_managers_dict()['queued_drmaa'] == DrmaaQueueManager
True
"""
managers = {}
for manager_module in _load_manager_modules():
for _, obj in inspect.getmembers(manager_module):
if inspect.isclass(obj) and hasattr(obj, 'manager_type'):
managers[obj.manager_type] = obj
return managers
[docs]class ManagerDescriptions:
def __init__(self):
self.descriptions = {}
[docs] def add(self, manager_description):
manager_name = manager_description.manager_name
if manager_name in self.descriptions:
raise Exception("Problem configuring job managers, multiple managers with name %s" % manager_name)
self.descriptions[manager_name] = manager_description
[docs]class ManagerDescription:
def __init__(self, manager_type=DEFAULT_MANAGER_TYPE, manager_name=DEFAULT_MANAGER_NAME, manager_options={}):
self.manager_type = manager_type
self.manager_name = manager_name
self.manager_options = manager_options
[docs] @staticmethod
def from_ini_config(config, manager_name):
section_name = '{}{}'.format(MANAGER_PREFIX, manager_name)
try:
manager_type = config.get(section_name, 'type')
except ValueError:
manager_type = DEFAULT_MANAGER_TYPE
# Merge default and specific manager options.
manager_options = {}
manager_options.update(dict(config.items(section_name)))
return ManagerDescription(manager_type, manager_name, manager_options)
[docs] @staticmethod
def from_dict(config, manager_name=None):
manager_type = config.get("type", DEFAULT_MANAGER_TYPE)
manager_name = manager_name or config.get("name") or DEFAULT_MANAGER_NAME
return ManagerDescription(manager_type, manager_name, config)