# Source code for pulsar.client.staging.up

from enum import Enum
from logging import getLogger
from os import sep
from os.path import (
    abspath,
    basename,
    dirname,
    exists,
    join,
    relpath,
)
from re import (
    escape,
    findall,
)

from ..action_mapper import (
    FileActionMapper,
    MessageAction,
    path_type,
)
from ..job_directory import RemoteJobDirectory
from ..staging import (
    CLIENT_INPUT_PATH_TYPES,
    COMMAND_VERSION_FILENAME,
)
from ..util import (
    directory_files,
    ExternalId,
    PathHelper,
)

log = getLogger(__name__)


def submit_job(client, client_job_description, job_config=None):
    """Stage a job's files and launch it through ``client``.

    Builds a :class:`FileStager` (which performs all uploads and path
    rewrites as a side effect), assembles the launch keyword arguments,
    submits the job, and returns the job id — possibly one assigned
    remotely by the Pulsar server.
    """
    stager = FileStager(client, client_job_description, job_config)
    job_id = stager.job_id

    launch_kwds = dict(
        command_line=stager.get_command_line(),
        dependencies_description=client_job_description.dependencies_description,
        env=client_job_description.env,
    )
    if client_job_description.container:
        launch_kwds["container_info"] = {
            "container_id": client_job_description.container,
            "guest_ports": client_job_description.guest_ports,
        }
    if client_job_description.remote_pulsar_app_config:
        launch_kwds["pulsar_app_config"] = client_job_description.remote_pulsar_app_config
    if stager.job_config:
        launch_kwds["job_config"] = stager.job_config

    remote_staging = {}
    setup_actions = stager.transfer_tracker.remote_staging_actions
    if setup_actions:
        remote_staging["setup"] = setup_actions
    # Somehow make the following optional.
    remote_staging["action_mapper"] = stager.action_mapper.to_dict()
    remote_staging["client_outputs"] = client_job_description.client_outputs.to_dict()
    if remote_staging:
        launch_kwds["remote_staging"] = remote_staging

    # Potentially duplicated, but we don't want to count on remote staging to
    # include this — it needs to be in the response to Pulsar even when Pulsar
    # is initiating staging actions.
    launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources
    launch_kwds["token_endpoint"] = client.token_endpoint

    # For pulsar modalities that skip the explicit "setup" step, give them a
    # chance to set an external id from the submission process (e.g. to TES).
    launch_response = client.launch(**launch_kwds)
    if isinstance(launch_response, ExternalId):
        job_id = launch_response.external_id
    return job_id
class StageDirectoryType(Enum):
    # How a directory is staged to the remote server.
    CONTENTS = 1  # transfer just the contents of the directory
    WHOLE_DIRECTORY = 2  # transfer the whole directory into the target, preserve name


