Source code for pulsar.managers.base.base_drmaa

"""Module defines a base class for Pulsar managers using DRMAA."""
import logging

try:
    from drmaa import JobState
except (OSError, ImportError, RuntimeError):
    JobState = None

from pulsar.managers import status
from .external import ExternalBaseManager
from ..util.drmaa import DrmaaSessionFactory

log = logging.getLogger(__name__)

IGNORE_SUBMISSION_SPEC_MESSAGE = "Submission recieved native_specification but being overridden by manager specification."


[docs] class BaseDrmaaManager(ExternalBaseManager): """Base class for Pulsar managers using DRMAA.""" def __init__(self, name, app, **kwds): """Setup native specification and drmaa session factory.""" super().__init__(name, app, **kwds) self.native_specification = kwds.get('native_specification', None) drmaa_session_factory_class = kwds.get('drmaa_session_factory_class', DrmaaSessionFactory) drmaa_session_factory = drmaa_session_factory_class() self.drmaa_session = drmaa_session_factory.get()
[docs] def shutdown(self, timeout=None): """Cleanup DRMAA session and call shutdown of parent.""" try: super().shutdown(timeout) except Exception: pass self.drmaa_session.close()
def _get_status_external(self, external_id): drmaa_state = self.drmaa_session.job_status(external_id) return { JobState.UNDETERMINED: status.COMPLETE, JobState.QUEUED_ACTIVE: status.QUEUED, JobState.SYSTEM_ON_HOLD: status.QUEUED, JobState.USER_ON_HOLD: status.QUEUED, JobState.USER_SYSTEM_ON_HOLD: status.QUEUED, JobState.RUNNING: status.RUNNING, JobState.SYSTEM_SUSPENDED: status.QUEUED, JobState.USER_SUSPENDED: status.QUEUED, JobState.DONE: status.COMPLETE, JobState.FAILED: status.COMPLETE, # Should be a FAILED state here as well }[drmaa_state] def _build_template_attributes(self, job_id, command_line, dependencies_description=None, env=[], submit_params={}, setup_params=None): stdout_path = self._job_stdout_path(job_id) stderr_path = self._job_stderr_path(job_id) working_directory = self.job_directory(job_id).working_directory() attributes = { "remoteCommand": self._setup_job_file( job_id, command_line, dependencies_description=dependencies_description, env=env, setup_params=setup_params ), "jobName": self._job_name(job_id), "outputPath": ":%s" % stdout_path, "errorPath": ":%s" % stderr_path, "workingDirectory": working_directory, } submit_native_specification = submit_params.get("native_specification", None) native_specification = None if self.native_specification: native_specification = self.native_specification if submit_native_specification is not None: log.warn(IGNORE_SUBMISSION_SPEC_MESSAGE) elif submit_native_specification: native_specification = submit_params["native_specification"] if native_specification is not None: attributes["nativeSpecification"] = native_specification log.info("Submitting DRMAA job with nativeSpecification [%s]" % native_specification) else: log.debug("No native specification supplied, DRMAA job will be submitted with default parameters.") return attributes
__all__ = ("BaseDrmaaManager",)