"""Code run on the client side for unstaging complete Pulsar jobs."""
import fnmatch
from contextlib import contextmanager
from json import loads
from logging import getLogger
from os.path import (
join,
relpath,
)
from ..action_mapper import FileActionMapper
from ..staging import COMMAND_VERSION_FILENAME
# Module-level logger, named after this module via ``__name__``.
log = getLogger(__name__)
# Public entry point of this module.
def finish_job(client, cleanup_job, job_completed_normally, client_outputs, pulsar_outputs):
    """Un-stage a finished Pulsar job.

    When the job completed normally, its results are collected through
    ``client``; afterwards the cleanup policy in ``cleanup_job`` is applied
    via ``_clean`` (which also runs when nothing was collected).

    Returns the list of exceptions gathered while collecting outputs. The
    list is empty when the job did not complete normally, because no
    collection is attempted in that case.
    """
    failures = []
    if job_completed_normally:
        collector = ResultsCollector(
            ClientOutputCollector(client),
            FileActionMapper(client),
            client_outputs,
            pulsar_outputs,
        )
        failures = collector.collect()

    # Cleanup is decided from the collection failures and the policy.
    _clean(failures, cleanup_job, client)
    return failures
class ClientOutputCollector:
    """Collect job outputs by fetching them through a Pulsar client."""

    def __init__(self, client):
        # Client object used to fetch outputs; must provide ``fetch_output``.
        self.client = client

    def collect_output(self, results_collector, output_type, action, name):
        """Fetch one output if its action is staged locally.

        Returns ``True`` after asking the client to fetch the output, and
        ``False`` (without contacting the client) when
        ``action.staging_action_local`` is falsy.
        """
        if action.staging_action_local:
            target_directory = results_collector.client_outputs.working_directory
            self.client.fetch_output(
                path=action.path,
                name=name,
                working_directory=target_directory,
                output_type=output_type,
                action_type=action.action_type,
            )
            return True

        # This output should have been handled by the Pulsar.
        return False
