"""
"""
from abc import (
ABCMeta,
abstractmethod,
)
# Placeholder value that ``return_code`` implementations may return in place
# of an integer return code (presumably when the real code cannot be
# determined -- confirm against the concrete managers).
PULSAR_UNKNOWN_RETURN_CODE = '__unknown__'
class ManagerInterface(metaclass=ABCMeta):
    """
    Defines the interface to various job managers.

    Every method declared here is abstract. ``ABCMeta`` is supplied through
    the ``metaclass=`` keyword because Python 3 silently ignores a
    ``__metaclass__`` class attribute; with the keyword form a subclass can
    only be instantiated once it implements all of the methods below.
    """

    @abstractmethod
    def setup_job(self, input_job_id, tool_id, tool_version):
        """
        Set up a job directory for the specified input (galaxy) job id, tool
        id, and tool version.
        """

    @abstractmethod
    def clean(self, job_id):
        """
        Delete the job directory and clean up resources associated with the
        job with id `job_id`.
        """

    @abstractmethod
    def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None):
        """
        Called to indicate that the client is ready for this job with the
        specified job id and command line to be executed (i.e. run or queue
        this job depending on implementation).

        The meaning of `submit_params`, `dependencies_description`, `env`
        and `setup_params` is defined by the concrete managers and is not
        specified by this interface.

        NOTE: the ``{}`` and ``[]`` defaults are single shared objects;
        implementations that keep these defaults must treat them as
        read-only.
        """

    @abstractmethod
    def get_status(self, job_id):
        """
        Return the status of the job as a string; currently supported
        statuses include 'cancelled', 'running', 'queued', and 'complete'.
        """

    @abstractmethod
    def return_code(self, job_id):
        """
        Return an integer indicating the return code of the specified
        execution, or PULSAR_UNKNOWN_RETURN_CODE.
        """

    @abstractmethod
    def stdout_contents(self, job_id):
        """
        After completion, return contents of stdout of the tool script.
        """

    @abstractmethod
    def stderr_contents(self, job_id):
        """
        After completion, return contents of stderr of the tool script.
        """

    @abstractmethod
    def job_stdout_contents(self, job_id):
        """
        After completion, return contents of stdout of the job as produced
        by the job runner.
        """

    @abstractmethod
    def job_stderr_contents(self, job_id):
        """
        After completion, return contents of stderr of the job as produced
        by the job runner.
        """

    @abstractmethod
    def kill(self, job_id):
        """
        End or cancel execution of the specified job.
        """

    @abstractmethod
    def job_directory(self, job_id):
        """
        Return a JobDirectory abstraction describing the state of the job
        working directory.
        """
class ManagerProxy:
    """
    Base class for proxies that wrap another manager.

    Subclass it and override only the methods whose behavior should change;
    every method defined here simply forwards to the wrapped manager.
    """

    def __init__(self, manager):
        # The manager that every call is forwarded to.
        self._proxied_manager = manager

    def setup_job(self, *args, **kwargs):
        """Forward to the wrapped manager's ``setup_job``."""
        target = self._proxied_manager.setup_job
        return target(*args, **kwargs)

    def clean(self, *args, **kwargs):
        """Forward to the wrapped manager's ``clean``."""
        target = self._proxied_manager.clean
        return target(*args, **kwargs)

    def launch(self, *args, **kwargs):
        """Forward to the wrapped manager's ``launch``."""
        target = self._proxied_manager.launch
        return target(*args, **kwargs)

    def get_status(self, *args, **kwargs):
        """Forward to the wrapped manager's ``get_status``."""
        target = self._proxied_manager.get_status
        return target(*args, **kwargs)

    def return_code(self, *args, **kwargs):
        """Forward to the wrapped manager's ``return_code``."""
        target = self._proxied_manager.return_code
        return target(*args, **kwargs)

    def stdout_contents(self, *args, **kwargs):
        """Forward to the wrapped manager's ``stdout_contents``."""
        target = self._proxied_manager.stdout_contents
        return target(*args, **kwargs)

    def stderr_contents(self, *args, **kwargs):
        """Forward to the wrapped manager's ``stderr_contents``."""
        target = self._proxied_manager.stderr_contents
        return target(*args, **kwargs)

    def job_stdout_contents(self, *args, **kwargs):
        """Forward to the wrapped manager's ``job_stdout_contents``."""
        target = self._proxied_manager.job_stdout_contents
        return target(*args, **kwargs)

    def job_stderr_contents(self, *args, **kwargs):
        """Forward to the wrapped manager's ``job_stderr_contents``."""
        target = self._proxied_manager.job_stderr_contents
        return target(*args, **kwargs)

    def kill(self, *args, **kwargs):
        """Forward to the wrapped manager's ``kill``."""
        target = self._proxied_manager.kill
        return target(*args, **kwargs)

    def shutdown(self, timeout=None):
        """
        Optional hook: call the wrapped manager's ``shutdown(timeout)`` if it
        has such an attribute, otherwise do nothing. Always returns ``None``.
        """
        try:
            stop = self._proxied_manager.shutdown
        except AttributeError:
            # The wrapped manager does not provide a shutdown hook.
            pass
        else:
            # Called outside the ``try`` so an AttributeError raised by the
            # hook itself is not swallowed.
            stop(timeout)

    def job_directory(self, *args, **kwargs):
        """Forward to the wrapped manager's ``job_directory``."""
        target = self._proxied_manager.job_directory
        return target(*args, **kwargs)

    def system_properties(self):
        """Return the wrapped manager's ``system_properties()`` result."""
        wrapped = self._proxied_manager
        return wrapped.system_properties()

    @property
    def object_store(self):
        """The wrapped manager's ``object_store`` attribute."""
        wrapped = self._proxied_manager
        return wrapped.object_store

    def __str__(self):
        described = str(self._proxied_manager)
        return "ManagerProxy[manager=%s]" % described