# Source code for pulsar.managers.staging.post

"""
"""
import logging
import os

from pulsar.client import (
    action_mapper,
    staging,
)
from pulsar.client.staging import PulsarOutputs
from pulsar.client.staging.down import ResultsCollector

log = logging.getLogger(__name__)


def postprocess(job_directory, action_executor):
    """Stage out a finished job's outputs and mark the job as postprocessed.

    The remote staging configuration is read from the ``"remote_staging"``
    entry of the job's ``launch_config`` metadata, when that metadata exists.
    Otherwise ``None`` is passed on as the staging config.

    :param job_directory: the job's directory object (metadata, files, paths).
    :param action_executor: executor used to run the individual staging actions.
    :returns: True if outputs were collected, False if collection reported failures.
    """
    try:
        staging_config = None
        if job_directory.has_metadata("launch_config"):
            launch_config = job_directory.load_metadata("launch_config")
            staging_config = launch_config.get("remote_staging", None)
        return __collect_outputs(job_directory, staging_config, action_executor)
    finally:
        # Write the "postprocessed" marker even if collection raised, so the
        # job is flagged as postprocessed either way.
        job_directory.write_file("postprocessed", "")
def __collect_outputs(job_directory, staging_config, action_executor):
    """Stage out job outputs as described by ``staging_config``.

    Server-side collection happens only when ``staging_config`` contains an
    ``"action_mapper"`` entry. In that case ``"client_outputs"`` must also be
    present.

    :param job_directory: the job's directory object.
    :param staging_config: the launch config's ``"remote_staging"`` dict, or
        None when there is no launch config or no remote staging entry.
    :param action_executor: executor used to run the individual staging actions.
    :returns: True when there was nothing to collect or every output was
        collected; False when the results collector reported failures.
    """
    # ``staging_config`` may be None (see ``postprocess``). Testing membership
    # on None would raise TypeError, so treat it like "nothing to stage".
    if not staging_config or "action_mapper" not in staging_config:
        return True

    file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"])
    client_outputs = staging.ClientOutputs.from_dict(staging_config["client_outputs"])
    pulsar_outputs = __pulsar_outputs(job_directory)
    output_collector = PulsarServerOutputCollector(job_directory, action_executor)
    results_collector = ResultsCollector(output_collector, file_action_mapper, client_outputs, pulsar_outputs)
    collection_failure_exceptions = results_collector.collect()
    if collection_failure_exceptions:
        # Lazy logging args: ``Logger.warn`` is deprecated, and eager ``%``
        # formatting would break if the failures were ever a tuple.
        log.warning("Failures collecting results %s", collection_failure_exceptions)
        return False
    return True
def realized_dynamic_file_sources(job_directory):
    """Resolve the launch config's ``dynamic_file_sources`` against the working directory.

    Each entry of ``launch_config["dynamic_file_sources"]`` is shallow-copied.
    When ``job_directory.working_directory_file_contents(entry["path"])``
    returns bytes, they are decoded as UTF-8 and stored on the copy under
    ``"contents"``. The input entries are never mutated.

    :param job_directory: the job's directory object.
    :returns: list of (possibly augmented) copies of the source entries, or
        an empty list if the launch config could not be loaded (a warning
        is logged in that case).
    """
    launch_config = job_directory.load_metadata("launch_config")
    if launch_config is None:
        log.warning(f"Failed to load launch_config from: {job_directory.job_directory}")
        return []

    realized = []
    for source in launch_config.get("dynamic_file_sources") or []:
        path = source["path"]
        entry = source.copy()
        raw = job_directory.working_directory_file_contents(path)
        # None means no contents were available for this path; leave the copy as-is.
        if raw is not None:
            entry["contents"] = raw.decode("utf-8")
        realized.append(entry)
    return realized
class PulsarServerOutputCollector:
    """Output collector that stages job outputs out from the Pulsar server side.

    Built by ``__collect_outputs`` and handed to ``ResultsCollector``.
    ``collect_output`` is presumably invoked once per output to transfer.
    """

    def __init__(self, job_directory, action_executor):
        # Job directory used to resolve on-disk output paths.
        self.job_directory = job_directory
        # Runs each transfer via ``execute(callable, description)``.
        self.action_executor = action_executor

    def collect_output(self, results_collector, output_type, action, name):
        """Stage a single output out through ``action``.

        Does nothing when ``action.staging_action_local`` is set, because the
        client (Galaxy) collects that output itself. The input path is not
        used: ``action`` already knows its path in this context.
        ``results_collector`` is part of the collector interface but is
        unused here.
        """
        if action.staging_action_local:
            return  # Galaxy (client) will collect output.

        # TODO: Would not work on Windows. Any use in allowing
        # remote_transfer action for Windows?
        name = name or os.path.basename(action.path)
        pulsar_path = self.job_directory.calculate_path(name, output_type)

        def stage_out():
            return action.write_from_path(pulsar_path)

        self.action_executor.execute(
            stage_out,
            "staging out file {} via {}".format(pulsar_path, action),
        )


def __pulsar_outputs(job_directory):
    """Build a ``PulsarOutputs`` snapshot of the job directory's contents.

    Captures the working, outputs, metadata and job directory listings
    (queried in that order) plus the realized dynamic file sources.
    """
    listings = (
        job_directory.working_directory_contents(),
        job_directory.outputs_directory_contents(),
        job_directory.metadata_directory_contents(),
        job_directory.job_directory_contents(),
    )
    dynamic_sources = realized_dynamic_file_sources(job_directory)
    return PulsarOutputs(*listings, realized_dynamic_file_sources=dynamic_sources)


__all__ = ('postprocess', 'realized_dynamic_file_sources')