class FileStager:
    """
    Objects of the FileStager class interact with an Pulsar client object to
    stage the files required to run jobs on a remote Pulsar server.

    **Parameters**

    client : JobClient
        Pulsar client object.
    client_job_description : client_job_description
        Description of client view of job to stage and execute remotely.
    """

    def __init__(self, client, client_job_description, job_config):
        """Stage all of the job's files as a side effect of construction.

        After construction, ``get_command_line`` returns the command line with
        local paths rewritten for the remote server (when path rewriting is
        enabled) and ``transfer_tracker`` records any remote staging actions.
        """
        self.client = client
        self.command_line = client_job_description.command_line
        self.config_files = client_job_description.config_files
        self.client_inputs = client_job_description.client_inputs
        self.output_files = client_job_description.output_files
        if client_job_description.tool is not None:
            self.tool_id = client_job_description.tool.id
            self.tool_version = client_job_description.tool.version
            self.tool_dir = abspath(client_job_description.tool.tool_dir)
        else:
            self.tool_id = None
            self.tool_version = None
            self.tool_dir = None
        self.working_directory = client_job_description.working_directory
        self.metadata_directory = client_job_description.metadata_directory
        self.version_file = client_job_description.version_file
        self.arbitrary_files = client_job_description.arbitrary_files
        self.rewrite_paths = client_job_description.rewrite_paths
        self.job_directory_files = client_job_description.job_directory_files
        self.tool_directory_required_files = client_job_description.tool_directory_required_files

        # Setup job inputs, these will need to be rewritten before
        # shipping off to remote Pulsar server.
        self.job_inputs = JobInputs(self.command_line, self.config_files)

        self.action_mapper = FileActionMapper(client)

        self.__handle_setup(job_config)
        self.__setup_touch_outputs(client_job_description.touch_outputs)

        self.transfer_tracker = TransferTracker(
            client,
            self.path_helper,
            self.action_mapper,
            self.job_inputs,
            self.rewrite_paths,
            self.job_directory,
        )

        self.__initialize_referenced_tool_files()
        if self.rewrite_paths:
            self.__initialize_referenced_arbitrary_files()

        # Upload phase: each helper pushes files through transfer_tracker.
        self.__upload_tool_files()
        self.__upload_job_directory_files()
        self.__upload_input_files()
        self.__upload_working_directory_files()
        self.__upload_metadata_directory_files()
        self.__upload_arbitrary_files()

        # Rewrite-registration phase (only when path rewriting is enabled).
        if self.rewrite_paths:
            self.__initialize_output_file_renames()
            self.__initialize_task_output_file_renames()
            self.__initialize_config_file_renames()
            self.__initialize_version_file_rename()

        self.__handle_rewrites()
        self.__upload_rewritten_config_files()

    def __handle_setup(self, job_config):
        # Perform the remote "setup" handshake unless the caller supplied a
        # pre-computed job_config, then cache the remote directory layout.
        if not job_config:
            job_config = self.client.setup(self.tool_id, self.tool_version)
        self.new_working_directory = job_config['working_directory']
        self.new_outputs_directory = job_config['outputs_directory']
        self.new_tool_directory = job_config.get('tools_directory', None)
        self.new_configs_directory = job_config['configs_directory']
        self.remote_separator = self.__parse_remote_separator(job_config)
        self.path_helper = PathHelper(self.remote_separator)
        # If remote Pulsar server assigned job id, use that otherwise
        # just use local job_id assigned.
        galaxy_job_id = self.client.job_id
        self.job_id = job_config.get('job_id', galaxy_job_id)
        if self.job_id != galaxy_job_id:
            # Remote Pulsar server assigned an id different than the
            # Galaxy job id, update client to reflect this.
            self.client.assign_job_id(self.job_id)
        self.job_config = job_config
        self.job_directory = self.__setup_job_directory()

    def __setup_touch_outputs(self, touch_outputs):
        # Record outputs the remote server should "touch" into existence.
        self.job_config['touch_outputs'] = touch_outputs

    def __parse_remote_separator(self, job_config):
        # Determine the remote OS path separator from the setup response.
        separator = job_config.get("system_properties", {}).get("separator", None)
        if not separator:  # Legacy Pulsar
            separator = job_config["path_separator"]  # Poorly named
        return separator

    def __setup_job_directory(self):
        # Prefer a job directory already known to the client; otherwise build
        # one from the setup response; may legitimately be None.
        if self.client.job_directory:
            return self.client.job_directory
        elif self.job_config.get('job_directory', None):
            return RemoteJobDirectory(
                remote_staging_directory=self.job_config['job_directory'],
                remote_id=None,
                remote_sep=self.remote_separator,
            )
        else:
            return None

    def __initialize_referenced_tool_files(self):
        # Collect (local_path, remote_name) pairs of tool files to transfer.
        if self.tool_directory_required_files:
            self.referenced_tool_files = [(join(self.tool_dir, x), x) for x in self.tool_directory_required_files.find_required_files(self.tool_dir)]
        else:
            # Was this following line only for interpreter, should we disable it of 16.04+ tools
            self.referenced_tool_files = [(x, None) for x in self.job_inputs.find_referenced_subfiles(self.tool_dir)]
        # If the tool was created with a correct $__tool_directory__ find those files and transfer
        new_tool_directory = self.new_tool_directory
        if not new_tool_directory:
            return
        for potential_tool_file in self.job_inputs.find_referenced_subfiles(new_tool_directory):
            local_file = potential_tool_file.replace(new_tool_directory, self.tool_dir)
            if exists(local_file):
                self.referenced_tool_files.append((local_file, None))

    def __initialize_referenced_arbitrary_files(self):
        # Map paths matched by unstructured action mappers to their mapper,
        # then expand each into arbitrary_files via the mapper's action.
        referenced_arbitrary_path_mappers = dict()
        for mapper in self.action_mapper.unstructured_mappers():
            mapper_pattern = mapper.to_pattern()
            # TODO: Make more sophisticated, allow parent directories,
            # grabbing sibling files based on patterns, etc...
            paths = self.job_inputs.find_pattern_references(mapper_pattern)
            for path in paths:
                if path not in referenced_arbitrary_path_mappers:
                    referenced_arbitrary_path_mappers[path] = mapper
        for path, mapper in referenced_arbitrary_path_mappers.items():
            action = self.action_mapper.action({"path": path}, path_type.UNSTRUCTURED, mapper)
            unstructured_map = action.unstructured_map(self.path_helper)
            self.arbitrary_files.update(unstructured_map)

    def __upload_tool_files(self):
        for (referenced_tool_file, name) in self.referenced_tool_files:
            self.transfer_tracker.handle_transfer_path(referenced_tool_file, path_type.TOOL, name=name)

    def __upload_job_directory_files(self):
        for job_directory_file in self.job_directory_files:
            self.transfer_tracker.handle_transfer_path(job_directory_file, path_type.JOBDIR)

    def __upload_arbitrary_files(self):
        for path, name in self.arbitrary_files.items():
            self.transfer_tracker.handle_transfer_path(path, path_type.UNSTRUCTURED, name=name)

    def __upload_input_files(self):
        # Dispatch each client input to the appropriate upload helper,
        # de-duplicating by path.
        handled_inputs = set()
        for client_input in self.client_inputs:
            # TODO: use object identity to handle this.
            path = client_input.path
            if path in handled_inputs:
                continue
            if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH:
                self.__upload_input_file(client_input.action_source)
                handled_inputs.add(path)
            elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH:
                self.__upload_input_extra_files(client_input.action_source)
                handled_inputs.add(path)
            elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH:
                self.__upload_input_metadata_file(client_input.action_source)
                handled_inputs.add(path)
            else:
                raise NotImplementedError()

    def __upload_input_file(self, input_action_source):
        if self.__stage_input(input_action_source):
            self.transfer_tracker.handle_transfer_source(input_action_source, path_type.INPUT)

    def __upload_input_extra_files(self, input_action_source):
        if self.__stage_input(input_action_source):
            # TODO: needs to happen elsewhere if using remote object store staging
            # but we don't have the action type yet.
            self.transfer_tracker.handle_transfer_directory(
                path_type.INPUT,
                action_source=input_action_source,
                mode=StageDirectoryType.WHOLE_DIRECTORY
            )

    def __upload_input_metadata_file(self, input_action_source):
        if self.__stage_input(input_action_source):
            # Name must match what is generated in remote_input_path_rewrite in path_mapper.
            remote_name = "metadata_%s" % basename(input_action_source['path'])
            self.transfer_tracker.handle_transfer_source(input_action_source, path_type.INPUT, name=remote_name)

    def __upload_working_directory_files(self):
        # Task manager stages files into working directory, these need to be
        # uploaded if present.
        directory = self.working_directory
        if directory and exists(directory):
            self.transfer_tracker.handle_transfer_directory(path_type.WORKDIR, directory=directory)

    def __upload_metadata_directory_files(self):
        directory = self.metadata_directory
        if directory and exists(directory):
            self.transfer_tracker.handle_transfer_directory(path_type.METADATA, directory=directory)

    def __initialize_version_file_rename(self):
        # The version file lands next to the other outputs under a fixed name.
        version_file = self.version_file
        if version_file:
            remote_path = self.path_helper.remote_join(self.new_outputs_directory, COMMAND_VERSION_FILENAME)
            self.transfer_tracker.register_rewrite(version_file, remote_path, path_type.OUTPUT)

    def __initialize_output_file_renames(self):
        for output_file in self.output_files:
            remote_path = self.path_helper.remote_join(self.new_outputs_directory, basename(output_file))
            self.transfer_tracker.register_rewrite(output_file, remote_path, path_type.OUTPUT)

    def __initialize_task_output_file_renames(self):
        # Same outputs, but as they would appear inside the working directory
        # (task-split jobs write outputs there first).
        for output_file in self.output_files:
            name = basename(output_file)
            task_file = join(self.working_directory, name)
            remote_path = self.path_helper.remote_join(self.new_working_directory, name)
            self.transfer_tracker.register_rewrite(task_file, remote_path, path_type.OUTPUT_WORKDIR)

    def __initialize_config_file_renames(self):
        for config_file in self.config_files:
            remote_path = self.path_helper.remote_join(self.new_configs_directory, basename(config_file))
            self.transfer_tracker.register_rewrite(config_file, remote_path, path_type.CONFIG)

    def __handle_rewrites(self):
        """
        For each file that has been transferred and renamed, update
        command_line and configfiles to reflect that rewrite.
        """
        self.transfer_tracker.rewrite_input_paths()

    def __upload_rewritten_config_files(self):
        # Config files are transferred from their (already rewritten) in-memory
        # contents rather than re-read from disk.
        for config_file, new_config_contents in self.job_inputs.config_files.items():
            self.transfer_tracker.handle_transfer_path(config_file, type=path_type.CONFIG, contents=new_config_contents)

    def get_command_line(self):
        """
        Returns the rewritten version of the command line to execute suitable
        for remote host.
        """
        return self.job_inputs.command_line

    def __stage_input(self, source):
        # If we have disabled path rewriting, just assume everything needs to be transferred,
        # else check to ensure the file is referenced before transferring it.
        if not self.rewrite_paths:
            return True
        return self.job_inputs.path_referenced(source['path'])


