Source code for pulsar.manager_endpoint_util

""" Composite actions over managers shared between HTTP endpoint (routes.py)
and message queue.
"""

import logging
import os

from galaxy.util import unicodify

from pulsar import __version__ as pulsar_version
from pulsar.client.setup_handler import build_job_config
from pulsar.managers import (
    PULSAR_UNKNOWN_RETURN_CODE,
    status,
)
from pulsar.managers.staging import realized_dynamic_file_sources
from pulsar.managers.stateful import ACTIVE_STATUS_PREPROCESSING

log = logging.getLogger(__name__)


def status_dict(manager, job_id):
    """Return the status dictionary for ``job_id``.

    Asks ``manager`` for the job's current status and hands it to
    ``full_status`` to be expanded into the dictionary form.
    """
    return full_status(manager, manager.get_status(job_id), job_id)
def full_status(manager, job_status, job_id):
    """Expand ``job_status`` into the dictionary describing the job.

    While the job is not done only a small summary is returned; once
    ``status.is_job_done`` reports completion the complete-job dictionary
    is built instead.
    """
    if not status.is_job_done(job_status):
        # Still in flight: nothing beyond the raw status to report yet.
        return {"complete": "false", "status": job_status, "job_id": job_id}
    return __job_complete_dict(job_status, manager, job_id)
def __job_complete_dict(complete_status, manager, job_id):
    """ Build final dictionary describing completed job for consumption by
    Pulsar client.

    :param complete_status: status value stored under the ``status`` key.
    :param manager: job manager queried for the return code, the output
        streams, the job directory and the system properties.
    :param job_id: identifier of the job being described.
    :returns: dictionary with the job's return code, decoded output streams,
        directory locations and contents, system properties, Pulsar version
        and realized dynamic file sources.
    """
    return_code = manager.return_code(job_id)
    if return_code == PULSAR_UNKNOWN_RETURN_CODE:
        # Report the "unknown" sentinel to the client as None.
        return_code = None
    stdout_contents = unicodify(manager.stdout_contents(job_id))
    stderr_contents = unicodify(manager.stderr_contents(job_id))
    # Decode leniently: job output is not guaranteed to be valid UTF-8, and a
    # strict decode would raise UnicodeDecodeError and prevent the completed
    # status from being reported at all.
    job_stdout_contents = unicodify(
        manager.job_stdout_contents(job_id).decode("utf-8", errors="replace")
    )
    job_stderr_contents = unicodify(
        manager.job_stderr_contents(job_id).decode("utf-8", errors="replace")
    )
    job_directory = manager.job_directory(job_id)
    as_dict = dict(
        job_id=job_id,
        complete="true",  # Is this still used or is it legacy.
        status=complete_status,
        returncode=return_code,
        stdout=stdout_contents,
        stderr=stderr_contents,
        job_stdout=job_stdout_contents,
        job_stderr=job_stderr_contents,
        working_directory=job_directory.working_directory(),
        metadata_directory=job_directory.metadata_directory(),
        job_directory=job_directory.job_directory,
        working_directory_contents=job_directory.working_directory_contents(),
        metadata_directory_contents=job_directory.metadata_directory_contents(),
        outputs_directory_contents=job_directory.outputs_directory_contents(),
        job_directory_contents=job_directory.job_directory_contents(),
        system_properties=manager.system_properties(),
        pulsar_version=pulsar_version,
        realized_dynamic_file_sources=realized_dynamic_file_sources(job_directory)
    )
    return as_dict
def submit_job(manager, job_config):
    """ Launch new job from specified config. May have been previously 'setup'
    if 'setup_params' in job_config is empty.

    A setup message recognised as a duplicate of an already-handled job is
    logged and ignored. Any exception raised while preparing or launching
    the job is reported to ``manager.handle_failure_before_launch`` and then
    re-raised.
    """
    # job_config is raw dictionary from JSON (from MQ or HTTP endpoint).
    job_id = job_config.get('job_id')
    if job_id and _is_duplicate_setup(manager, job_id):
        log.info(
            "Ignoring duplicate setup message for job_id %s (launch_config already "
            "persisted; this is most likely an MQ redelivery after Pulsar restart).",
            job_id,
        )
        return None

    try:
        option = job_config.get
        command_line = option('command_line')
        setup_params = option('setup_params', {})
        force_setup = option('setup')
        remote_staging = option('remote_staging', {})
        dependencies_description = option('dependencies_description', None)
        env = option('env', [])
        submit_params = option('submit_params', {})
        outputs_to_touch = option('touch_outputs', [])
        dynamic_file_sources = option("dynamic_file_sources", None)
        token_endpoint = option("token_endpoint", None)

        if setup_params or force_setup:
            setup_config = setup_job(
                manager,
                setup_params.get("job_id", job_id),
                setup_params.get("tool_id", None),
                setup_params.get("tool_version", None),
            )
            if setup_config is not None:
                # The jobs directory is the parent of this job's directory.
                parent_of_job = os.path.join(setup_config["job_directory"], os.pardir)
                jobs_directory = os.path.abspath(parent_of_job)
                command_line = command_line.replace('__PULSAR_JOBS_DIRECTORY__', jobs_directory)

        # TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc...
        manager.touch_outputs(job_id, outputs_to_touch)
        launch_config = dict(
            remote_staging=remote_staging,
            command_line=command_line,
            dependencies_description=dependencies_description,
            submit_params=submit_params,
            env=env,
            setup_params=setup_params,
            dynamic_file_sources=dynamic_file_sources,
            token_endpoint=token_endpoint,
        )
        manager.preprocess_and_launch(job_id, launch_config)
    except Exception:
        manager.handle_failure_before_launch(job_id)
        raise
def setup_job(manager, job_id, tool_id, tool_version):
    """ Setup new job from these inputs and return dict summarizing state
    (used to configure command line).

    The job id returned by ``manager.setup_job`` is the one used for the
    directory lookup and for the resulting job config.
    """
    assigned_job_id = manager.setup_job(job_id, tool_id, tool_version)
    summary = dict(
        job_id=assigned_job_id,
        job_directory=manager.job_directory(assigned_job_id),
        system_properties=manager.system_properties(),
        tool_id=tool_id,
        tool_version=tool_version,
    )
    return build_job_config(**summary)
def _is_duplicate_setup(manager, job_id: str) -> bool:
    """Tell whether a setup message for ``job_id`` is a redelivery to ignore.

    A setup message may arrive a second time when Pulsar restarts after
    acking an AMQP setup but before the broker recorded the ack, or whenever
    the consumer crashes mid-processing.

    The message is treated as a duplicate only when the job can finish
    without it: it already has a terminal status, or ``active_jobs`` still
    tracks it so ``recover_active_jobs`` will resume it. If the earlier run
    crashed *between* persisting ``launch_config`` and activating the job,
    no recovery hook exists, so ``False`` is returned and the redelivered
    message drives a fresh ``preprocess_and_launch``.
    """
    try:
        directory = manager.job_directory(job_id)
    except (TypeError, ValueError, OSError):
        # Malformed job_id from a redelivered or corrupt message body.
        return False

    if not directory.exists():
        return False
    if directory.has_metadata("final_status"):
        return True

    tracker = manager.active_jobs
    try:
        if job_id in set(tracker.active_job_ids()):
            return True
        preprocessing_ids = set(
            tracker.active_job_ids(active_status=ACTIVE_STATUS_PREPROCESSING)
        )
        return job_id in preprocessing_ids
    except OSError:
        # active_job_ids() reads filesystem state; if the persistence
        # directory is unreadable we can't tell if it's a duplicate, so
        # let preprocess_and_launch run again.
        return False