# Source code for pulsar.managers.base.directory

import logging
import os
import stat

from galaxy.util import asbool

from pulsar.managers import PULSAR_UNKNOWN_RETURN_CODE
from pulsar.managers.base import BaseManager
from ..util.env import env_to_statement
from ..util.job_script import job_script

log = logging.getLogger(__name__)

# TODO: Rename these to abstract out the fact that they are files - Pulsar
# should be able to replace the metadata backing with non-file storage now
# that the abstractions are fairly well utilized.

# Names under which per-job state is stored.
JOB_FILE_RETURN_CODE = "return_code"
JOB_FILE_TOOL_ID = "tool_id"
JOB_FILE_TOOL_VERSION = "tool_version"
JOB_FILE_CANCELLED = "cancelled"
JOB_FILE_COMMAND_LINE = "command_line"

# Captured output streams live under the "metadata" sub-path; the TOOL_*
# names hold the tool's streams and the JOB_* names hold the job's streams.
TOOL_FILE_STANDARD_OUTPUT = os.path.join("metadata", "tool_stdout")
TOOL_FILE_STANDARD_ERROR = os.path.join("metadata", "tool_stderr")
JOB_FILE_STANDARD_OUTPUT = os.path.join("metadata", "job_stdout")
JOB_FILE_STANDARD_ERROR = os.path.join("metadata", "job_stderr")

# Shell command substitution, formatted with a directory as ``{0}``: if
# ``{0}/tmp`` already exists it is renamed with a timestamp suffix, then a
# fresh ``{0}/tmp`` is created and its path is echoed.
CREATE_TMP_PATTERN = (
    "$([ ! -e '{0}/tmp' ] || mv '{0}/tmp' '{0}'/tmp.$(date +%Y%m%d-%H%M%S) ; "
    "mkdir '{0}/tmp'; "
    "echo '{0}/tmp')"
)