class JobInputs:
    """
    Abstractions over dynamic inputs created for a given job (namely the command to
    execute and created configfiles).

    **Parameters**

    command_line : str
        Local command to execute for this job. (To be rewritten.)
    config_files : str
        Config files created for this job. (To be rewritten.)

    >>> import tempfile
    >>> tf = tempfile.NamedTemporaryFile()
    >>> def setup_inputs(tf):
    ...     open(tf.name, "w").write(u'''world /path/to/input '/path/to/moo' "/path/to/cow" the rest''')
    ...     inputs = JobInputs(u"hello /path/to/input", [tf.name])
    ...     return inputs
    >>> inputs = setup_inputs(tf)
    >>> inputs.rewrite_paths(u"/path/to/input", u'C:\\input')
    >>> inputs.command_line == u'hello C:\\\\input'
    True
    >>> inputs.config_files[tf.name] == u'''world C:\\\\input '/path/to/moo' "/path/to/cow" the rest'''
    True
    >>> tf.close()
    >>> tf = tempfile.NamedTemporaryFile()
    >>> inputs = setup_inputs(tf)
    >>> sorted(inputs.find_referenced_subfiles('/path/to')) == [u'/path/to/cow', u'/path/to/input', u'/path/to/moo']
    True
    >>> inputs.path_referenced('/path/to')
    True
    >>> inputs.path_referenced(u'/path/to')
    True
    >>> inputs.path_referenced('/path/to/input')
    True
    >>> inputs.path_referenced('/path/to/notinput')
    False
    >>> tf.close()
    """

    def __init__(self, command_line, config_files):
        self.command_line = command_line
        # Map config-file path -> file contents, loaded eagerly so rewrites
        # can be applied in memory.
        self.config_files = {}
        for config_file in config_files or []:
            config_contents = _read(config_file)
            self.config_files[config_file] = config_contents

    def find_pattern_references(self, pattern):
        # Return unique regex matches across the command line and all config
        # file contents (order not guaranteed — built from a set).
        referenced_files = set()
        for input_contents in self.__items():
            referenced_files.update(findall(pattern, input_contents))
        return list(referenced_files)

    def find_referenced_subfiles(self, directory):
        """
        Return list of files below specified `directory` in job inputs. Could
        use more sophisticated logic (match quotes to handle spaces, handle
        subdirectories, etc...).

        **Parameters**

        directory : str
            Full path to directory to search.
        """
        if directory is None:
            return []
        pattern = r'''[\'\"]?({}{}[^\s\'\"]+)[\'\"]?'''.format(escape(directory), escape(sep))
        return self.find_pattern_references(pattern)

    def path_referenced(self, path):
        # NOTE(review): the path is used as a regex without re.escape here, so
        # regex metacharacters in a path are interpreted — confirm intentional.
        pattern = r"%s" % path
        found = False
        for input_contents in self.__items():
            if findall(pattern, input_contents):
                found = True
                break
        return found

    def rewrite_paths(self, local_path, remote_path):
        """
        Rewrite references to `local_path` with `remote_path` in job inputs.
        """
        self.__rewrite_command_line(local_path, remote_path)
        self.__rewrite_config_files(local_path, remote_path)

    def __rewrite_command_line(self, local_path, remote_path):
        self.command_line = self.command_line.replace(local_path, remote_path)

    def __rewrite_config_files(self, local_path, remote_path):
        for config_file, contents in self.config_files.items():
            self.config_files[config_file] = contents.replace(local_path, remote_path)

    def __items(self):
        # All rewritable text: command line first, then config file contents.
        items = [self.command_line]
        items.extend(self.config_files.values())
        return items


