Source code for pulsar.managers.base.external
import logging
from string import Template
from pulsar.managers import status
from .directory import DirectoryBaseManager
DEFAULT_JOB_NAME_TEMPLATE = "pulsar_$job_id"
JOB_FILE_EXTERNAL_ID = "external_id"
FAILED_TO_LOAD_EXTERNAL_ID = object()
log = logging.getLogger(__name__)
[docs]
class ExternalBaseManager(DirectoryBaseManager):
""" Base class for managers that interact with external distributed
resource managers.
"""
def __init__(self, name, app, **kwds):
super().__init__(name, app, **kwds)
self._external_ids = {}
self.job_name_template = kwds.get('job_name_template', DEFAULT_JOB_NAME_TEMPLATE)
[docs]
def clean(self, job_id):
super().clean(job_id)
[docs]
def kill(self, job_id):
self._record_cancel(job_id)
external_id = self._external_id(job_id)
if external_id:
try:
self._kill_external(external_id)
except Exception:
log.exception("Failed to kill job with id %s and external id %s", job_id, external_id)
[docs]
def get_status(self, job_id):
if self._was_cancelled(job_id):
return status.CANCELLED
external_id = self._external_id(job_id)
if not external_id:
log.warning("Failed to find external id for job_id %s", job_id)
return status.LOST
return self._get_status_external(external_id)
def _register_external_id(self, job_id, external_id):
if isinstance(external_id, bytes):
external_id = external_id.decode("utf-8")
self._job_directory(job_id).store_metadata(JOB_FILE_EXTERNAL_ID, external_id)
self._external_ids[job_id] = external_id
return external_id
def _external_id(self, job_id):
return self._external_ids.get(job_id, None)
def _job_name(self, job_id):
env = self._job_template_env(job_id)
return Template(self.job_name_template).safe_substitute(env)
def _recover_active_job(self, job_id):
external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID, FAILED_TO_LOAD_EXTERNAL_ID)
if external_id and external_id is not FAILED_TO_LOAD_EXTERNAL_ID:
self._external_ids[job_id] = external_id
else:
raise Exception("Could not determine external ID for job_id [%s]" % job_id)
def _deactivate_job(self, job_id):
del self._external_ids[job_id]