class DirectoryBaseManager(BaseManager):
    """Manager base class that keeps per-job state in the job's directory.

    Return codes, captured output streams, the command line, tool info and
    the cancellation flag are read and written through the job directory
    object returned by ``self._job_directory(job_id)``.
    """

    def _job_file(self, job_id, name):
        """Return the path of the job file ``name`` for ``job_id``."""
        return self._job_directory(job_id)._job_file(name)

    def return_code(self, job_id):
        """Return the recorded return code for ``job_id``.

        The stored value is converted with ``int``. If nothing has been
        recorded, ``PULSAR_UNKNOWN_RETURN_CODE`` is returned; an empty stored
        value is returned unchanged.
        """
        return_code_str = self._read_job_file(job_id, JOB_FILE_RETURN_CODE, default=PULSAR_UNKNOWN_RETURN_CODE)
        if return_code_str and return_code_str != PULSAR_UNKNOWN_RETURN_CODE:
            return int(return_code_str)
        return return_code_str

    def stdout_contents(self, job_id):
        """Return the tool's standard output, limited to ``maximum_stream_size``."""
        try:
            return self._read_job_file(job_id, TOOL_FILE_STANDARD_OUTPUT, size=self.maximum_stream_size)
        except FileNotFoundError:
            # Could be old job finishing up, drop in 2024?
            # Fall back to the legacy un-prefixed name; empty if that is missing too.
            return self._read_job_file(job_id, "tool_stdout", size=self.maximum_stream_size, default=b"")

    def stderr_contents(self, job_id):
        """Return the tool's standard error, limited to ``maximum_stream_size``."""
        try:
            return self._read_job_file(job_id, TOOL_FILE_STANDARD_ERROR, size=self.maximum_stream_size)
        except FileNotFoundError:
            # Could be old job finishing up, drop in 2024?
            # Fall back to the legacy un-prefixed name; empty if that is missing too.
            return self._read_job_file(job_id, "tool_stderr", size=self.maximum_stream_size, default=b"")

    def job_stdout_contents(self, job_id):
        """Return the job's standard output, defaulting to ``b""``."""
        return self._read_job_file(job_id, JOB_FILE_STANDARD_OUTPUT, size=self.maximum_stream_size, default=b"")

    def job_stderr_contents(self, job_id):
        """Return the job's standard error, defaulting to ``b""``."""
        return self._read_job_file(job_id, JOB_FILE_STANDARD_ERROR, size=self.maximum_stream_size, default=b"")

    def read_command_line(self, job_id):
        """Return the command line stored for ``job_id``.

        Content starting with a double quote is treated as a legacy JSON
        string and decoded; anything else is returned exactly as read.
        """
        command_line = self._read_job_file(job_id, JOB_FILE_COMMAND_LINE)
        if command_line.startswith(b'"'):
            # legacy JSON...
            import json
            command_line = json.loads(command_line)
        return command_line

    def _tool_stdout_path(self, job_id):
        """Return the path of the tool standard output file."""
        return self._job_file(job_id, TOOL_FILE_STANDARD_OUTPUT)

    def _tool_stderr_path(self, job_id):
        """Return the path of the tool standard error file."""
        return self._job_file(job_id, TOOL_FILE_STANDARD_ERROR)

    def _job_stdout_path(self, job_id):
        """Return the path of the job standard output file."""
        return self._job_file(job_id, JOB_FILE_STANDARD_OUTPUT)

    def _job_stderr_path(self, job_id):
        """Return the path of the job standard error file."""
        return self._job_file(job_id, JOB_FILE_STANDARD_ERROR)

    def _return_code_path(self, job_id):
        """Return the path of the return code file."""
        return self._job_file(job_id, JOB_FILE_RETURN_CODE)

    def _setup_job_for_job_id(self, job_id, tool_id, tool_version):
        """Set up the job directory, authorize setup and record tool info.

        Falsy ``tool_id`` / ``tool_version`` values are stored as empty
        strings; other values are converted with ``str``. Returns ``job_id``.
        """
        self._setup_job_directory(job_id)

        tool_id = str(tool_id) if tool_id else ""
        tool_version = str(tool_version) if tool_version else ""

        authorization = self._get_authorization(job_id, tool_id)
        authorization.authorize_setup()

        self._write_tool_info(job_id, tool_id, tool_version)
        return job_id

    def _read_job_file(self, job_id, name, **kwds):
        """Read job file ``name``; ``kwds`` are forwarded to ``read_file``."""
        return self._job_directory(job_id).read_file(name, **kwds)

    def _write_job_file(self, job_id, name, contents):
        """Write ``contents`` to job file ``name``."""
        return self._job_directory(job_id).write_file(name, contents)

    def _write_return_code_if_unset(self, job_id, return_code):
        """Record ``return_code`` only if no return code has been recorded yet."""
        return_code_str = self._read_job_file(job_id, JOB_FILE_RETURN_CODE, default=PULSAR_UNKNOWN_RETURN_CODE)
        if return_code_str == PULSAR_UNKNOWN_RETURN_CODE:
            self._write_job_file(job_id, JOB_FILE_RETURN_CODE, str(return_code))

    def _write_tool_info(self, job_id, tool_id, tool_version):
        """Store the tool id and tool version as job metadata."""
        job_directory = self._job_directory(job_id)
        job_directory.store_metadata(JOB_FILE_TOOL_ID, tool_id)
        job_directory.store_metadata(JOB_FILE_TOOL_VERSION, tool_version)

    def _write_command_line(self, job_id, command_line):
        """Persist the command line for ``job_id``."""
        self._write_job_file(job_id, JOB_FILE_COMMAND_LINE, command_line)

    def _record_cancel(self, job_id):
        """Best-effort: mark ``job_id`` as cancelled; failures are only logged."""
        try:
            self._job_directory(job_id).store_metadata(JOB_FILE_CANCELLED, True)
        except Exception:
            # Deliberately swallowed - recording the cancellation is best-effort.
            log.info("Failed to record job with id %s was cancelled.", job_id)

    def _was_cancelled(self, job_id):
        """Return the stored cancelled flag for ``job_id``.

        Returns ``None`` when no flag has been stored and ``False`` when the
        metadata cannot be loaded.
        """
        try:
            return self._job_directory(job_id).load_metadata(JOB_FILE_CANCELLED, None)
        except Exception:
            log.info("Failed to determine if job with id %s was cancelled, assuming no.", job_id)
            return False

    def _open_job_standard_output(self, job_id):
        """Open the job standard output file for writing."""
        return self._job_directory(job_id).open_file(JOB_FILE_STANDARD_OUTPUT, 'w')

    def _open_job_standard_error(self, job_id):
        """Open the job standard error file for writing."""
        return self._job_directory(job_id).open_file(JOB_FILE_STANDARD_ERROR, 'w')

    def _check_execution_with_tool_file(self, job_id, command_line):
        """Run ``_check_execution`` using the tool id recorded for the job."""
        tool_id = self._tool_id(job_id)
        self._check_execution(job_id, tool_id, command_line)

    def _tool_id(self, job_id):
        """Return the recorded tool id, or ``None`` if none was stored."""
        tool_id = None
        job_directory = self._job_directory(job_id)
        if job_directory.has_metadata(JOB_FILE_TOOL_ID):
            tool_id = job_directory.load_metadata(JOB_FILE_TOOL_ID)
        return tool_id

    def _expand_command_line(self, job_id, command_line: str, dependencies_description, job_directory=None) -> str:
        """Expand the command line and, off Windows, append return code capture.

        The parent implementation does the expansion. On non-Windows hosts a
        shell suffix is added that writes the command's exit status to the
        job's return code file.
        """
        command_line = super()._expand_command_line(
            job_id, command_line, dependencies_description, job_directory=job_directory
        )
        if not self._is_windows:
            rc_path = self._return_code_path(job_id)
            CAPTURE_RETURN_CODE = "return_code=$?"
            # NOTE(review): rc_path is interpolated unquoted, so a path with
            # whitespace or shell metacharacters would break the redirect -
            # confirm staging paths are always shell-safe.
            command_line = f"{command_line}; {CAPTURE_RETURN_CODE}; echo $return_code > {rc_path};"
        return command_line

    # Helpers methods related to setting up job script files.
    def _setup_job_file(self, job_id, command_line, dependencies_description=None, env=None, setup_params=None):
        """Build the job script for ``job_id``, write it, and return its path.

        ``env`` is a list of environment entries for the script; ``None``
        (the default) means no extra entries.
        """
        # None replaces the former mutable ``[]`` default; normalise here so
        # overriding implementations of _job_template_env still get a list.
        env = [] if env is None else env
        command_line = self._expand_command_line(
            job_id, command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory
        )
        script_env = self._job_template_env(job_id, command_line=command_line, env=env, setup_params=setup_params)
        script = job_script(**script_env)
        return self._write_job_script(job_id, script)

    def _tmp_dir(self, job_id: str):
        """Return the shell statement or expression used as the job tmp dir.

        When ``self.tmp_dir`` is unset or boolean-true, a statement creating
        ``tmp`` inside the job directory is returned. Otherwise the configured
        value is returned unchanged.
        """
        # Code stolen from Galaxy's job wrapper.
        tmp_dir = self.tmp_dir
        try:
            if not tmp_dir or asbool(tmp_dir):
                working_directory = self.job_directory(job_id).job_directory
                return CREATE_TMP_PATTERN.format(working_directory)
            else:
                return tmp_dir
        except ValueError:
            # Catch case where tmp_dir is a complex expression and not a boolean value
            return tmp_dir

    def _job_template_env(self, job_id, command_line=None, env=None, setup_params=None):
        """Build the keyword arguments passed to ``job_script``.

        ``env`` entries (``None`` means none) are followed by
        ``self.env_vars`` and converted with ``env_to_statement``. The
        ``command`` key is included only when ``command_line`` is truthy.
        """
        # TODO: Add option to ignore remote env.
        env = ([] if env is None else env) + self.env_vars
        setup_params = setup_params or {}
        # Materialise so the commands can be iterated more than once.
        env_setup_commands = list(map(env_to_statement, env))
        job_directory = self.job_directory(job_id)
        job_template_env = {
            'job_instrumenter': self.job_metrics.default_job_instrumenter,
            'galaxy_virtual_env': self._galaxy_virtual_env(),
            'galaxy_lib': self._galaxy_lib(),
            'preserve_python_environment': setup_params.get('preserve_galaxy_python_environment', False),
            'env_setup_commands': env_setup_commands,
            # job_directory not used by job_script and it calls the job directory working directory
            'working_directory': job_directory.working_directory(),
            'metadata_directory': job_directory.metadata_directory(),
            'home_directory': job_directory.home_directory(),
            'job_id': job_id,
            'tmp_dir_creation_statement': self._tmp_dir(job_id),
        }
        if command_line:
            job_template_env['command'] = command_line
        return job_template_env

    def _write_job_script(self, job_id, contents):
        """Write ``contents`` to ``command.sh`` and return its path.

        The script's mode is set to owner read, write and execute.
        """
        self._write_job_file(job_id, "command.sh", contents)
        script_path = self._job_file(job_id, "command.sh")
        os.chmod(script_path, stat.S_IEXEC | stat.S_IWRITE | stat.S_IREAD)
        return script_path