Source code for pulsar.client.setup_handler

import os
from uuid import uuid4

from pulsar import __version__ as pulsar_version
from .util import filter_destination_params

REMOTE_SYSTEM_PROPERTY_PREFIX = "remote_property_"


[docs] def build(client, destination_args): """ Build a SetupHandler object for client from destination parameters. """ # Have defined a remote job directory, lets do the setup locally. if client.job_directory: handler = LocalSetupHandler(client, destination_args) else: handler = RemoteSetupHandler(client) return handler
class LocalSetupHandler: """ Parse destination params to infer job setup parameters (input/output directories, etc...). Default is to get this configuration data from the remote Pulsar server. Downside of this approach is that it requires more and more dependent configuraiton of Galaxy. Upside is that it is asynchronous and thus makes message queue driven configurations possible. Remote system properties (such as galaxy_home) can be specified in destination args by prefixing property with remote_property_ (e.g. remote_property_galaxy_home). """ def __init__(self, client, destination_args): self.client = client system_properties = self.__build_system_properties(destination_args) system_properties["separator"] = client.job_directory.separator self.system_properties = system_properties self.jobs_directory = destination_args["jobs_directory"] self.assign_ids = destination_args.get("assign_ids", "galaxy") def setup(self, job_id, tool_id=None, tool_version=None, preserve_galaxy_python_environment=None): if self.assign_ids == "uuid": job_id = uuid4().hex # Following is a gross hack but same gross hack in pulsar.client.staging.up if self.client.job_id != job_id: self.client.assign_job_id(job_id) return build_job_config( job_id=job_id, job_directory=self.client.job_directory, system_properties=self.system_properties, tool_id=tool_id, tool_version=tool_version, preserve_galaxy_python_environment=preserve_galaxy_python_environment, ) @property def local(self): """ """ return True def __build_system_properties(self, destination_params): return filter_destination_params(destination_params, REMOTE_SYSTEM_PROPERTY_PREFIX) class RemoteSetupHandler: """ Default behavior. Fetch setup information from remote Pulsar server. """ def __init__(self, client): self.client = client def setup(self, **setup_args): setup_args["use_metadata"] = "true" return self.client.remote_setup(**setup_args) @property def local(self): """ """ return False
[docs] def build_job_config(job_id, job_directory, system_properties={}, tool_id=None, tool_version=None, preserve_galaxy_python_environment=None): """ """ inputs_directory = job_directory.inputs_directory() working_directory = job_directory.working_directory() metadata_directory = job_directory.metadata_directory() outputs_directory = job_directory.outputs_directory() configs_directory = job_directory.configs_directory() tools_directory = job_directory.tool_files_directory() unstructured_files_directory = job_directory.unstructured_files_directory() sep = system_properties.get("sep", os.sep) job_config = { "job_directory": job_directory.path, "working_directory": working_directory, "metadata_directory": metadata_directory, "outputs_directory": outputs_directory, "configs_directory": configs_directory, "tools_directory": tools_directory, "inputs_directory": inputs_directory, "unstructured_files_directory": unstructured_files_directory, # Poorly named legacy attribute. Drop at some point. "path_separator": sep, "job_id": job_id, "system_properties": system_properties, "pulsar_version": pulsar_version, "preserve_galaxy_python_environment": preserve_galaxy_python_environment, } if tool_id: job_config["tool_id"] = tool_id if tool_version: job_config["tool_version"] = tool_version return job_config
__all__ = ['build_job_config', 'build']