Source code for pulsar.managers.queued_external_drmaa

from getpass import getuser
from json import dumps

from .base.base_drmaa import BaseDrmaaManager
from .util.sudo import sudo_popen
from ..managers import status

try:
    from galaxy.tools.deps.commands import which
except ImportError:
    from galaxy.tool_util.deps.commands import which

from logging import getLogger

# Module-level logger named after this module.
log = getLogger(__name__)

# Relative paths of the Bash variants of the three helper scripts used by
# this manager (ownership change, job kill, job launch).
# NOTE(review): these constants are not referenced in the visible part of
# this module -- _handle_default() builds the same "scripts/<name>.bash"
# paths itself. Confirm whether anything else imports them before removing.
DEFAULT_CHOWN_WORKING_DIRECTORY_SCRIPT = "scripts/chown_working_directory.bash"
DEFAULT_DRMAA_KILL_SCRIPT = "scripts/drmaa_kill.bash"
DEFAULT_DRMAA_LAUNCH_SCRIPT = "scripts/drmaa_launch.bash"


class ExternalDrmaaQueueManager(BaseDrmaaManager):
    """DRMAA backed queue manager that runs jobs through external scripts.

    Instead of talking to DRMAA in-process, this manager shells out (via
    ``sudo_popen``) to three helper scripts: one that changes the ownership
    of a job's working directory, one that launches a job from a JSON job
    template, and one that kills a job. Launch and kill are invoked with a
    ``user`` keyword taken from the job's submit parameters.

    Recognised keyword options (all optional):

    * ``chown_working_directory_script``, ``drmaa_kill_script``,
      ``drmaa_launch_script`` -- explicit paths of the helper scripts;
      resolved with ``_handle_default`` when not supplied.
    * ``production`` -- treated as true unless its string form equals
      ``"false"`` (case-insensitive). Controls how the job is identified to
      the ownership script (see ``__change_ownership``).
    """
    manager_type = "queued_external_drmaa"

    def __init__(self, name, app, **kwds):
        super().__init__(name, app, **kwds)
        self.chown_working_directory_script = _handle_default(kwds.get('chown_working_directory_script', None), "chown_working_directory")
        self.drmaa_kill_script = _handle_default(kwds.get('drmaa_kill_script', None), "drmaa_kill")
        self.drmaa_launch_script = _handle_default(kwds.get('drmaa_launch_script', None), "drmaa_launch")
        self.production = str(kwds.get('production', "true")).lower() != "false"
        # job_id -> True for completed jobs whose working directory ownership
        # has already been handed back to the current user.
        self.reclaimed = {}
        # external (DRMAA) id -> user the job was submitted as; needed again
        # when the job has to be killed.
        self.user_map = {}

    def launch(self, job_id, command_line, submit_params=None, dependencies_description=None, env=None, setup_params=None):
        """Submit ``command_line`` for ``job_id`` as the requested user.

        ``submit_params`` must contain a non-empty ``user`` entry. The job
        template is written to ``jt.json`` in the job's files, ownership of
        the working directory is changed to that user, and the launch script
        is run; its stripped output is registered as the external id.

        Raises:
            Exception: if ``submit_params`` has no ``user``.
            AssertionError: if a helper script exits with a non-zero status.
        """
        # ``None`` sentinels replace the former mutable ``{}`` / ``[]``
        # defaults, which would be shared between calls.
        submit_params = {} if submit_params is None else submit_params
        env = [] if env is None else env

        self._check_execution_with_tool_file(job_id, command_line)
        attributes = self._build_template_attributes(
            job_id,
            command_line,
            dependencies_description=dependencies_description,
            env=env,
            submit_params=submit_params,
            setup_params=setup_params,
        )
        # Formerly an unconditional print() of an unclosed file; keep the
        # diagnostic but close the handle and route it through the logger.
        with open(attributes['remoteCommand']) as remote_command:
            log.debug("Remote command for job %s:\n%s", job_id, remote_command.read())
        job_attributes_file = self._write_job_file(job_id, 'jt.json', dumps(attributes))
        user = submit_params.get('user', None)
        log.info("Submit as user %s", user)
        if not user:
            raise Exception("Must specify user submit parameter with this manager.")
        self.__change_ownership(job_id, user)
        external_id = self.__launch(job_attributes_file, user).strip()
        self.user_map[external_id] = user
        self._register_external_id(job_id, external_id)

    def _kill_external(self, external_id):
        """Run the kill script for ``external_id`` as the submitting user.

        Raises:
            KeyError: if no user was recorded for ``external_id`` by
                ``launch`` on this manager instance.
        """
        try:
            user = self.user_map[external_id]
        except KeyError:
            raise KeyError("No submitting user recorded for external id %s" % external_id) from None
        self.__sudo(self.drmaa_kill_script, "--external_id", external_id, user=user)

    def get_status(self, job_id):
        """Return the external status of ``job_id``.

        The first time a job is seen as ``status.COMPLETE``, ownership of its
        working directory is changed to the user running this process
        (``getuser()``).

        Raises:
            KeyError: if no external id is known for ``job_id``.
        """
        external_id = self._external_id(job_id)
        if not external_id:
            raise KeyError("Failed to find external id for job_id %s" % job_id)
        external_status = super()._get_status_external(external_id)
        if external_status == status.COMPLETE and job_id not in self.reclaimed:
            # Mark first so the ownership change is attempted only once.
            self.reclaimed[job_id] = True
            self.__change_ownership(job_id, getuser())
        return external_status

    def __launch(self, job_attributes, user):
        """Run the launch script on ``job_attributes`` and return its stdout."""
        return self.__sudo(self.drmaa_launch_script, "--job_attributes", str(job_attributes), user=user)

    def __change_ownership(self, job_id, username):
        """Run the chown script to give the job's working directory to ``username``."""
        cmds = [self.chown_working_directory_script, "--user", str(username)]
        if self.production:
            cmds.extend(["--job_id", job_id])
        else:
            # In testing, the loading working directory from server.ini doesn't
            # work. Need to reimagine how to securely map job_id to working
            # directory between test cases and production.
            cmds.extend(["--job_directory", str(self._job_directory(job_id).path)])
        # TODO: Verify ownership change.
        # No ``user`` keyword here, so sudo_popen's own default applies.
        self.__sudo(*cmds)

    def __sudo(self, *cmds, **kwargs):
        """Run ``cmds`` through ``sudo_popen`` and return the captured stdout.

        Raises:
            AssertionError: if the process exits with a non-zero status.
        """
        p = sudo_popen(*cmds, **kwargs)
        stdout, stderr = p.communicate()
        if p.returncode != 0:
            # Raised explicitly (rather than via ``assert``) so the check is
            # not stripped under ``python -O``; the exception type is kept
            # for any caller that already handles AssertionError.
            raise AssertionError("{}, {}".format(stdout, stderr))
        return stdout
def _handle_default(value, script_name): """ There are two potential variants of these scripts, the Bash scripts that are meant to be run within PULSAR_ROOT for older-style installs and the binaries created by setup.py as part of a proper pulsar installation. This method first looks for the newer style variant of these scripts and returns the full path to them if needed and falls back to the bash scripts if these cannot be found. """ if value: return value installed_script = which("pulsar-%s" % script_name.replace("_", "-")) if installed_script: return installed_script else: return "scripts/%s.bash" % script_name