Source code for pulsar.managers.queued

import multiprocessing
import os
import queue
import threading
import traceback
from logging import getLogger

from pulsar.client.util import MonitorStyle
from pulsar.managers.unqueued import Manager

# Module-level logger, named after this module.
log = getLogger(__name__)

# Operation tag that QueueManager.shutdown puts on the work queue (once per
# worker thread); a worker returns from QueueManager.run_next when it
# dequeues this tag.
STOP_SIGNAL = object()
# Operation tag that launch/_recover_active_job put on the work queue
# together with a (job_id, command_line) payload.
RUN = object()
# Number of worker threads QueueManager starts when no 'num_concurrent_jobs'
# keyword argument is supplied.
DEFAULT_NUM_CONCURRENT_JOBS = 1

# Name handed to self._job_file(...) in run_next to locate the file that is
# deleted right before a job runs (presumably the persisted command line -
# confirm against Manager._write_command_line).
JOB_FILE_COMMAND_LINE = "command_line"


class QueueManager(Manager):
    """
    A job manager that queues up jobs directly (i.e. does not use an
    external queuing software such as PBS, SGE, etc...).

    Jobs are placed on an in-process ``queue.Queue`` and consumed by a fixed
    pool of daemon worker threads, each of which loops in :meth:`run_next`.
    """
    manager_type = "queued_python"

    def __init__(self, name, app, **kwds):
        """
        Initialise the base manager and start the worker threads.

        The optional ``num_concurrent_jobs`` keyword sets the number of
        worker threads. The string ``'*'`` means one thread per CPU as
        reported by ``multiprocessing.cpu_count()``; any other value is
        converted with ``int()``. Defaults to ``DEFAULT_NUM_CONCURRENT_JOBS``.
        """
        super().__init__(name, app, **kwds)

        num_concurrent_jobs = kwds.get('num_concurrent_jobs', DEFAULT_NUM_CONCURRENT_JOBS)
        if num_concurrent_jobs == '*':
            num_concurrent_jobs = multiprocessing.cpu_count()
        else:
            num_concurrent_jobs = int(num_concurrent_jobs)

        self._init_worker_threads(num_concurrent_jobs)

    def _init_worker_threads(self, num_concurrent_jobs):
        """Create the work queue and start ``num_concurrent_jobs`` daemon workers."""
        self.work_queue = queue.Queue()
        self.work_threads = []
        for _ in range(num_concurrent_jobs):
            worker = threading.Thread(target=self.run_next)
            # Daemon threads do not keep the interpreter alive at exit.
            worker.daemon = True
            worker.start()
            self.work_threads.append(worker)

    def launch(self, job_id, command_line, submit_params=None, dependencies_description=None, env=None, setup_params=None):
        """
        Prepare a job and enqueue it for execution by a worker thread.

        ``env`` defaults to an empty list; ``None`` is normalised to a fresh
        list so no mutable default object is shared between calls.
        ``submit_params`` is accepted but not used by this manager.
        """
        if env is None:
            env = []
        command_line = self._prepare_run(
            job_id,
            command_line,
            dependencies_description=dependencies_description,
            env=env,
            setup_params=setup_params,
        )
        try:
            self._write_command_line(job_id, command_line)
        except Exception:
            # Best effort: the job is still queued even if persisting fails.
            log.info("Failed to persist command line for job %s, will not be able to recover.", job_id)
        self.work_queue.put((RUN, (job_id, command_line)))

    def _recover_active_job(self, job_id):
        """
        Re-enqueue a job from its stored command line.

        Raises ``Exception`` when no command line can be read for ``job_id``.
        """
        command_line = self.read_command_line(job_id)
        if command_line:
            self.work_queue.put((RUN, (job_id, command_line)))
        else:
            raise Exception("Cannot recover job with id %s" % job_id)

    def shutdown(self, timeout=None):
        """
        Ask every worker thread to stop and wait for each to finish.

        ``timeout`` is passed to each ``join`` call individually, so the
        total wait may be up to ``timeout`` multiplied by the worker count.
        Workers still alive after their join are reported with a warning.
        """
        # One stop signal per worker; each worker consumes exactly one.
        for _ in self.work_threads:
            self.work_queue.put((STOP_SIGNAL, None))
        for worker in self.work_threads:
            worker.join(timeout)
            if worker.is_alive():
                log.warning("Failed to stop worker thread [%s]", worker)

    def run_next(self):
        """
        Run the next item in the queue (a job waiting to run).

        Loops until a ``STOP_SIGNAL`` entry is dequeued. Exceptions raised
        while handling an entry are logged and the loop continues.
        """
        while True:
            (op, obj) = self.work_queue.get()
            if op is STOP_SIGNAL:
                return
            # Reset per iteration so the handler below never reports an
            # unbound name or the id of a previously processed job.
            job_id = None
            try:
                (job_id, command_line) = obj
                try:
                    os.remove(self._job_file(job_id, JOB_FILE_COMMAND_LINE))
                except Exception:
                    log.exception("Running command but failed to delete - command may rerun on Pulsar boot.")
                # _run will not do anything if job has been cancelled.
                # NOTE(review): the keyword is spelled `montior`; presumably it
                # matches the parameter name on Manager._run - confirm before
                # renaming.
                self._run(job_id, command_line, montior=MonitorStyle.FOREGROUND)
            except Exception:
                # exc_info attaches the traceback to the log record itself.
                log.warning("Uncaught exception running job with job_id %s", job_id, exc_info=True)