Source code for pulsar.managers.stateful

import contextlib
import datetime
import logging
import os
import threading
import time

try:
    # If galaxy-lib or Galaxy 19.05 is present.
    from galaxy.tools.deps.dependencies import DependenciesDescription
except ImportError:
    # If galaxy-tool-util or Galaxy 19.09 is present.
    from galaxy.tool_util.deps.dependencies import DependenciesDescription

from pulsar.client.util import filter_destination_params
from pulsar.managers import (
    ManagerProxy,
    status,
)
from pulsar.managers.util.retry import RetryActionExecutor
from .staging import (
    postprocess,
    preprocess,
)

log = logging.getLogger(__name__)

DEFAULT_DO_MONITOR = False

DECACTIVATE_FAILED_MESSAGE = "Failed to deactivate job with job id %s. May cause problems on next Pulsar start."
ACTIVATE_FAILED_MESSAGE = "Failed to activate job with job id %s. This job may not recover properly upon Pulsar restart."

JOB_FILE_FINAL_STATUS = "final_status"
JOB_FILE_POSTPROCESSED = "postprocessed"
JOB_FILE_PREPROCESSED = "preprocessed"
JOB_FILE_PREPROCESSING_FAILED = "preprocessing_failed"
JOB_METADATA_RUNNING = "running"

ACTIVE_STATUS_PREPROCESSING = "preprocessing"
ACTIVE_STATUS_LAUNCHED = "launched"

DEFAULT_MIN_POLLING_INTERVAL = 0.5


class StatefulManagerProxy(ManagerProxy):
    """Adds job state tracking and staging (pre/postprocessing) on top of a proxied manager."""

    def __init__(self, manager, **manager_options):
        super().__init__(manager)
        min_polling_interval = float(manager_options.get("min_polling_interval", DEFAULT_MIN_POLLING_INTERVAL))
        preprocess_retry_action_kwds = filter_destination_params(manager_options, "preprocess_action_")
        postprocess_retry_action_kwds = filter_destination_params(manager_options, "postprocess_action_")
        self.__preprocess_action_executor = RetryActionExecutor(**preprocess_retry_action_kwds)
        self.__postprocess_action_executor = RetryActionExecutor(**postprocess_retry_action_kwds)
        self.min_polling_interval = datetime.timedelta(0, min_polling_interval)
        self.active_jobs = ActiveJobs.from_manager(manager)
        self.__state_change_callback = self._default_status_change_callback
        self.__monitor = None
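
    # Illustrative configuration sketch (not part of the original module): the
    # constructor above reads "min_polling_interval" and forwards any options
    # prefixed with "preprocess_action_" / "postprocess_action_" to
    # RetryActionExecutor. The retry keyword shown below ("max_retries") is an
    # assumption about RetryActionExecutor's accepted options, for illustration only.
    #
    #     manager_options = {
    #         "min_polling_interval": 2.0,         # seconds between monitor sweeps
    #         "preprocess_action_max_retries": 3,  # -> RetryActionExecutor(max_retries=3)
    #     }
    #     proxy = StatefulManagerProxy(proxied_manager, **manager_options)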

    def set_state_change_callback(self, state_change_callback):
        self.__state_change_callback = state_change_callback
        self.__monitor = ManagerMonitor(self)

    def _default_status_change_callback(self, status, job_id):
        log.info("Status of job [{}] changed to [{}]. No callbacks enabled.".format(job_id, status))

    def trigger_state_change_callback(self, job_id):
        proxy_status = self.get_status(job_id)
        log.debug("Triggering state change callback with status %s by request for job_id %s", proxy_status, job_id)
        self.__state_change_callback(proxy_status, job_id)

    @property
    def name(self):
        return self._proxied_manager.name

    def setup_job(self, *args, **kwargs):
        job_id = self._proxied_manager.setup_job(*args, **kwargs)
        return job_id

    def _persist_launch_config(self, job_id, launch_config):
        job_directory = self._proxied_manager.job_directory(job_id)
        job_directory.store_metadata("launch_config", launch_config)

    def touch_outputs(self, job_id, touch_outputs):
        job_directory = self._proxied_manager.job_directory(job_id)
        for name in touch_outputs:
            path = job_directory.calculate_path(name, 'output')
            job_directory.open_file(path, mode='a')

    def preprocess_and_launch(self, job_id, launch_config):
        self._persist_launch_config(job_id, launch_config)
        requires_preprocessing = launch_config.get("remote_staging") and launch_config["remote_staging"].get("setup")
        if requires_preprocessing:
            self.active_jobs.activate_job(job_id, active_status=ACTIVE_STATUS_PREPROCESSING)
            self._launch_prepreprocessing_thread(job_id, launch_config)
        else:
            with self._handling_of_preprocessing_state(job_id, launch_config):
                pass
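
    # Illustrative launch_config sketch (values hypothetical); only keys read by
    # this class are shown. A "remote_staging" section containing a "setup" list
    # routes the job through the asynchronous preprocessing path above; without
    # it, preprocessing state handling and launch happen synchronously.
    #
    #     launch_config = {
    #         "command_line": "...",
    #         "remote_staging": {
    #             "setup": [...],          # staging actions executed by preprocess()
    #             "action_mapper": {...},  # may carry an "ssh_key" copied into each setup action
    #         },
    #         "dependencies_description": {...},  # optional; rebuilt via DependenciesDescription.from_dict
    #         "submit_params": {...},             # optional; passed through to the proxied manager's launch()
    #         "setup_params": {...},              # optional
    #         "env": [...],                       # optional
    #     }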

    def _launch_prepreprocessing_thread(self, job_id, launch_config):

        def do_preprocess():
            with self._handling_of_preprocessing_state(job_id, launch_config):
                job_directory = self._proxied_manager.job_directory(job_id)
                staging_config = launch_config.get("remote_staging", {})
                # TODO: swap out for a generic "job_extra_params"
                if 'action_mapper' in staging_config and \
                        'ssh_key' in staging_config['action_mapper'] and \
                        'setup' in staging_config:
                    for action in staging_config['setup']:
                        action['action'].update(ssh_key=staging_config['action_mapper']['ssh_key'])
                setup_config = staging_config.get("setup", [])
                preprocess(job_directory, setup_config, self.__preprocess_action_executor, object_store=self.object_store)
                self.active_jobs.deactivate_job(job_id, active_status=ACTIVE_STATUS_PREPROCESSING)

        new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False)

    @contextlib.contextmanager
    def _handling_of_preprocessing_state(self, job_id, launch_config):
        job_directory = self._proxied_manager.job_directory(job_id)
        try:
            yield
            launch_kwds = {}
            if launch_config.get("dependencies_description"):
                dependencies_description = DependenciesDescription.from_dict(launch_config["dependencies_description"])
                launch_kwds["dependencies_description"] = dependencies_description
            for kwd in ["submit_params", "setup_params", "env"]:
                if kwd in launch_config:
                    launch_kwds[kwd] = launch_config[kwd]
            self._proxied_manager.launch(
                job_id,
                launch_config["command_line"],
                **launch_kwds
            )
            with job_directory.lock("status"):
                job_directory.store_metadata(JOB_FILE_PREPROCESSED, True)
            self.active_jobs.activate_job(job_id)
        except Exception as e:
            with job_directory.lock("status"):
                job_directory.store_metadata(JOB_FILE_PREPROCESSING_FAILED, True)
                job_directory.store_metadata("return_code", 1)
                job_directory.write_file("stderr", str(e))
            self.__state_change_callback(status.FAILED, job_id)
            log.exception("Failed job preprocessing for job %s:", job_id)

    def handle_failure_before_launch(self, job_id):
        self.__state_change_callback(status.FAILED, job_id)

    def get_status(self, job_id):
        """Compute status using the proxied manager, handle state transitions,
        and track any additional state information needed.
        """
        job_directory = self._proxied_manager.job_directory(job_id)
        if not job_directory.exists():
            return status.LOST
        with job_directory.lock("status"):
            proxy_status, state_change = self.__proxy_status(job_directory, job_id)

        if state_change == "to_complete":
            self.__deactivate(job_id, proxy_status)
        elif state_change == "to_running":
            self.__state_change_callback(status.RUNNING, job_id)

        return self.__status(job_directory, proxy_status)

    def __proxy_status(self, job_directory, job_id):
        """Determine state with the proxied job manager and whether this job
        needs to be marked as deactivated (this occurs when the job first
        returns a complete status from the proxy).
        """
        state_change = None
        if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED):
            proxy_status = status.FAILED
            job_directory.store_metadata(JOB_FILE_FINAL_STATUS, proxy_status)
            state_change = "to_complete"
        elif not job_directory.has_metadata(JOB_FILE_PREPROCESSED):
            proxy_status = status.PREPROCESSING
        elif job_directory.has_metadata(JOB_FILE_FINAL_STATUS):
            proxy_status = job_directory.load_metadata(JOB_FILE_FINAL_STATUS)
        else:
            proxy_status = self._proxied_manager.get_status(job_id)
            if proxy_status == status.RUNNING:
                if not job_directory.has_metadata(JOB_METADATA_RUNNING):
                    job_directory.store_metadata(JOB_METADATA_RUNNING, True)
                    state_change = "to_running"
            elif proxy_status in [status.COMPLETE, status.CANCELLED]:
                job_directory.store_metadata(JOB_FILE_FINAL_STATUS, proxy_status)
                state_change = "to_complete"
        return proxy_status, state_change

    def __status(self, job_directory, proxy_status):
        """Use the proxied manager's status to compute the real (stateful)
        status of the job.
        """
        if proxy_status == status.COMPLETE:
            if not job_directory.has_metadata(JOB_FILE_POSTPROCESSED):
                job_status = status.POSTPROCESSING
            else:
                job_status = status.COMPLETE
        else:
            job_status = proxy_status
        return job_status

    def __deactivate(self, job_id, proxy_status):
        self.active_jobs.deactivate_job(job_id)
        deactivate_method = getattr(self._proxied_manager, "_deactivate_job", None)
        if deactivate_method:
            try:
                deactivate_method(job_id)
            except Exception:
                log.exception("Failed to deactivate via proxied manager job %s" % job_id)

        if proxy_status == status.COMPLETE:
            self.__handle_postprocessing(job_id)

    def __handle_postprocessing(self, job_id):

        def do_postprocess():
            postprocess_success = False
            job_directory = self._proxied_manager.job_directory(job_id)
            try:
                postprocess_success = postprocess(job_directory, self.__postprocess_action_executor)
            except Exception:
                log.exception("Failed to postprocess results for job id %s" % job_id)
            final_status = status.COMPLETE if postprocess_success else status.FAILED
            if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED):
                final_status = status.FAILED
            self.__state_change_callback(final_status, job_id)

        new_thread_for_job(self, "postprocess", job_id, do_postprocess, daemon=False)
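
    # State bookkeeping summary (derived from the methods above): job state lives
    # as marker metadata in the job directory rather than in memory. A missing
    # "preprocessed" marker means PREPROCESSING; a "preprocessing_failed" marker
    # means FAILED; otherwise the proxied manager is polled and the first
    # COMPLETE/CANCELLED result is frozen into "final_status". A COMPLETE job
    # without the "postprocessed" marker is reported as POSTPROCESSING until the
    # postprocess thread finishes and fires the final state change callback.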

    def shutdown(self, timeout=None):
        if self.__monitor:
            try:
                self.__monitor.shutdown(timeout)
            except Exception:
                log.exception("Failed to shutdown job monitor for manager %s" % self.name)
        super().shutdown(timeout)

    def recover_active_jobs(self):
        unqueue_preprocessing_ids = []
        for job_id in self.active_jobs.active_job_ids(active_status=ACTIVE_STATUS_PREPROCESSING):
            job_directory = self._proxied_manager.job_directory(job_id)
            if not job_directory.has_metadata("launch_config"):
                log.warn("Failed to find launch parameters for job scheduled to prepreprocess [%s]" % job_id)
                unqueue_preprocessing_ids.append(job_id)
            elif job_directory.has_metadata(JOB_FILE_PREPROCESSED):
                log.warn("Job scheduled to prepreprocess [%s] already preprocessed, skipping" % job_id)
                unqueue_preprocessing_ids.append(job_id)
            elif job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED):
                log.warn("Job scheduled to prepreprocess [%s] previously failed preprocessing, skipping" % job_id)
                unqueue_preprocessing_ids.append(job_id)
            else:
                launch_config = job_directory.load_metadata("launch_config")
                self._launch_prepreprocessing_thread(job_id, launch_config)

        for unqueue_preprocessing_id in unqueue_preprocessing_ids:
            self.active_jobs.deactivate_job(unqueue_preprocessing_id, active_status=ACTIVE_STATUS_PREPROCESSING)

        recover_method = getattr(self._proxied_manager, "_recover_active_job", None)
        if recover_method is None:
            return

        for job_id in self.active_jobs.active_job_ids(active_status=ACTIVE_STATUS_LAUNCHED):
            try:
                recover_method(job_id)
            except Exception:
                log.exception("Failed to recover active job %s" % job_id)
                self.__handle_recovery_problem(job_id)

    def __handle_recovery_problem(self, job_id):
        # Make sure we tell the client we have lost this job.
        self.active_jobs.deactivate_job(job_id)
        self.__state_change_callback(status.LOST, job_id)
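

# On-disk layout used by ActiveJobs below (derived from its implementation; the
# paths are descriptive, not configurable): an empty marker file named after each
# job id is created under
#     <persistence_directory>/<manager_name>-active-jobs/         (launched jobs)
#     <persistence_directory>/<manager_name>-preprocessing-jobs/  (preprocessing jobs)
# so active jobs can be recovered by listing these directories after a restart.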


class ActiveJobs:
    """Keeps track of active jobs (those that are not yet "complete").

    The current implementation is file based, but could easily be made
    database-based instead.

    TODO: Keep jobs in memory after the initial load so the disk doesn't need
    to be hit repeatedly to recover this information.
    """

    @staticmethod
    def from_manager(manager):
        persistence_directory = manager.persistence_directory
        manager_name = manager.name
        return ActiveJobs(manager_name, persistence_directory)

    def __init__(self, manager_name, persistence_directory):
        if persistence_directory:
            active_job_directory = os.path.join(persistence_directory, "%s-active-jobs" % manager_name)
            if not os.path.exists(active_job_directory):
                os.makedirs(active_job_directory)
            preprocessing_job_directory = os.path.join(persistence_directory, "%s-preprocessing-jobs" % manager_name)
            if not os.path.exists(preprocessing_job_directory):
                os.makedirs(preprocessing_job_directory)
        else:
            active_job_directory = None
            preprocessing_job_directory = None
        self.launched_job_directory = active_job_directory
        self.preprocessing_job_directory = preprocessing_job_directory

    def active_job_ids(self, active_status=ACTIVE_STATUS_LAUNCHED):
        job_ids = []
        target_directory = self._active_job_directory(active_status)
        if target_directory:
            job_ids = os.listdir(target_directory)
        return job_ids

    def activate_job(self, job_id, active_status=ACTIVE_STATUS_LAUNCHED):
        if self._active_job_directory(active_status):
            path = self._active_job_file(job_id, active_status=active_status)
            try:
                open(path, "w").close()
            except Exception:
                log.warn(ACTIVATE_FAILED_MESSAGE % job_id)

    def deactivate_job(self, job_id, active_status=ACTIVE_STATUS_LAUNCHED):
        if self._active_job_directory(active_status):
            path = self._active_job_file(job_id, active_status=active_status)
            if os.path.exists(path):
                try:
                    os.remove(path)
                except Exception:
                    log.warn(DECACTIVATE_FAILED_MESSAGE % job_id)

    def _active_job_directory(self, active_status):
        if active_status == ACTIVE_STATUS_LAUNCHED:
            target_directory = self.launched_job_directory
        elif active_status == ACTIVE_STATUS_PREPROCESSING:
            target_directory = self.preprocessing_job_directory
        else:
            raise Exception("Unknown active state encountered [%s]" % active_status)
        return target_directory

    def _active_job_file(self, job_id, active_status=ACTIVE_STATUS_LAUNCHED):
        return os.path.join(self._active_job_directory(active_status), job_id)


class ManagerMonitor:
    """Monitors active jobs of a StatefulManagerProxy."""

    def __init__(self, stateful_manager):
        self.stateful_manager = stateful_manager
        self.active = True
        thread = new_thread_for_manager(self.stateful_manager, "[action=monitor]", self._run, True)
        self.thread = thread

    def shutdown(self, timeout=None):
        self.active = False
        self.thread.join(timeout)
        if self.thread.is_alive():
            log.warn("Failed to join monitor thread [%s]" % self.thread)

    def _run(self):
        """Main loop, repeatedly checking active jobs of the stateful manager."""
        while self.active:
            try:
                self._monitor_active_jobs()
            except Exception:
                log.exception("Failure in stateful manager monitor step.")

    def _monitor_active_jobs(self):
        active_job_ids = self.stateful_manager.active_jobs.active_job_ids()
        iteration_start = datetime.datetime.now()
        for active_job_id in active_job_ids:
            try:
                self._check_active_job_status(active_job_id)
            except Exception:
                log.exception("Failed checking active job status for job_id %s" % active_job_id)
        iteration_end = datetime.datetime.now()
        iteration_length = iteration_end - iteration_start
        if iteration_length < self.stateful_manager.min_polling_interval:
            to_sleep = (self.stateful_manager.min_polling_interval - iteration_length)
            microseconds = to_sleep.microseconds + (to_sleep.seconds + to_sleep.days * 24 * 3600) * (10 ** 6)
            total_seconds = microseconds / (10 ** 6)
            time.sleep(total_seconds)

    def _check_active_job_status(self, active_job_id):
        # The manager itself handles state transitions when status changes; we
        # just need to poll get_status.
        self.stateful_manager.get_status(active_job_id)


def new_thread_for_job(manager, action, job_id, target, daemon):
    name = "[action={}]-[job={}]".format(action, job_id)
    return new_thread_for_manager(manager, name, target, daemon)


def new_thread_for_manager(manager, name, target, daemon):
    thread_name = "[manager={}]-{}".format(manager.name, name)
    thread = threading.Thread(name=thread_name, target=target)
    thread.daemon = daemon
    thread.start()
    return thread


__all__ = ('StatefulManagerProxy',)