pulsar.client package
Subpackages
- pulsar.client.staging package
- pulsar.client.test package
- pulsar.client.transport package
Submodules
pulsar.client.action_mapper module
- class pulsar.client.action_mapper.FileActionMapper(client=None, config=None)
Bases: object
Objects of this class define how paths are mapped to actions.
>>> json_string = r'''{"paths": [ {"path": "/opt/galaxy", "action": "none"}, {"path": "/galaxy/data", "action": "transfer"}, {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"}, {"path": ".*/dataset_\\d+.dat", "action": "copy", "match_type": "regex"} ]}'''
>>> from tempfile import NamedTemporaryFile
>>> from os import unlink
>>> def mapper_for(default_action, config_contents):
...     f = NamedTemporaryFile(delete=False)
...     f.write(config_contents.encode('UTF-8'))
...     f.close()
...     mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None)
...     mapper = FileActionMapper(mock_client)
...     as_dict = mapper.to_dict()
...     mapper = FileActionMapper(config=as_dict)  # Serialize and deserialize it to make sure it still works
...     unlink(f.name)
...     return mapper
>>> mapper = mapper_for(default_action='none', config_contents=json_string)
>>> # Test first config line above, implicit path prefix mapper
>>> action = mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input')
>>> action.action_type == u'none'
True
>>> action.staging_needed
False
>>> # Test another (2nd) mapper, this one with a different action
>>> action = mapper.action({'path': '/galaxy/data/files/000/dataset_1.dat'}, 'input')
>>> action.action_type == u'transfer'
True
>>> action.staging_needed
True
>>> # Always at least copy work_dir outputs.
>>> action = mapper.action({'path': '/opt/galaxy/database/working_directory/45.sh'}, 'workdir')
>>> action.action_type == u'copy'
True
>>> action.staging_needed
True
>>> # Test glob mapper (matching test)
>>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam'}, 'input').action_type == u'copy'
True
>>> # Test glob mapper (non-matching test)
>>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam.bai'}, 'input').action_type == u'none'
True
>>> # Regex mapper test.
>>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'input').action_type == u'copy'
True
>>> # Doesn't map unstructured paths by default
>>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'none'
True
>>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ {"path": "/", "action": "transfer", "path_types": "input"} ] }''')
>>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'input').action_type == u'transfer'
True
>>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'output').action_type == u'none'
True
>>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ {"path": "/", "action": "transfer", "path_types": "*any*"} ] }''')
>>> unstructured_mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'transfer'
True
>>> match_type_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ {"action": "transfer", "path_types": "input"}, {"action": "remote_copy", "path_types": "output"} ] }''')
>>> input_action = match_type_only_mapper.action({}, 'input')
>>> input_action.action_type
'transfer'
>>> output_action = match_type_only_mapper.action({}, 'output')
>>> output_action.action_type
'remote_copy'
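The doctest exercises three match types: implicit path prefixes, globs, and regular expressions, with the client's default action as the fallback. The following is a simplified, hypothetical model of that selection logic, useful for reasoning about a configuration before deploying it. It is not Pulsar's implementation: it ignores path_types and the special handling of workdir paths, it lets the first matching rule win (consistent with the doctest, where a path matching both the /galaxy/data prefix and the regex rule gets transfer), and it assumes glob rules follow fnmatch semantics (where * also crosses /) and regex rules are anchored at the start of the path.

```python
import fnmatch
import re
from typing import Dict, List

# Hypothetical, simplified model of FileActionMapper rule selection (not Pulsar's code).
# It ignores "path_types" and the special handling of working-directory outputs.
RULES: List[Dict[str, str]] = [
    {"path": "/opt/galaxy", "action": "none"},
    {"path": "/galaxy/data", "action": "transfer"},
    {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"},
    {"path": r".*/dataset_\d+.dat", "action": "copy", "match_type": "regex"},
]


def rule_matches(rule: Dict[str, str], path: str) -> bool:
    """Return True if a single rule applies to path."""
    match_type = rule.get("match_type", "prefix")
    pattern = rule["path"]
    if match_type == "glob":
        # Assumption: fnmatch semantics, where "*" (and therefore "**") also matches "/".
        return fnmatch.fnmatchcase(path, pattern)
    if match_type == "regex":
        # Assumption: the expression is anchored at the start of the path.
        return re.match(pattern, path) is not None
    # Default: the rule path is treated as a directory prefix.
    prefix = pattern.rstrip("/")
    return path == prefix or path.startswith(prefix + "/")


def pick_action(rules: List[Dict[str, str]], path: str, default_action: str = "none") -> str:
    """Return the action of the first matching rule, or the default action."""
    for rule in rules:
        if rule_matches(rule, path):
            return rule["action"]
    return default_action
```

For example, pick_action(RULES, "/cool/bamfiles/projectABC/study1/patient3.bam.bai") falls through every rule and returns the default "none", mirroring the non-matching glob test.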
- class pulsar.client.action_mapper.MessageAction(contents, client=None)
Bases: object
A pseudo-action describing “files” that are stored in memory and transferred via a message (HTTP, Python call, MQ, etc.).
- action_type = 'message'
- staging = 'default'
- property staging_action_local
- property staging_needed
- class pulsar.client.action_mapper.RemoteTransferAction(source, file_lister=None, url=None)
Bases: BaseAction
This action indicates that the Pulsar server should transfer the file before execution via one of the remote transfer implementations. It is like a TransferAction, but it indicates that the action requires network access to the staging server and should be executed via ssh, rsync, etc.
- action_type: str = 'remote_transfer'
- inject_url = True
- staging = 'remote'
pulsar.client.amqp_exchange module
- class pulsar.client.amqp_exchange.PulsarExchange(url, manager_name, amqp_key_prefix=None, connect_ssl=None, timeout=0.2, publish_kwds={}, publish_uuid_store=None, consume_uuid_store=None, republish_time=30)
Bases: object
Utility for publishing and consuming structured Pulsar queues using kombu. This is shared between the server and the client: an exchange should be set up for each manager (or, in the case of the client, for each manager one wishes to communicate with).
Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar server should target each AMQP endpoint, or care should be taken that unique manager names are used across Pulsar servers targeting the same AMQP endpoint. In particular, only one such Pulsar server should define a default manager with the name _default_.
- property acks_enabled
- property url
pulsar.client.amqp_exchange_factory module
pulsar.client.client module
- class pulsar.client.client.AwsBatchMessageCoexecutionJobClient(destination_params, job_id, client_manager)
Bases: BasePollingCoexecutionJobClient, LaunchesAwsBatchContainersMixin
A client that co-executes pods via AWS Batch and depends on AMQP for status updates.
- class pulsar.client.client.AwsBatchPollingCoexecutionJobClient(destination_params, job_id, client_manager)
Bases: BasePollingCoexecutionJobClient, LaunchesAwsBatchContainersMixin
A client that co-executes pods via AWS Batch and doesn’t depend on AMQP for status updates.
- class pulsar.client.client.BaseJobClient(destination_params, job_id)
Bases: object
- ensure_library_available: Callable[[], None] | None = None
- property prefer_local_staging
- class pulsar.client.client.BaseMessageCoexecutionJobClient(destination_params, job_id, client_manager)
Bases: BaseMessageJobClient
- pulsar_container_image: str
- class pulsar.client.client.BaseMessageJobClient(destination_params, job_id, client_manager)
Bases: BaseRemoteConfiguredJobClient
- client_manager: MessagingClientManagerProtocol
- class pulsar.client.client.BasePollingCoexecutionJobClient(destination_params, job_id, client_manager)
Bases: BaseRemoteConfiguredJobClient
- pulsar_container_image: str
- class pulsar.client.client.BaseRemoteConfiguredJobClient(destination_params, job_id, client_manager)
Bases: BaseJobClient
- client_manager: ClientManagerProtocol
- class pulsar.client.client.ClientManagerProtocol(*args, **kwargs)
Bases: Protocol
- manager_name: str
- class pulsar.client.client.CoexecutionContainerCommand(image, command, args, working_directory, ports)
Bases: NamedTuple
- args: List[str]
Alias for field number 2
- command: str
Alias for field number 1
- image: str
Alias for field number 0
- ports: List[int] | None
Alias for field number 4
- working_directory: str
Alias for field number 3
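Because CoexecutionContainerCommand is a NamedTuple, each documented field is also reachable by position, which is what the “Alias for field number N” entries mean. The sketch below declares a local stand-in with the same field names, order, and types so that it runs without Pulsar installed; the image name, command, and arguments are made-up illustration values.

```python
from typing import List, NamedTuple, Optional


class ContainerCommandStandIn(NamedTuple):
    """Local stand-in mirroring the documented CoexecutionContainerCommand fields."""
    image: str                   # field 0
    command: str                 # field 1
    args: List[str]              # field 2
    working_directory: str       # field 3
    ports: Optional[List[int]]   # field 4


# Hypothetical values, for illustration only.
cmd = ContainerCommandStandIn(
    image="example/tool:1.0",
    command="python",
    args=["run.py", "--threads", "4"],
    working_directory="/work",
    ports=None,
)

# Named access and positional access refer to the same fields.
assert cmd.image == cmd[0]
assert cmd.ports is cmd[4]

# NamedTuples are immutable; _replace returns a modified copy instead.
with_ports = cmd._replace(ports=[8080])
```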
- class pulsar.client.client.CoexecutionLaunchMixin(destination_params, job_id, client_manager)
Bases: BaseRemoteConfiguredJobClient
- execution_type: ExecutionType
- launch(command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None, container_info=None, token_endpoint=None, pulsar_app_config=None) → ExternalId | None
- pulsar_container_image: str
- class pulsar.client.client.ExecutionType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: str, Enum
- PARALLEL = 'parallel'
- SEQUENTIAL = 'sequential'
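ExecutionType mixes str into Enum, so its members compare equal to their plain string values, and a value read from configuration can be turned back into a member by calling the enum. MonitorStyle in pulsar.client.util is declared the same way (Bases: str, Enum), so the same behaviour should apply to it. The stand-in below only copies the two documented members so it runs without Pulsar installed.

```python
from enum import Enum


class ExecutionTypeStandIn(str, Enum):
    """Local stand-in with the same members as pulsar.client.client.ExecutionType."""
    PARALLEL = "parallel"
    SEQUENTIAL = "sequential"


# str-mixin members compare equal to their raw values...
assert ExecutionTypeStandIn.SEQUENTIAL == "sequential"

# ...and a raw value (e.g. from a config file) converts back to the member.
parsed = ExecutionTypeStandIn("parallel")
assert parsed is ExecutionTypeStandIn.PARALLEL

# Unknown values raise ValueError instead of silently passing through.
try:
    ExecutionTypeStandIn("concurrent")
except ValueError:
    rejected = True
else:
    rejected = False
```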
- class pulsar.client.client.InputCachingJobClient(destination_params, job_id, job_manager_interface, client_cacher)
Bases: JobClient
Beta client that caches staged files to prevent duplication.
- cache_insert(**kwargs)
- cache_required(**kwargs)
- file_available(**kwargs)
- class pulsar.client.client.JobClient(destination_params, job_id, job_manager_interface)
Bases: BaseJobClient
Objects of this client class perform low-level communication with a remote Pulsar server.
Parameters
- destination_params : dict or str
Connection parameters: either a URL, or a dict containing url (and optionally private_token).
- job_id : str
Galaxy job/task id.
- fetch_output(path, name, working_directory, action_type, output_type)
Fetch (transfer, copy, etc.) an output from the remote Pulsar server.
Parameters
- path : str
Local path of the dataset.
- name : str
Remote name of the file (i.e. path relative to the remote staging output or working directory).
- working_directory : str
Local working_directory for the job.
- action_type : str
Where to find the file on Pulsar (output_workdir or output). legacy is also an option; in this case Pulsar is asked for the location. This will only be used when targeting an older Pulsar server that didn’t return statuses allowing the location to be inferred.
- launch(command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None, token_endpoint=None)
Queue up the execution of the supplied command_line on the remote server. It is called launch for historical reasons and should be renamed to something like enqueue.
Parameters
- command_line : str
Command to execute.
- raw_check_complete(**kwargs)
- remote_setup(**kwargs)
- class pulsar.client.client.K8sMessageCoexecutionJobClient(destination_params, job_id, client_manager)
Bases: BaseMessageCoexecutionJobClient, LaunchesK8ContainersMixin
A client that co-executes pods via Kubernetes and depends on AMQP for status updates.
- class pulsar.client.client.K8sPollingCoexecutionJobClient(destination_params, job_id, client_manager)
Bases: BasePollingCoexecutionJobClient, LaunchesK8ContainersMixin
A client that co-executes pods via Kubernetes and doesn’t depend on AMQP for status updates.
- class pulsar.client.client.LaunchesAwsBatchContainersMixin(destination_params, job_id, client_manager)
Bases: CoexecutionLaunchMixin
…
- execution_type: ExecutionType = 'sequential'
- class pulsar.client.client.LaunchesK8ContainersMixin(destination_params, job_id, client_manager)
Bases: CoexecutionLaunchMixin
Mixin providing Kubernetes launch and kill interaction.
- ensure_library_available()
- execution_type: ExecutionType = 'parallel'
- class pulsar.client.client.LaunchesTesContainersMixin(destination_params, job_id, client_manager)
Bases: CoexecutionLaunchMixin
- ensure_library_available() → None
- execution_type: ExecutionType = 'sequential'
- class pulsar.client.client.MessageCLIJobClient(destination_params, job_id, client_manager, shell)
Bases: BaseMessageJobClient
- class pulsar.client.client.MessageJobClient(destination_params, job_id, client_manager)
Bases: BaseMessageJobClient
- class pulsar.client.client.MessagingClientManagerProtocol(*args, **kwargs)
Bases: ClientManagerProtocol
- status_cache: Dict[str, Dict[str, Any]]
- class pulsar.client.client.TesMessageCoexecutionJobClient(destination_params, job_id, client_manager)
Bases: BaseMessageCoexecutionJobClient, LaunchesTesContainersMixin
A client that co-executes pods via GA4GH TES and depends on AMQP for status updates.
- class pulsar.client.client.TesPollingCoexecutionJobClient(destination_params, job_id, client_manager)
Bases: BasePollingCoexecutionJobClient, LaunchesTesContainersMixin
A client that co-executes pods via GA4GH TES and doesn’t depend on AMQP for status updates.
pulsar.client.config_util module
Generic interface for reading YAML/INI/JSON config files into nested dictionaries.
pulsar.client.decorators module
pulsar.client.destination module
- pulsar.client.destination.submit_params(destination_params)
>>> destination_params = {"private_token": "12345", "submit_native_specification": "-q batch"}
>>> result = submit_params(destination_params)
>>> result
{'native_specification': '-q batch'}
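The doctest suggests that submit_params keeps only the destination parameters prefixed with submit_, strips that prefix, and drops everything else (such as private_token). A minimal, hypothetical re-implementation under that assumption, handy for checking what a destination would pass through to submission:

```python
from typing import Any, Dict

SUBMIT_PREFIX = "submit_"


def submit_params_sketch(destination_params: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical equivalent of submit_params: keep submit_* keys, minus the prefix."""
    return {
        key[len(SUBMIT_PREFIX):]: value
        for key, value in destination_params.items()
        if key.startswith(SUBMIT_PREFIX)
    }
```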
- pulsar.client.destination.url_to_destination_params(url)
Convert a legacy runner URL to a job destination.
>>> params_simple = url_to_destination_params("http://localhost:8913/")
>>> params_simple["url"]
'http://localhost:8913/'
>>> params_simple["private_token"] is None
True
>>> advanced_url = "https://1234x@example.com:8914/managers/longqueue"
>>> params_advanced = url_to_destination_params(advanced_url)
>>> params_advanced["url"]
'https://example.com:8914/managers/longqueue/'
>>> params_advanced["private_token"]
'1234x'
>>> runner_url = "pulsar://http://localhost:8913/"
>>> runner_params = url_to_destination_params(runner_url)
>>> runner_params['url']
'http://localhost:8913/'
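The three doctest cases encode three rules: an optional pulsar:// wrapper is removed, a trailing slash is added, and a user-info component (TOKEN@host) becomes private_token. The sketch below is a hypothetical re-implementation built on urllib.parse that reproduces only those cases; the real function may treat other URL forms differently, and this version does not handle query strings or IPv6 literal hosts.

```python
from typing import Dict, Optional
from urllib.parse import urlsplit, urlunsplit

LEGACY_PREFIX = "pulsar://"


def url_to_destination_params_sketch(url: str) -> Dict[str, Optional[str]]:
    """Hypothetical re-implementation covering the doctest cases above."""
    # Legacy runner URLs wrap the real URL: pulsar://http://host:port/
    if url.startswith(LEGACY_PREFIX):
        url = url[len(LEGACY_PREFIX):]
    # Normalize to a trailing slash (assumes no query string or fragment).
    if not url.endswith("/"):
        url += "/"
    parts = urlsplit(url)
    # A token embedded as user info (https://TOKEN@host/) becomes private_token.
    private_token = parts.username
    # Rebuild the network location without the user info.
    # IPv6 literal hosts would need their brackets restored; not handled here.
    netloc = parts.hostname or ""
    if parts.port is not None:
        netloc = f"{netloc}:{parts.port}"
    clean_url = urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
    return {"url": clean_url, "private_token": private_token}
```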
pulsar.client.exceptions module
Pulsar client exceptions.
- exception pulsar.client.exceptions.PulsarClientTransportError(code=None, message=None, transport_code=None, transport_message=None)
Bases: Exception
- CONNECTION_REFUSED = 'connection_refused'
- INVALID_CODE_MESSAGE = 'Unknown transport error code: %s'
- TIMEOUT = 'timeout'
- UNKNOWN = 'unknown'
- messages = {'connection_refused': 'Connection refused', 'timeout': 'Connection timed out', 'unknown': 'Unknown transport error'}
pulsar.client.job_directory module
- class pulsar.client.job_directory.RemoteJobDirectory(remote_staging_directory, remote_id, remote_sep)
Bases: object
Representation of a (potentially) remote Pulsar-style staging directory.
- calculate_path(remote_relative_path, input_type)
Only for use by the Pulsar client; managers should override this to enforce security and create the directory if needed.
- property path
- property separator
- pulsar.client.job_directory.get_mapped_file(directory, remote_path, allow_nested_files=False, local_path_module=<module 'posixpath' (frozen)>, mkdir=True, allow_globs=False)
>>> import ntpath
>>> get_mapped_file(r'C:\pulsar\staging\101', 'dataset_1_files/moo/cow', allow_nested_files=True, local_path_module=ntpath, mkdir=False)
'C:\\pulsar\\staging\\101\\dataset_1_files\\moo\\cow'
>>> get_mapped_file(r'C:\pulsar\staging\101', 'dataset_1_files/moo/cow', allow_nested_files=False, local_path_module=ntpath)
'C:\\pulsar\\staging\\101\\cow'
>>> get_mapped_file(r'C:\pulsar\staging\101', '../cow', allow_nested_files=True, local_path_module=ntpath, mkdir=False)
Traceback (most recent call last):
Exception: Attempt to read or write file outside an authorized directory.
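The doctest shows two behaviours: without allow_nested_files only the final path component is kept, and a remote path that normalizes to somewhere outside the staging directory is rejected. A hypothetical sketch of that containment check follows. It mirrors the doctest's plain Exception and message, but it does not create directories, expand globs, resolve symlinks, or account for case-insensitive Windows paths, and it assumes remote paths always use / separators.

```python
import ntpath
import posixpath
from types import ModuleType


def mapped_file_sketch(directory: str, remote_path: str, allow_nested_files: bool = False,
                       path_module: ModuleType = posixpath) -> str:
    """Hypothetical version of the containment check shown in the doctest above."""
    # Remote paths are assumed to use "/" separators; split them into components.
    parts = remote_path.split("/")
    if not allow_nested_files:
        # Flatten: keep only the final path component.
        parts = parts[-1:]
    root = path_module.normpath(directory)
    candidate = path_module.normpath(path_module.join(root, *parts))
    # normpath resolves ".." segments, so a safe result must still sit under root.
    if not candidate.startswith(root.rstrip(path_module.sep) + path_module.sep):
        raise Exception("Attempt to read or write file outside an authorized directory.")
    return candidate
```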
pulsar.client.manager module
Entry point for client creation. build_client_manager in particular is the abstraction that should be used to create a ClientManager, which in turn can create Pulsar clients for specific actions.
- class pulsar.client.manager.ClientManager(**kwds: Dict[str, Any])
Bases: ClientManagerInterface
Factory class to create Pulsar clients.
This class was introduced for classes of clients that need to potentially share state between multiple client connections.
- client_class: Type[BaseJobClient]
- get_client(destination_params, job_id, **kwargs)
Build a client given specific destination parameters and job_id.
- job_manager_interface_class: Type[PulsarInterface]
- class pulsar.client.manager.HttpPulsarInterface(destination_params, transport)
Bases: PulsarInterface
- execute(command, args=None, data=None, input_path=None, output_path=None)
Execute the corresponding command against the configured Pulsar job manager. args are method parameters, while data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.
pulsar.client.object_client module
- class pulsar.client.object_client.ObjectStoreClient(pulsar_interface)
Bases: object
- create(**kwargs)
- delete(**kwargs)
- empty(**kwargs)
- exists(**kwargs)
- file_ready(**kwargs)
- get_data(**kwargs)
- get_filename(**kwargs)
- get_store_usage_percent(**kwargs)
- size(**kwargs)
- update_from_file(**kwargs)
pulsar.client.path_mapper module
- class pulsar.client.path_mapper.PathMapper(client, remote_job_config, local_working_directory, action_mapper=None)
Bases: object
Ties together a FileActionMapper and the remote job configuration returned by the Pulsar setup method to pre-determine the location of files for staging on the remote Pulsar server.
This is not useful when rewrite_paths is enabled (as has traditionally been done with the Pulsar), because in that case the Pulsar determines the paths as files are uploaded. When rewrite_paths is disabled, however, the destination of files needs to be determined prior to transfer, so an object of this class can be used.
pulsar.client.server_interface module
- class pulsar.client.server_interface.HttpPulsarInterface(destination_params, transport)
Bases: PulsarInterface
- execute(command, args=None, data=None, input_path=None, output_path=None)
Execute the corresponding command against the configured Pulsar job manager. args are method parameters, while data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.
- class pulsar.client.server_interface.LocalPulsarInterface(destination_params, job_manager=None, pulsar_app=None, file_cache=None, object_store=None)
Bases: PulsarInterface
- execute(command, args=None, data=None, input_path=None, output_path=None)
Execute the corresponding command against the configured Pulsar job manager. args are method parameters, while data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.
- class pulsar.client.server_interface.PulsarInterface
Bases: object
Abstract base class describing how a synchronous client communicates with (potentially remote) Pulsar procedures. The obvious implementation is HTTP-based, but Pulsar objects wrapped in routes can also be communicated with directly if they are in memory.
- abstract execute(command, args=None, data=None, input_path=None, output_path=None)
Execute the corresponding command against the configured Pulsar job manager. args are method parameters, while data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.
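Because PulsarInterface reduces every server interaction to a single execute call, client code can be exercised without a network by substituting an in-memory implementation. The sketch below defines its own abstract base with the documented execute signature (rather than importing Pulsar) plus a hypothetical recording test double. The command name example_status is invented for illustration and is not a real Pulsar route.

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple


class PulsarInterfaceLike(ABC):
    """Mirrors the documented abstract execute() signature."""

    @abstractmethod
    def execute(self, command: str, args: Optional[Dict[str, Any]] = None, data: Any = None,
                input_path: Optional[str] = None, output_path: Optional[str] = None) -> Any:
        ...


class RecordingPulsarInterface(PulsarInterfaceLike):
    """Hypothetical test double: records calls and returns canned responses."""

    def __init__(self, responses: Dict[str, Any]) -> None:
        self.responses = responses
        self.calls: List[Tuple[str, Optional[Dict[str, Any]]]] = []

    def execute(self, command, args=None, data=None, input_path=None, output_path=None):
        self.calls.append((command, args))
        response = self.responses.get(command)
        if output_path is not None:
            # Commands that produce a file write it to output_path instead of returning it.
            with open(output_path, "w") as handle:
                handle.write(json.dumps(response))
            return None
        return response


fake = RecordingPulsarInterface({"example_status": {"complete": "true"}})
status = fake.execute("example_status", args={"job_id": "42"})
```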
pulsar.client.setup_handler module
pulsar.client.util module
- class pulsar.client.util.ClientJsonEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
Bases: JSONEncoder
- default(obj)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
- class pulsar.client.util.MessageQueueUUIDStore(persistence_directory, subdirs=None)
Bases: object
Persistent dict-like object for persisting message queue UUIDs that are awaiting acknowledgement or that have been operated on.
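One simple way to obtain a persistent, dict-like store of message UUIDs is to represent each key as a file in a directory, so that membership survives process restarts. The class below is a hypothetical illustration of that idea only; it is not claimed to match MessageQueueUUIDStore's on-disk layout or its subdirs handling.

```python
import os
import tempfile
import uuid
from typing import Iterator


class FileBackedUUIDStore:
    """Hypothetical persistent dict-like store: one file per UUID key."""

    def __init__(self, persistence_directory: str) -> None:
        self.directory = persistence_directory
        os.makedirs(self.directory, exist_ok=True)

    def _path(self, key: str) -> str:
        # Refuse keys that could name something outside the store directory.
        if key in ("", ".", "..") or "/" in key or "\\" in key:
            raise KeyError(key)
        return os.path.join(self.directory, key)

    def __setitem__(self, key: str, value: str) -> None:
        with open(self._path(key), "w") as handle:
            handle.write(value)

    def __getitem__(self, key: str) -> str:
        try:
            with open(self._path(key)) as handle:
                return handle.read()
        except FileNotFoundError:
            raise KeyError(key) from None

    def __delitem__(self, key: str) -> None:
        try:
            os.remove(self._path(key))
        except FileNotFoundError:
            raise KeyError(key) from None

    def __contains__(self, key: object) -> bool:
        if not isinstance(key, str):
            return False
        try:
            return os.path.isfile(self._path(key))
        except KeyError:
            return False

    def __iter__(self) -> Iterator[str]:
        return iter(sorted(os.listdir(self.directory)))


# Usage: a fresh instance over the same directory sees earlier writes.
store_directory = tempfile.mkdtemp()
message_id = str(uuid.uuid4())
FileBackedUUIDStore(store_directory)[message_id] = "awaiting-ack"
reopened = FileBackedUUIDStore(store_directory)
```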
- class pulsar.client.util.MonitorStyle(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: str, Enum
- BACKGROUND = 'background'
- FOREGROUND = 'foreground'
- NONE = 'none'
- class pulsar.client.util.PathHelper(separator, local_path_module=<module 'posixpath' (frozen)>)
Bases: object
>>> import posixpath
>>> # Forcing local path to posixpath because Pulsar is designed to be used with
>>> # posix client.
>>> posix_path_helper = PathHelper("/", local_path_module=posixpath)
>>> windows_slash = "\\"
>>> len(windows_slash)
1
>>> nt_path_helper = PathHelper(windows_slash, local_path_module=posixpath)
>>> posix_path_helper.remote_name("moo/cow")
'moo/cow'
>>> nt_path_helper.remote_name("moo/cow")
'moo\\cow'
>>> posix_path_helper.local_name("moo/cow")
'moo/cow'
>>> nt_path_helper.local_name("moo\\cow")
'moo/cow'
>>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data/", "/work/galaxy/data")
'/work/galaxy/data/bowtie/hg19.fa'
>>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data")
'/work/galaxy/data/bowtie/hg19.fa'
>>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data/")
'/work/galaxy/data/bowtie/hg19.fa'
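The doctest pins down three behaviours: remote_name converts a local (posix) relative path to the remote separator, local_name converts it back, and from_posix_with_new_base swaps a base directory regardless of trailing slashes. A hypothetical minimal re-implementation of those three calls, assuming local paths are always posix-style and that new_base already uses the remote separator:

```python
from typing import List


class PathHelperSketch:
    """Hypothetical minimal version of the PathHelper behaviour in the doctest."""

    def __init__(self, separator: str) -> None:
        self.separator = separator  # separator used on the remote (Pulsar) side

    def remote_name(self, local_name: str) -> str:
        # Local paths are assumed to be posix-style ("/"-separated).
        return self.separator.join(local_name.split("/"))

    def local_name(self, remote_name: str) -> str:
        return "/".join(remote_name.split(self.separator))

    def from_posix_with_new_base(self, posix_path: str, old_base: str, new_base: str) -> str:
        # Trailing separators on either base are ignored, as in the doctest.
        old_base = old_base.rstrip("/")
        new_base = new_base.rstrip(self.separator)
        if not posix_path.startswith(old_base + "/"):
            raise ValueError(f"{posix_path!r} is not under {old_base!r}")
        relative_parts: List[str] = posix_path[len(old_base) + 1:].split("/")
        return self.separator.join([new_base] + relative_parts)


posix_helper = PathHelperSketch("/")
nt_helper = PathHelperSketch("\\")
```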
- pulsar.client.util.copy(source, destination)
Copy a file from source to destination if needed (skip if source is destination).
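A minimal sketch of the “copy if needed” behaviour, assuming that “source is destination” means both names resolve to the same file (compared here with os.path.realpath). The helper name is hypothetical, and because the description above does not say whether metadata is preserved, the sketch uses shutil.copyfile, which copies contents only. The explicit check matters: shutil.copyfile raises SameFileError when given the same file twice.

```python
import os
import shutil
import tempfile


def copy_if_needed(source: str, destination: str) -> bool:
    """Copy source to destination unless both paths resolve to the same file.

    Returns True if a copy was made (hypothetical helper, not Pulsar's copy()).
    """
    if os.path.realpath(source) == os.path.realpath(destination):
        return False
    shutil.copyfile(source, destination)
    return True


# Usage: copying a file onto itself is skipped; copying elsewhere is performed.
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "in.txt")
with open(src, "w") as handle:
    handle.write("data")
copied_self = copy_if_needed(src, src)
copied_other = copy_if_needed(src, os.path.join(workdir, "out.txt"))
```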
- pulsar.client.util.directory_files(directory)
>>> from tempfile import mkdtemp
>>> from shutil import rmtree
>>> from os.path import join
>>> from os import makedirs
>>> tempdir = mkdtemp()
>>> with open(join(tempdir, "moo"), "w") as f: pass
>>> directory_files(tempdir)
['moo']
>>> subdir = join(tempdir, "cow", "sub1")
>>> makedirs(subdir)
>>> with open(join(subdir, "subfile1"), "w") as f: pass
>>> with open(join(subdir, "subfile2"), "w") as f: pass
>>> sorted(directory_files(tempdir))
['cow/sub1/subfile1', 'cow/sub1/subfile2', 'moo']
>>> rmtree(tempdir)
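The doctest shows directory_files returning every file below a directory as a /-separated path relative to it, with directories themselves omitted; the second call is wrapped in sorted(), suggesting that order is not guaranteed. An os.walk-based sketch with the same observable behaviour follows; the function name is hypothetical, and it assumes symlinked directories should not be followed (os.walk's default).

```python
import os
import tempfile
from typing import List


def directory_files_sketch(directory: str) -> List[str]:
    """Hypothetical equivalent of directory_files: relative paths of all files."""
    results: List[str] = []
    # os.walk does not follow symlinked directories by default.
    for dirpath, _dirnames, filenames in os.walk(directory):
        for filename in filenames:
            relative = os.path.relpath(os.path.join(dirpath, filename), directory)
            # Report "/"-separated paths, matching the doctest output.
            results.append(relative.replace(os.sep, "/"))
    return results


# Usage: rebuild part of the doctest's layout and list it.
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "cow", "sub1"))
for name in ("moo", os.path.join("cow", "sub1", "subfile1")):
    open(os.path.join(root, name), "w").close()
listing = sorted(directory_files_sketch(root))
```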
Module contents
pulsar client
This module contains logic for interfacing with an external Pulsar server.
Configuring Galaxy
Galaxy job runners are configured in Galaxy’s job_conf.xml file. See job_conf.xml.sample_advanced in your Galaxy code base or on GitHub for information on how to configure Galaxy to interact with the Pulsar.
Galaxy also supports an older, less rich configuration of job runners directly in its main galaxy.ini file. The following section describes how to configure Galaxy to communicate with the Pulsar in this legacy mode.
Legacy
A Galaxy tool can be configured to be executed remotely via Pulsar by adding a line to the galaxy.ini file under the galaxy:tool_runners section with the format:
<tool_id> = pulsar://http://<pulsar_host>:<pulsar_port>
As an example, if a host named remotehost is running the Pulsar server application on port 8913, then the tool with id test_tool can be configured to run remotely on remotehost by adding the following line to galaxy.ini:
test_tool = pulsar://http://remotehost:8913
Remember that this must be added after the [galaxy:tool_runners] header in the galaxy.ini file.
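To audit which tools a legacy galaxy.ini routes to Pulsar, the [galaxy:tool_runners] section can be read with Python's configparser. The helper below is hypothetical (not part of Galaxy or Pulsar); it assumes runner values use the pulsar:// prefix described above and strips it to recover the underlying server URL. Interpolation is disabled so % characters in values are left alone, and optionxform is overridden because configparser lowercases option names by default, which would alter mixed-case tool ids.

```python
import configparser
from typing import Dict

SECTION = "galaxy:tool_runners"
PREFIX = "pulsar://"


def pulsar_tool_runners(ini_text: str) -> Dict[str, str]:
    """Map tool ids routed to Pulsar onto the wrapped Pulsar server URL."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.optionxform = str  # keep tool ids case-sensitive
    parser.read_string(ini_text)
    if not parser.has_section(SECTION):
        return {}
    return {
        tool_id: runner[len(PREFIX):]
        for tool_id, runner in parser.items(SECTION)
        if runner.startswith(PREFIX)
    }


EXAMPLE_INI = """
[galaxy:tool_runners]
test_tool = pulsar://http://remotehost:8913
other_tool = local:///
"""
```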
- class pulsar.client.ClientInput(path, input_type, object_store_ref=None)
Bases: object
- property action_source
- class pulsar.client.ClientInputs(client_inputs)
Bases: object
Abstraction describing input datasets for a job.
- class pulsar.client.ClientJobDescription(command_line, tool=None, config_files=None, input_files=None, client_inputs=None, client_outputs=None, working_directory=None, metadata_directory=None, dependencies_description=None, env=[], arbitrary_files=None, job_directory_files=None, tool_directory_required_files=None, rewrite_paths=True, touch_outputs=None, container=None, remote_pulsar_app_config=None, guest_ports=None)
Bases: object
A description of how the client views a job: command_line, inputs, etc.
Parameters
- command_line : str
The local command line to execute; this will be rewritten for the remote server.
- config_files : list
List of Galaxy ‘configfile’s produced for this job. These will be rewritten and sent to the remote server.
- input_files : list
List of input files used by the job. These will be transferred and references rewritten.
- client_outputs : ClientOutputs
Description of outputs produced by the job (at least output files, along with an optional version string and working directory outputs).
- tool_dir : str
Directory containing the tool to execute (if a wrapper is used, it will be transferred to the remote server).
- working_directory : str
Local path created by Galaxy for running this job (job_wrapper.tool_working_directory).
- metadata_directory : str
Local path created by Galaxy for running this job (job_wrapper.working_directory).
- dependencies_description : list
galaxy.tools.deps.dependencies.DependencyDescription object describing the tool dependency context for remote dependency resolution.
- env : list
List of dict objects describing environment variables to populate.
- version_file : str
Path to the version file expected on the client server.
- arbitrary_files : dict
Additional non-input, non-tool, non-config, non-working-directory files to transfer before staging the job. These are most likely data indices but can be anything. For now, they are copied into the staging working directory, but this will be reworked to find a better, more robust location.
- rewrite_paths : boolean
Indicates whether paths should be rewritten in job inputs (command_line and config files) while staging files.
- property input_files
- property output_files
- property tool_dependencies
- property version_file
- class pulsar.client.ClientOutputs(working_directory=None, output_files=[], work_dir_outputs=None, version_file=None, dynamic_outputs=None, metadata_directory=None, job_directory=None, dynamic_file_sources=None)
Bases: object
Abstraction describing the output datasets EXPECTED by the Galaxy job runner client.
- class pulsar.client.PathMapper(client, remote_job_config, local_working_directory, action_mapper=None)
Bases: object
Ties together a FileActionMapper and the remote job configuration returned by the Pulsar setup method to pre-determine the location of files for staging on the remote Pulsar server.
This is not useful when rewrite_paths is enabled (as has traditionally been done with the Pulsar), because in that case the Pulsar determines the paths as files are uploaded. When rewrite_paths is disabled, however, the destination of files needs to be determined prior to transfer, so an object of this class can be used.
- exception pulsar.client.PulsarClientTransportError(code=None, message=None, transport_code=None, transport_message=None)
Bases: Exception
- CONNECTION_REFUSED = 'connection_refused'
- INVALID_CODE_MESSAGE = 'Unknown transport error code: %s'
- TIMEOUT = 'timeout'
- UNKNOWN = 'unknown'
- messages = {'connection_refused': 'Connection refused', 'timeout': 'Connection timed out', 'unknown': 'Unknown transport error'}
- class pulsar.client.PulsarOutputs(working_directory_contents, output_directory_contents, metadata_directory_contents, job_directory_contents, remote_separator='/', realized_dynamic_file_sources=None)
Bases: object
Abstraction describing the output files PRODUCED by the remote Pulsar server.
- pulsar.client.finish_job(client, cleanup_job, job_completed_normally, client_outputs, pulsar_outputs)
Process for “un-staging” a completed Pulsar job.
This function is responsible for downloading results from the remote server and cleaning up the Pulsar staging directory (if needed).
- pulsar.client.url_to_destination_params(url)
Convert a legacy runner URL to a job destination.
>>> params_simple = url_to_destination_params("http://localhost:8913/")
>>> params_simple["url"]
'http://localhost:8913/'
>>> params_simple["private_token"] is None
True
>>> advanced_url = "https://1234x@example.com:8914/managers/longqueue"
>>> params_advanced = url_to_destination_params(advanced_url)
>>> params_advanced["url"]
'https://example.com:8914/managers/longqueue/'
>>> params_advanced["private_token"]
'1234x'
>>> runner_url = "pulsar://http://localhost:8913/"
>>> runner_params = url_to_destination_params(runner_url)
>>> runner_params['url']
'http://localhost:8913/'