Source code for pulsar.managers.util.drmaa
import contextlib
import logging
import threading
# The drmaa package is optional at import time: each failure mode below is
# recorded in LOAD_ERROR_MESSAGE and Session is set to None so that the rest of
# the module can detect the missing library instead of failing on import.
try:
    from drmaa import JobControlAction, Session
except OSError as load_error:
    Session = None
    LOAD_ERROR_MESSAGE = "OSError - problem loading shared library [%s]." % (load_error,)
except ImportError as load_error:
    # Will not be able to use DRMAA
    Session = None
    LOAD_ERROR_MESSAGE = (
        "ImportError - problem importing library (`pip install drmaa` may fix this) [%s]." % (load_error,)
    )
except RuntimeError as load_error:
    Session = None
    LOAD_ERROR_MESSAGE = (
        "RuntimeError - problem importing library (setting DRMAA_LIBRARY_PATH may fix this) [%s]." % (load_error,)
    )

# Prefix for the error raised when DRMAA is requested but Session is None; the
# trailing space is intentional because LOAD_ERROR_MESSAGE is appended to it.
NO_DRMAA_MESSAGE = "Attempt to use DRMAA, but DRMAA Python library cannot be loaded. "

log = logging.getLogger(__name__)
[docs]
class DrmaaSessionFactory:
    """
    Abstraction used to produce DrmaaSession wrappers.

    The constructor used to build the underlying session is kept on the
    instance as ``session_constructor`` and is read again on every call to
    :meth:`get`.
    """

    def __init__(self):
        # Module-level ``Session``; this is None when the drmaa import failed.
        self.session_constructor = Session

    def get(self, **kwds):
        """
        Return a new ``DrmaaSession`` built from ``session_constructor``.

        Any keyword arguments are forwarded unchanged to ``DrmaaSession``.

        Raises ``Exception`` (with the recorded import failure appended to the
        message) when ``session_constructor`` is None.
        """
        constructor = self.session_constructor
        if constructor is not None:
            return DrmaaSession(constructor, **kwds)
        raise Exception(NO_DRMAA_MESSAGE + LOAD_ERROR_MESSAGE)
class DrmaaSession:
    """
    Abstraction around `drmaa` module `Session` objects.

    Every instance shares a single underlying session that is stored on the
    class. The first wrapper created builds and initializes that session;
    ``close()`` on the last remaining wrapper calls ``exit()`` on it and
    clears it, so a later wrapper starts a fresh one.
    """

    # Guards session and session_count, and is also held around runJob/control.
    session_lock = threading.Lock()
    # Number of wrappers created and not yet closed.
    session_count = 0
    # The shared underlying session, or None when no wrapper is open.
    session = None

    def __init__(self, session_constructor, **kwds):
        """
        Register a new wrapper, creating the shared session if none exists.

        ``session_constructor`` is called with no arguments and the result's
        ``initialize()`` is invoked, but only when no shared session is
        currently set. ``kwds`` is accepted for interface compatibility and is
        not used here.

        If construction or ``initialize()`` raises, the exception propagates
        and neither the shared session nor the count is modified.
        """
        with self._session_lock():
            if DrmaaSession.session is None:
                if DrmaaSession.session_count != 0:
                    log.warning("DrmaaSession.session is None but session_count is non-zero - logic error occurred.")
                log.debug("Initializing DRMAA session from thread %s", threading.current_thread().name)
                # Publish the session only after initialize() succeeded so a
                # failure cannot leave an uninitialized session for reuse.
                new_session = session_constructor()
                new_session.initialize()
                DrmaaSession.session = new_session
            DrmaaSession.session_count += 1

    @contextlib.contextmanager
    def _session_lock(self):
        """Context manager that holds the class-wide ``session_lock``."""
        with DrmaaSession.session_lock:
            yield

    def run_job(self, **kwds):
        """
        Create a DRMAA job template, populate with specified properties,
        run the job, and return the external_job_id.

        Each keyword argument is set as an attribute of the same name on the
        template. The template is always deleted afterwards, whether or not
        setting attributes or submitting the job raised.
        """
        template = DrmaaSession.session.createJobTemplate()
        try:
            for key, value in kwds.items():
                setattr(template, key, value)
            with DrmaaSession.session_lock:
                return DrmaaSession.session.runJob(template)
        finally:
            DrmaaSession.session.deleteJobTemplate(template)

    def kill(self, external_job_id):
        """
        Send ``JobControlAction.TERMINATE`` for the given job.

        ``external_job_id`` is converted with ``str()`` before being passed to
        the session's ``control`` call, whose result is returned. The call is
        made while holding ``session_lock``.
        """
        with DrmaaSession.session_lock:
            return DrmaaSession.session.control(str(external_job_id), JobControlAction.TERMINATE)

    def job_status(self, external_job_id):
        """
        Return the session's ``jobStatus`` result for the given job.

        ``external_job_id`` is converted with ``str()`` first. Unlike
        ``run_job`` and ``kill``, this call does not take ``session_lock``.
        """
        return DrmaaSession.session.jobStatus(str(external_job_id))

    def close(self):
        """
        Release this wrapper's claim on the shared session.

        Decrements the count; when it reaches zero the shared session's
        ``exit()`` is called and the class-level reference is cleared. Calling
        this with a count of zero only logs a warning.
        """
        with self._session_lock():
            if DrmaaSession.session_count == 0:
                log.warning("close() called with zero active session counted - logic error.")
                return
            DrmaaSession.session_count -= 1
            if DrmaaSession.session_count != 0:
                # Other wrappers are still open; keep the shared session alive.
                return
            if DrmaaSession.session is None:
                log.warning("close() called with a non-zero session count but no session is defined.")
                return
            try:
                DrmaaSession.session.exit()
            finally:
                # Drop the reference even if exit() raised, so the next wrapper
                # builds a fresh session rather than reusing this one.
                DrmaaSession.session = None
# Only the factory is listed as public API here; DrmaaSession is not exported
# by name and is obtained through DrmaaSessionFactory.get().
__all__ = ['DrmaaSessionFactory']