class TransferTracker:
    """Tracks file transfers, remote staging actions, and path rewrites for a job."""

    def __init__(self, client, path_helper, action_mapper, job_inputs, rewrite_paths, job_directory):
        self.client = client
        self.path_helper = path_helper
        self.action_mapper = action_mapper
        self.job_inputs = job_inputs
        self.rewrite_paths = rewrite_paths
        self.job_directory = job_directory
        # local path -> remote path, applied later by rewrite_input_paths().
        self.file_renames = {}
        # Action dicts for staging the remote server will perform itself.
        self.remote_staging_actions = []

    def handle_transfer_path(self, path, type, name=None, contents=None):
        # Convenience wrapper turning a bare path into an action source.
        source = {"path": path}
        return self.handle_transfer_source(source, type, name=name, contents=contents)

    def handle_transfer_directory(self, type, directory=None, action_source=None, mode: StageDirectoryType = StageDirectoryType.CONTENTS):
        # Transfer a directory either file-by-file (walking it locally) or, if
        # the mapped action supports it, as a single remote staging action.
        # TODO: needs to happen elsewhere if using remote object store staging
        # but we don't have the action type yet.
        if directory is None:
            assert action_source is not None
            action = self.__action_for_transfer(action_source, type, None)
            if not action.staging_action_local and action.whole_directory_transfer_supported:
                # If we're going to transfer the whole directory remotely, don't walk the files
                # here.
                # We could still rewrite paths and just not transfer the files.
                assert not self.rewrite_paths
                self.__add_remote_staging_input(action, None, type)
                return

            directory = action_source['path']
        else:
            assert action_source is None

        for directory_file_name in directory_files(directory):
            directory_file_path = join(directory, directory_file_name)
            # CONTENTS mode strips the directory itself from remote names;
            # WHOLE_DIRECTORY keeps the directory's own name as a prefix.
            rel_path_to = directory if mode == StageDirectoryType.CONTENTS else dirname(directory)
            remote_name = self.path_helper.remote_name(relpath(directory_file_path, rel_path_to))
            self.handle_transfer_path(directory_file_path, type, name=remote_name)

    def handle_transfer_source(self, source, type, name=None, contents=None):
        # Resolve the action for this source and either push the file now
        # (local staging) or record a remote staging action; in both cases
        # register the resulting path rewrite when required.
        action = self.__action_for_transfer(source, type, contents)
        if action.staging_needed:
            local_action = action.staging_action_local
            if local_action:
                path = source['path']
                if not exists(path):
                    message = "Pulsar: __upload_input_file called on empty or missing dataset." + \
                              " No such file: [%s]" % path
                    log.debug(message)
                    return

                response = self.client.put_file(path, type, name=name, contents=contents, action_type=action.action_type)

                def get_path():
                    return response['path']
            else:
                path = source['path']
                job_directory = self.job_directory
                assert job_directory, "job directory required for action %s" % action
                if not name:
                    # TODO: consider fetching this from source so an actual input path
                    # isn't needed. At least it isn't used though.
                    name = basename(path)
                self.__add_remote_staging_input(action, name, type)

                def get_path():
                    return job_directory.calculate_path(name, type)

            register = self.rewrite_paths or type == 'tool'  # Even if inputs not rewritten, tool must be.
            if register:
                self.register_rewrite_action(action, get_path(), force=True)
        elif self.rewrite_paths:
            path_rewrite = action.path_rewrite(self.path_helper)
            if path_rewrite:
                self.register_rewrite_action(action, path_rewrite, force=True)

        # else: # No action for this file

    def __add_remote_staging_input(self, action, name, type):
        # Queue an action for the remote server to execute during its setup.
        input_dict = dict(
            name=name,
            type=type,
            action=action.to_dict(),
        )
        self.remote_staging_actions.append(input_dict)

    def __action_for_transfer(self, source, type, contents):
        if contents:
            # If contents loaded in memory, no need to write out file and copy,
            # just transfer.
            action = MessageAction(contents=contents, client=self.client)
        else:
            path = source.get("path")
            if path is not None and not exists(path):
                message = "__action_for_transfer called on non-existent file - [%s]" % path
                # NOTE(review): log.warn is deprecated; prefer log.warning.
                log.warn(message)
                raise Exception(message)
            action = self.__action(source, type)
        return action

    def register_rewrite(self, local_path, remote_path, type, force=False):
        # Record a local->remote rewrite for a path that may not be transferred.
        action = self.__action({"path": local_path}, type)
        self.register_rewrite_action(action, remote_path, force=force)

    def register_rewrite_action(self, action, remote_path, force=False):
        if action.staging_needed or force:
            path = getattr(action, 'path', None)
            if path:
                self.file_renames[path] = remote_path

    def rewrite_input_paths(self):
        """
        For each file that has been transferred and renamed, update
        command_line and configfiles to reflect that rewrite.
        """
        for local_path, remote_path in self.file_renames.items():
            self.job_inputs.rewrite_paths(local_path, remote_path)

    def __action(self, source, type):
        return self.action_mapper.action(source, type)


def _read(path):
    """
    Utility method to quickly read small files (config files and tool
    wrappers) into memory as bytes.
    """
    input = open(path, encoding="utf-8")
    try:
        return input.read()
    finally:
        input.close()


__all__ = ['submit_job']