Source code for pulsar.managers.queued_cli
"""
Pulsar job manager that uses a CLI interface to a job queue (e.g. Torque's qsub,
qstat, etc...).
"""
from logging import getLogger
from .base.external import ExternalBaseManager
from .util.cli import (
CliInterface,
split_params,
)
from .util.external import parse_external_id
from .util.job_script import job_script
log = getLogger(__name__)
[docs]
class CliQueueManager(ExternalBaseManager):
manager_type = "queued_cli"
def __init__(self, name, app, **kwds):
super().__init__(name, app, **kwds)
self.cli_interface = CliInterface()
self.shell_params, self.job_params = split_params(kwds)
[docs]
def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None):
self._check_execution_with_tool_file(job_id, command_line)
shell, job_interface = self.__get_cli_plugins()
stdout_path = self._job_stdout_path(job_id)
stderr_path = self._job_stderr_path(job_id)
job_name = self._job_name(job_id)
command_line = self._expand_command_line(
job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory
)
job_script_kwargs = self._job_template_env(
job_id,
command_line=command_line,
env=env,
setup_params=setup_params
)
extra_kwargs = job_interface.job_script_kwargs(stdout_path, stderr_path, job_name)
job_script_kwargs.update(extra_kwargs)
script = job_script(**job_script_kwargs)
script_path = self._write_job_script(job_id, script)
submission_command = job_interface.submit(script_path)
cmd_out = shell.execute(submission_command)
if cmd_out.returncode != 0:
log.warn("Failed to submit job - command was:\n%s" % submission_command)
raise Exception("Failed to submit job, error was:\n%s" % cmd_out.stderr)
external_id = parse_external_id(cmd_out.stdout.strip())
if not external_id:
message_template = "Failed to obtain external id for job_id %s and submission_command %s"
message = message_template % (job_id, submission_command)
log.warn(message)
raise Exception("Failed to obtain external id")
self._register_external_id(job_id, external_id)
def __get_cli_plugins(self):
return self.cli_interface.get_plugins(self.shell_params, self.job_params)
def _kill_external(self, external_id):
shell, job_interface = self.__get_cli_plugins()
kill_command = job_interface.delete(external_id)
shell.execute(kill_command)
def _get_status_external(self, external_id):
shell, job_interface = self.__get_cli_plugins()
status_command = job_interface.get_single_status(external_id)
cmd_out = shell.execute(status_command)
state = job_interface.parse_single_status(cmd_out.stdout, external_id)
return state