class ResultsCollector:
    """Collect all outputs of a finished Pulsar job.

    ``collect()`` walks, in order: explicit working-directory outputs, the
    expected output files, the command version file, and then any remaining
    working/metadata/job directory files that match dynamic output rules.
    Exceptions raised while collecting an individual output are recorded by
    a ``DownloadExceptionTracker`` instead of aborting the whole run.
    """

    def __init__(self, output_collector, action_mapper, client_outputs, pulsar_outputs):
        self.output_collector = output_collector
        self.action_mapper = action_mapper
        self.client_outputs = client_outputs
        self.pulsar_outputs = pulsar_outputs
        # Remote names already fetched, so later directory passes skip them.
        self.downloaded_working_directory_files = []
        self.exception_tracker = DownloadExceptionTracker()
        # NOTE: this is the same list object as client_outputs.output_files
        # (not a copy); entries are removed from it while collecting
        # working-directory outputs.
        self.output_files = client_outputs.output_files
        self.working_directory_contents = pulsar_outputs.working_directory_contents or []
        self.metadata_directory_contents = pulsar_outputs.metadata_directory_contents or []
        self.job_directory_contents = pulsar_outputs.job_directory_contents or []

    def collect(self):
        """Run every collection step and return the recorded failures.

        Returns the list of exceptions captured by the exception tracker.
        """
        self.__collect_working_directory_outputs()
        self.__collect_outputs()
        self.__collect_version_file()
        self.__collect_other_working_directory_files()
        self.__collect_metadata_directory_files()
        self.__collect_job_directory_files()
        return self.exception_tracker.collection_failure_exceptions

    def __collect_working_directory_outputs(self):
        """Fetch outputs explicitly mapped from the working directory.

        Raises ``Exception`` when a mapped output is missing from
        ``self.output_files``.
        """
        working_directory = self.client_outputs.working_directory
        # Fetch explicit working directory outputs.
        for source_file, output_file in self.client_outputs.work_dir_outputs:
            name = relpath(source_file, working_directory)
            if name not in self.working_directory_contents:
                # Could be a glob; use the first remote file matching it.
                matching = fnmatch.filter(self.working_directory_contents, name)
                if matching:
                    name = matching[0]
            pulsar = self.pulsar_outputs.path_helper.remote_name(name)
            if self._attempt_collect_output('output_workdir', path=output_file, name=pulsar):
                self.downloaded_working_directory_files.append(pulsar)
            # Remove from full output_files list so don't try to download directly.
            try:
                self.output_files.remove(output_file)
            except ValueError as e:
                # Chain the cause so the original ValueError stays visible.
                raise Exception("Failed to remove {} from {}".format(output_file, self.output_files)) from e

    def __collect_outputs(self):
        """Fetch each remaining expected output file and its extras."""
        # Legacy Pulsar not returning list of files, iterate over the list of
        # expected outputs for tool.
        for output_file in self.output_files:
            # Fetch output directly...
            output_generated = self.pulsar_outputs.has_output_file(output_file)
            if output_generated:
                self._attempt_collect_output('output', output_file)
            # else not output generated, do not attempt download.

            for galaxy_path, pulsar in self.pulsar_outputs.output_extras(output_file).items():
                self._attempt_collect_output('output', path=galaxy_path, name=pulsar)

    def __collect_version_file(self):
        """Fetch the command version file if requested and present remotely."""
        version_file = self.client_outputs.version_file
        pulsar_output_directory_contents = self.pulsar_outputs.output_directory_contents
        if version_file and COMMAND_VERSION_FILENAME in pulsar_output_directory_contents:
            self._attempt_collect_output('output', version_file, name=COMMAND_VERSION_FILENAME)

    def __collect_other_working_directory_files(self):
        """Fetch dynamically matched files from the working directory."""
        self.__collect_directory_files(
            self.client_outputs.working_directory,
            self.working_directory_contents,
            'output_workdir',
        )

    def __collect_metadata_directory_files(self):
        """Fetch dynamically matched files from the metadata directory."""
        self.__collect_directory_files(
            self.client_outputs.metadata_directory,
            self.metadata_directory_contents,
            'output_metadata',
        )

    def __collect_job_directory_files(self):
        """Fetch dynamically matched files from the job directory."""
        self.__collect_directory_files(
            self.client_outputs.job_directory,
            self.job_directory_contents,
            'output_jobdir',
        )

    def __realized_dynamic_file_source_references(self):
        """Gather ``filename`` / ``extra_files`` values from dynamic file sources.

        Each entry of ``pulsar_outputs.realized_dynamic_file_sources`` is
        parsed as JSON: a ``"galaxy"`` source as one document, a
        ``"legacy_galaxy"`` source line by line. Every value stored under a
        ``"filename"`` or ``"extra_files"`` key, at any nesting depth, is
        recorded as-is (values are not type-checked here).

        Returns a dict ``{"filename": [...], "extra_files": [...]}``.
        Content that fails to parse is logged and skipped.
        """
        references = {"filename": [], "extra_files": []}

        def record_references(from_dict):
            # Walk nested lists/dicts, recording values of the tracked keys.
            if isinstance(from_dict, list):
                for v in from_dict:
                    record_references(v)
            elif isinstance(from_dict, dict):
                for k, v in from_dict.items():
                    if k in references:
                        references[k].append(v)
                    if isinstance(v, (list, dict)):
                        record_references(v)

        def parse_and_record_references(json_content):
            # Best effort: an unparsable document must not abort collection.
            try:
                as_dict = loads(json_content)
                record_references(as_dict)
            except Exception as e:
                log.warning("problem parsing galaxy.json %s", e)

        realized_dynamic_file_sources = (self.pulsar_outputs.realized_dynamic_file_sources or [])
        for realized_dynamic_file_source in realized_dynamic_file_sources:
            contents = realized_dynamic_file_source["contents"]
            source_type = realized_dynamic_file_source["type"]
            # NOTE(review): assert is stripped under ``python -O``; kept so
            # the raised exception type (AssertionError) is unchanged.
            assert source_type in ["galaxy", "legacy_galaxy"], source_type
            if source_type == "galaxy":
                parse_and_record_references(contents)
            else:
                for line in contents.splitlines():
                    parse_and_record_references(line)

        return references

    def __collect_directory_files(self, directory, contents, output_type):
        """Fetch files in ``contents`` that qualify as dynamic outputs.

        A name qualifies when ``client_outputs.dynamic_match`` accepts it,
        when it equals a recorded ``filename`` reference, or when it starts
        with a recorded ``extra_files`` reference. Names already downloaded
        are skipped. Nothing happens when ``directory`` is ``None``.
        """
        if directory is None:  # e.g. output_metadata_directory
            return

        dynamic_file_source_references = self.__realized_dynamic_file_source_references()
        referenced_filenames = dynamic_file_source_references["filename"]
        # References come from parsed JSON and may be non-strings (e.g. null).
        # str.startswith raises TypeError for those, so only string prefixes
        # are considered.
        extra_files_prefixes = tuple(
            r for r in dynamic_file_source_references["extra_files"] if isinstance(r, str)
        )

        # Fetch remaining working directory outputs of interest.
        for name in contents:
            if name in self.downloaded_working_directory_files:
                continue

            collect = (
                self.client_outputs.dynamic_match(name)
                or name in referenced_filenames
                or name.startswith(extra_files_prefixes)
            )
            if not collect:
                continue

            log.debug("collecting dynamic %s file %s", output_type, name)
            output_file = join(directory, self.pulsar_outputs.path_helper.local_name(name))
            if self._attempt_collect_output(output_type=output_type, path=output_file, name=name):
                self.downloaded_working_directory_files.append(name)

    def _attempt_collect_output(self, output_type, path, name=None):
        """Try to collect one output, recording rather than raising errors.

        ``path`` is the final path on the galaxy server (client); ``name`` is
        the 'name' of the file on the Pulsar server (possibly a relative
        path).

        Returns ``True`` only when the output was collected. Exceptions
        are captured by the exception tracker and yield ``False``.
        """
        collected = False
        with self.exception_tracker():
            action = self.action_mapper.action({"path": path}, output_type)
            if self._collect_output(output_type, action, name):
                collected = True

        return collected

    def _collect_output(self, output_type, action, name):
        """Delegate collection of one output to the output collector.

        Returns the collector's result. A failure is logged and reported as
        ``False`` when ``_allow_collect_failure(output_type)`` permits it;
        otherwise the exception is re-raised.
        """
        log.info("collecting output %s with action %s", name, action)
        try:
            return self.output_collector.collect_output(self, output_type, action, name)
        except Exception as e:
            if not _allow_collect_failure(output_type):
                raise
            log.warning(
                "Allowed failure in postprocessing, will not force job failure but generally indicates a tool"
                " failure: %s", e)
            # Explicitly report "not collected" instead of an implicit None.
            return False
class DownloadExceptionTracker:
    """Callable that yields a context manager recording raised exceptions.

    Usage: ``with tracker(): ...``. Any ``Exception`` raised inside the
    ``with`` body is appended to ``collection_failure_exceptions`` and
    suppressed; exceptions that are not ``Exception`` subclasses propagate.
    """

    def __init__(self):
        # Exceptions captured so far, in the order they were raised.
        self.collection_failure_exceptions = []

    @contextmanager
    def __call__(self):
        try:
            yield
        except Exception as error:
            # Record the failure instead of letting it escape the block.
            self.collection_failure_exceptions.append(error)
def _clean(collection_failure_exceptions, cleanup_job, client):
    """Clean up the remote job when the cleanup policy allows it.

    Cleanup runs when ``cleanup_job`` is ``"always"``, or when no
    collection failure was recorded and ``cleanup_job`` is not ``"never"``.
    The decision is logged at debug level either way. Exceptions raised by
    ``client.clean()`` are logged as a warning (with traceback) and
    suppressed, so a failed cleanup never fails the caller.
    """
    failed = bool(collection_failure_exceptions)
    do_clean = cleanup_job == "always" or (not failed and cleanup_job != "never")

    if not do_clean:
        log.debug("Skipping job cleanup (failed [%s], cleanup_job [%s])", failed, cleanup_job)
        return

    log.debug("Cleaning up job (failed [%s], cleanup_job [%s])", failed, cleanup_job)
    try:
        client.clean()
    except Exception:
        # Best effort: Logger.warn is a deprecated alias, so use warning()
        # and keep the traceback so the failure can be diagnosed.
        log.warning("Failed to cleanup remote Pulsar job", exc_info=True)
def _allow_collect_failure(output_type):
return output_type in ['output_workdir']
# Names exported by ``from <module> import *``: only ``finish_job``.
__all__ = ('finish_job',)