pulsar.client package

Subpackages

Submodules

pulsar.client.action_mapper module

class pulsar.client.action_mapper.FileActionMapper(client=None, config=None)[source]

Bases: object

Objects of this class define how paths are mapped to actions.

>>> json_string = r'''{"paths": [       {"path": "/opt/galaxy", "action": "none"},       {"path": "/galaxy/data", "action": "transfer"},       {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"},       {"path": ".*/dataset_\\d+.dat", "action": "copy", "match_type": "regex"}     ]}'''
>>> from tempfile import NamedTemporaryFile
>>> from os import unlink
>>> def mapper_for(default_action, config_contents):
...     f = NamedTemporaryFile(delete=False)
...     f.write(config_contents.encode('UTF-8'))
...     f.close()
...     mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None)
...     mapper = FileActionMapper(mock_client)
...     as_dict = mapper.to_dict()
...     mapper = FileActionMapper(config=as_dict) # Serialize and deserialize it to make sure still works
...     unlink(f.name)
...     return mapper
>>> mapper = mapper_for(default_action='none', config_contents=json_string)
>>> # Test first config line above, implicit path prefix mapper
>>> action = mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input')
>>> action.action_type == u'none'
True
>>> action.staging_needed
False
>>> # Test another (2nd) mapper, this one with a different action
>>> action = mapper.action({'path': '/galaxy/data/files/000/dataset_1.dat'}, 'input')
>>> action.action_type == u'transfer'
True
>>> action.staging_needed
True
>>> # Always at least copy work_dir outputs.
>>> action = mapper.action({'path': '/opt/galaxy/database/working_directory/45.sh'}, 'workdir')
>>> action.action_type == u'copy'
True
>>> action.staging_needed
True
>>> # Test glob mapper (matching test)
>>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam'}, 'input').action_type == u'copy'
True
>>> # Test glob mapper (non-matching test)
>>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam.bai'}, 'input').action_type == u'none'
True
>>> # Regex mapper test.
>>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'input').action_type == u'copy'
True
>>> # Doesn't map unstructured paths by default
>>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'none'
True
>>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [       {"path": "/", "action": "transfer", "path_types": "input"}     ] }''')
>>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'input').action_type == u'transfer'
True
>>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'output').action_type == u'none'
True
>>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [       {"path": "/", "action": "transfer", "path_types": "*any*"}     ] }''')
>>> unstructured_mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'transfer'
True
>>> match_type_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [       {"action": "transfer", "path_types": "input"},       {"action": "remote_copy", "path_types": "output"}     ] }''')
>>> input_action = match_type_only_mapper.action({}, 'input')
>>> input_action.action_type
'transfer'
>>> output_action = match_type_only_mapper.action({}, 'output')
>>> output_action.action_type
'remote_copy'
action(source, type, mapper=None)[source]
to_dict()[source]
unstructured_mappers()[source]

Return mappers that will map ‘unstructured’ files (i.e. go beyond mapping inputs, outputs, and config files).
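The path matching exercised by the doctest above can be approximated with a first-match-wins rule list. This is a minimal standalone sketch, not Pulsar's implementation: the names RULES, rule_matches and action_for are hypothetical, and it ignores path_types, the special handling of workdir outputs, and staging.

```python
import fnmatch
import re

# Hypothetical stand-in for a parsed "paths" config; not Pulsar's own data model.
RULES = [
    {"path": "/opt/galaxy", "action": "none"},
    {"path": "/galaxy/data", "action": "transfer"},
    {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"},
    {"path": r".*/dataset_\d+.dat", "action": "copy", "match_type": "regex"},
]


def rule_matches(rule, path):
    """Return True if `path` satisfies the rule's prefix, glob, or regex pattern."""
    match_type = rule.get("match_type", "prefix")
    if match_type == "prefix":
        return path.startswith(rule["path"])
    if match_type == "glob":
        return fnmatch.fnmatchcase(path, rule["path"])
    if match_type == "regex":
        return re.match(rule["path"], path) is not None
    raise ValueError("unknown match_type: %s" % match_type)


def action_for(path, rules=RULES, default_action="none"):
    """First matching rule wins; otherwise fall back to the default action."""
    for rule in rules:
        if rule_matches(rule, path):
            return rule["action"]
    return default_action
```

With these rules, action_for("/galaxy/data/files/000/dataset_1.dat") selects the transfer rule, while a path matching no rule falls back to default_action.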

class pulsar.client.action_mapper.MessageAction(contents, client=None)[source]

Bases: object

Sort of pseudo action describing “files” stored in memory and transferred via message (HTTP, Python-call, MQ, etc…)

action_type = 'message'
classmethod from_dict(action_dict)[source]
staging = 'default'
property staging_action_local
property staging_needed
to_dict()[source]
write_to_path(path)[source]
class pulsar.client.action_mapper.RemoteTransferAction(source, file_lister=None, url=None)[source]

Bases: BaseAction

This action indicates the Pulsar server should transfer the file before execution via one of the remote transfer implementations. This is like a TransferAction, but it indicates the action requires network access to the staging server, and should be executed via ssh/rsync/etc.

action_type: str = 'remote_transfer'
classmethod from_dict(action_dict)[source]
inject_url = True
staging = 'remote'
to_dict()[source]
write_from_path(pulsar_path)[source]
write_to_path(path)[source]
pulsar.client.action_mapper.from_dict(action_dict)[source]

pulsar.client.amqp_exchange module

class pulsar.client.amqp_exchange.PulsarExchange(url, manager_name, amqp_key_prefix=None, connect_ssl=None, timeout=0.2, publish_kwds={}, publish_uuid_store=None, consume_uuid_store=None, republish_time=30)[source]

Bases: object

Utility for publishing and consuming structured Pulsar queues using kombu. This is shared between the server and client - an exchange should be set up for each manager (or, in the case of the client, each manager one wishes to communicate with).

Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar should target each AMQP endpoint, or care should be taken that unique manager names are used across Pulsar servers targeting the same AMQP endpoint - and in particular only one such Pulsar should define a default manager with name _default_.

ack_manager()[source]
property acks_enabled
connection(connection_string, **kwargs)[source]
consume(queue_name, callback, check=True, connection_kwargs={})[source]
heartbeat(connection)[source]
publish(name, payload)[source]
property url

pulsar.client.amqp_exchange_factory module

pulsar.client.amqp_exchange_factory.get_exchange(url, manager_name, params)[source]
pulsar.client.amqp_exchange_factory.parse_ack_kwds(params, manager_name)[source]
pulsar.client.amqp_exchange_factory.parse_amqp_connect_ssl_params(params)[source]
pulsar.client.amqp_exchange_factory.parse_amqp_publish_kwds(params)[source]

pulsar.client.client module

class pulsar.client.client.AwsBatchMessageCoexecutionJobClient(destination_params, job_id, client_manager)[source]

Bases: BasePollingCoexecutionJobClient, LaunchesAwsBatchContainersMixin

A client that co-executes pods via AWS Batch and depends on amqp for status updates.

class pulsar.client.client.AwsBatchPollingCoexecutionJobClient(destination_params, job_id, client_manager)[source]

Bases: BasePollingCoexecutionJobClient, LaunchesAwsBatchContainersMixin

A client that co-executes pods via AWS Batch and doesn’t depend on amqp for status updates.

class pulsar.client.client.BaseJobClient(destination_params, job_id)[source]

Bases: object

assign_job_id(job_id)[source]
ensure_library_available: Callable[[], None] | None = None
property prefer_local_staging
setup(tool_id=None, tool_version=None, preserve_galaxy_python_environment=None)[source]

Setup remote Pulsar server to run this job.

class pulsar.client.client.BaseMessageCoexecutionJobClient(destination_params, job_id, client_manager)[source]

Bases: BaseMessageJobClient

pulsar_container_image: str
class pulsar.client.client.BaseMessageJobClient(destination_params, job_id, client_manager)[source]

Bases: BaseRemoteConfiguredJobClient

clean()[source]
client_manager: MessagingClientManagerProtocol
full_status()[source]
class pulsar.client.client.BasePollingCoexecutionJobClient(destination_params, job_id, client_manager)[source]

Bases: BaseRemoteConfiguredJobClient

pulsar_container_image: str
class pulsar.client.client.BaseRemoteConfiguredJobClient(destination_params, job_id, client_manager)[source]

Bases: BaseJobClient

client_manager: ClientManagerProtocol
class pulsar.client.client.ClientManagerProtocol(*args, **kwargs)[source]

Bases: Protocol

manager_name: str
class pulsar.client.client.CoexecutionContainerCommand(image, command, args, working_directory, ports)[source]

Bases: NamedTuple

args: List[str]

Alias for field number 2

command: str

Alias for field number 1

image: str

Alias for field number 0

ports: List[int] | None

Alias for field number 4

working_directory: str

Alias for field number 3

class pulsar.client.client.CoexecutionLaunchMixin(destination_params, job_id, client_manager)[source]

Bases: BaseRemoteConfiguredJobClient

execution_type: ExecutionType
launch(command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None, container_info=None, token_endpoint=None, pulsar_app_config=None) ExternalId | None[source]
pulsar_container_image: str
class pulsar.client.client.ExecutionType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

PARALLEL = 'parallel'
SEQUENTIAL = 'sequential'
class pulsar.client.client.InputCachingJobClient(destination_params, job_id, job_manager_interface, client_cacher)[source]

Bases: JobClient

Beta client that caches staged files to prevent duplication.

cache_insert(**kwargs)
cache_required(**kwargs)
file_available(**kwargs)
class pulsar.client.client.JobClient(destination_params, job_id, job_manager_interface)[source]

Bases: BaseJobClient

Objects of this client class perform low-level communication with a remote Pulsar server.

Parameters

destination_params : dict or str

Connection parameters: either a URL string or a dict containing url (and optionally private_token).

job_id : str

Galaxy job/task id.

clean()[source]

Cleanup the remote job.

fetch_output(path, name, working_directory, action_type, output_type)[source]

Fetch (transfer, copy, etc…) an output from the remote Pulsar server.

Parameters

path : str

Local path of the dataset.

name : str

Remote name of file (i.e. path relative to remote staging output or working directory).

working_directory : str

Local working_directory for the job.

action_type : str

Where to find the file on Pulsar (output_workdir or output). legacy is also an option; in this case Pulsar is asked for the location. This will only be used when targeting an older Pulsar server that didn’t return statuses allowing this to be inferred.

full_status()[source]

Return a dictionary summarizing final state of job.

get_status()[source]
job_ip()[source]

Return an entry point ports dict (if applicable).

kill()[source]

Cancel remote job, either removing from the queue or killing it.

launch(command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None, token_endpoint=None)[source]

Queue up the execution of the supplied command_line on the remote server. Named launch for historical reasons; it should be renamed to enqueue or something similar.

Parameters

command_line : str

Command to execute.

put_file(path, input_type, name=None, contents=None, action_type='transfer')[source]
raw_check_complete(**kwargs)
remote_setup(**kwargs)
class pulsar.client.client.K8sMessageCoexecutionJobClient(destination_params, job_id, client_manager)[source]

Bases: BaseMessageCoexecutionJobClient, LaunchesK8ContainersMixin

A client that co-executes pods via Kubernetes and depends on amqp for status updates.

class pulsar.client.client.K8sPollingCoexecutionJobClient(destination_params, job_id, client_manager)[source]

Bases: BasePollingCoexecutionJobClient, LaunchesK8ContainersMixin

A client that co-executes pods via Kubernetes and doesn’t depend on amqp.

full_status()[source]
raw_check_complete()[source]
class pulsar.client.client.LaunchesAwsBatchContainersMixin(destination_params, job_id, client_manager)[source]

Bases: CoexecutionLaunchMixin

execution_type: ExecutionType = 'sequential'
class pulsar.client.client.LaunchesK8ContainersMixin(destination_params, job_id, client_manager)[source]

Bases: CoexecutionLaunchMixin

Mixin to provide K8 launch and kill interaction.

clean()[source]
ensure_library_available()
execution_type: ExecutionType = 'parallel'
job_ip()[source]
kill()[source]
class pulsar.client.client.LaunchesTesContainersMixin(destination_params, job_id, client_manager)[source]

Bases: CoexecutionLaunchMixin

clean()[source]
ensure_library_available() None
execution_type: ExecutionType = 'sequential'
kill()[source]
raw_check_complete() Dict[str, Any][source]
class pulsar.client.client.MessageCLIJobClient(destination_params, job_id, client_manager, shell)[source]

Bases: BaseMessageJobClient

kill()[source]
launch(command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None, token_endpoint=None)[source]
class pulsar.client.client.MessageJobClient(destination_params, job_id, client_manager)[source]

Bases: BaseMessageJobClient

get_status()[source]
kill()[source]
launch(command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None, token_endpoint=None)[source]
class pulsar.client.client.MessagingClientManagerProtocol(*args, **kwargs)[source]

Bases: ClientManagerProtocol

status_cache: Dict[str, Dict[str, Any]]
exception pulsar.client.client.OutputNotFoundException(path)[source]

Bases: Exception

class pulsar.client.client.TesMessageCoexecutionJobClient(destination_params, job_id, client_manager)[source]

Bases: BaseMessageCoexecutionJobClient, LaunchesTesContainersMixin

A client that co-executes pods via GA4GH TES and depends on amqp for status updates.

class pulsar.client.client.TesPollingCoexecutionJobClient(destination_params, job_id, client_manager)[source]

Bases: BasePollingCoexecutionJobClient, LaunchesTesContainersMixin

A client that co-executes pods via GA4GH TES and doesn’t depend on amqp for status updates.

pulsar.client.client.tes_state_is_complete(state: TesState | None) bool[source]
pulsar.client.client.tes_state_to_pulsar_status(state: TesState | None) str[source]

pulsar.client.config_util module

Generic interface for reading YAML/INI/JSON config files into nested dictionaries.

pulsar.client.config_util.read_file(path, type=None, default_type='yaml')[source]
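The idea of choosing a parser from the file extension and returning nested dictionaries can be sketched with the standard library alone. This is an illustration under stated assumptions rather than the module's code: read_config_sketch is a hypothetical name, only JSON and INI are handled (YAML needs a third-party parser), and the fallback type is json here instead of yaml.

```python
import configparser
import json
import os
import tempfile


def read_config_sketch(path, file_type=None, default_type="json"):
    """Load a JSON or INI file into a nested dict, choosing the parser by extension."""
    if file_type is None:
        ext = os.path.splitext(path)[1].lstrip(".").lower()
        file_type = ext if ext in ("json", "ini") else default_type
    if file_type == "json":
        with open(path) as handle:
            return json.load(handle)
    if file_type == "ini":
        parser = configparser.ConfigParser()
        parser.read(path)
        # One nested dict per section: {"section": {"key": "value"}}
        return {section: dict(parser[section]) for section in parser.sections()}
    raise ValueError("unsupported config type: %s" % file_type)


# Demo: write a small INI file to a temporary directory and load it back.
with tempfile.TemporaryDirectory() as tmp:
    ini_path = os.path.join(tmp, "app.ini")
    with open(ini_path, "w") as handle:
        handle.write("[server]\nport = 8913\n")
    loaded = read_config_sketch(ini_path)
print(loaded)  # {'server': {'port': '8913'}}
```

INI values come back as strings, so callers of a loader like this would convert numeric settings themselves.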

pulsar.client.decorators module

class pulsar.client.decorators.parseJson[source]

Bases: object

class pulsar.client.decorators.retry[source]

Bases: object

pulsar.client.destination module

pulsar.client.destination.submit_params(destination_params)[source]
>>> destination_params = {"private_token": "12345", "submit_native_specification": "-q batch"}
>>> result = submit_params(destination_params)
>>> result
{'native_specification': '-q batch'}
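The doctest above suggests that keys carrying a submit_ prefix are kept with the prefix removed and everything else is dropped. A minimal sketch of that kind of prefix filter follows; params_with_prefix is a hypothetical helper written for illustration, not the library function.

```python
def params_with_prefix(params, prefix):
    """Return entries whose key starts with `prefix`, with the prefix removed."""
    return {
        key[len(prefix):]: value
        for key, value in params.items()
        if key.startswith(prefix)
    }


destination_params = {"private_token": "12345", "submit_native_specification": "-q batch"}
print(params_with_prefix(destination_params, "submit_"))  # {'native_specification': '-q batch'}
```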
pulsar.client.destination.url_to_destination_params(url)[source]

Convert a legacy runner URL to a job destination

>>> params_simple = url_to_destination_params("http://localhost:8913/")
>>> params_simple["url"]
'http://localhost:8913/'
>>> params_simple["private_token"] is None
True
>>> advanced_url = "https://1234x@example.com:8914/managers/longqueue"
>>> params_advanced = url_to_destination_params(advanced_url)
>>> params_advanced["url"]
'https://example.com:8914/managers/longqueue/'
>>> params_advanced["private_token"]
'1234x'
>>> runner_url = "pulsar://http://localhost:8913/"
>>> runner_params = url_to_destination_params(runner_url)
>>> runner_params['url']
'http://localhost:8913/'
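The conversion shown in the doctest above (drop an optional pulsar:// prefix, ensure a trailing slash, and move the user-info part of the URL into private_token) can be reproduced with urllib.parse. This is a sketch consistent with those examples only; split_runner_url is a hypothetical name and the real function may handle more cases.

```python
from urllib.parse import urlsplit, urlunsplit


def split_runner_url(url):
    """Split a legacy runner URL into a clean URL and an optional private token."""
    if url.startswith("pulsar://"):
        url = url[len("pulsar://"):]
    if not url.endswith("/"):
        url += "/"
    parts = urlsplit(url)
    # Rebuild the network location without the "token@" user-info part.
    netloc = parts.hostname or ""
    if parts.port is not None:
        netloc += ":%d" % parts.port
    clean_url = urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
    return {"url": clean_url, "private_token": parts.username}
```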

pulsar.client.exceptions module

Pulsar client exceptions

exception pulsar.client.exceptions.PulsarClientTransportError(code=None, message=None, transport_code=None, transport_message=None)[source]

Bases: Exception

CONNECTION_REFUSED = 'connection_refused'
INVALID_CODE_MESSAGE = 'Unknown transport error code: %s'
TIMEOUT = 'timeout'
UNKNOWN = 'unknown'
messages = {'connection_refused': 'Connection refused', 'timeout': 'Connection timed out', 'unknown': 'Unknown transport error'}

pulsar.client.job_directory module

class pulsar.client.job_directory.RemoteJobDirectory(remote_staging_directory, remote_id, remote_sep)[source]

Bases: object

Representation of a (potentially) remote Pulsar-style staging directory.

calculate_path(remote_relative_path, input_type)[source]

Only for use by the Pulsar client; managers should override this to enforce security and create the directory if needed.

configs_directory()[source]
default_tmp_directory()[source]
home_directory()[source]
inputs_directory()[source]
metadata_directory()[source]
outputs_directory()[source]
property path
property separator
tool_files_directory()[source]
unstructured_files_directory()[source]
working_directory()[source]
pulsar.client.job_directory.get_mapped_file(directory, remote_path, allow_nested_files=False, local_path_module=<module 'posixpath' (frozen)>, mkdir=True, allow_globs=False)[source]
>>> import ntpath
>>> get_mapped_file(r'C:\pulsar\staging\101', 'dataset_1_files/moo/cow', allow_nested_files=True, local_path_module=ntpath, mkdir=False)
'C:\\pulsar\\staging\\101\\dataset_1_files\\moo\\cow'
>>> get_mapped_file(r'C:\pulsar\staging\101', 'dataset_1_files/moo/cow', allow_nested_files=False, local_path_module=ntpath)
'C:\\pulsar\\staging\\101\\cow'
>>> get_mapped_file(r'C:\pulsar\staging\101', '../cow', allow_nested_files=True, local_path_module=ntpath, mkdir=False)
Traceback (most recent call last):
Exception: Attempt to read or write file outside an authorized directory.
pulsar.client.job_directory.verify_is_in_directory(path, directory, local_path_module=<module 'posixpath' (frozen)>)[source]
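The "outside an authorized directory" failure in the doctest above comes down to a containment check on the normalized path. The following is a minimal sketch of such a check, assuming a purely lexical comparison (no symlink resolution); resolve_inside is a hypothetical helper and does not reproduce the allow_nested_files, mkdir or allow_globs behaviour.

```python
import ntpath
import posixpath


def resolve_inside(directory, relative_path, path_module=posixpath):
    """Join and normalize a path, refusing results that escape `directory`."""
    base = path_module.normpath(directory)
    candidate = path_module.normpath(path_module.join(base, relative_path))
    # Compare against "base + separator" so /staging/1010 is not accepted for /staging/101.
    prefix = base.rstrip(path_module.sep) + path_module.sep
    if candidate != base and not candidate.startswith(prefix):
        raise ValueError("Attempt to read or write file outside an authorized directory.")
    return candidate


print(resolve_inside("/pulsar/staging/101", "dataset_1_files/moo/cow"))
print(resolve_inside(r"C:\pulsar\staging\101", "dataset_1_files/moo/cow", path_module=ntpath))
```

Passing the path module explicitly mirrors the local_path_module parameter in the signatures above, which lets a POSIX client reason about Windows-style remote paths.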

pulsar.client.manager module

Entry point for client creation.

build_client_manager in particular is the abstraction that should be used to create a ClientManager, which in turn can create Pulsar clients for specific actions.

class pulsar.client.manager.ClientManager(**kwds: Dict[str, Any])[source]

Bases: ClientManagerInterface

Factory class to create Pulsar clients.

This class was introduced for classes of clients that need to potentially share state between multiple client connections.

client_class: Type[BaseJobClient]
get_client(destination_params, job_id, **kwargs)[source]

Build a client given specific destination parameters and job_id.

job_manager_interface_class: Type[PulsarInterface]
class pulsar.client.manager.HttpPulsarInterface(destination_params, transport)[source]

Bases: PulsarInterface

execute(command, args=None, data=None, input_path=None, output_path=None)[source]

Execute the corresponding command against the configured Pulsar job manager. Arguments are method parameters, and data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.

class pulsar.client.manager.ObjectStoreClientManager(**kwds)[source]

Bases: object

get_client(client_params)[source]

pulsar.client.object_client module

class pulsar.client.object_client.ObjectStoreClient(pulsar_interface)[source]

Bases: object

create(**kwargs)
delete(**kwargs)
empty(**kwargs)
exists(**kwargs)
file_ready(**kwargs)
get_data(**kwargs)
get_filename(**kwargs)
get_store_usage_percent(**kwargs)
size(**kwargs)
update_from_file(**kwargs)

pulsar.client.path_mapper module

class pulsar.client.path_mapper.PathMapper(client, remote_job_config, local_working_directory, action_mapper=None)[source]

Bases: object

Ties together a FileActionMapper and remote job configuration returned by the Pulsar setup method to pre-determine the location of files for staging on the remote Pulsar server.

This is not useful when rewrite_paths is enabled (as has traditionally been done with Pulsar), because in that case Pulsar determines the paths as files are uploaded. When rewrite_paths is disabled, however, the destination of files needs to be determined prior to transfer, so an object of this class can be used.

check_for_arbitrary_rewrite(local_path)[source]
remote_input_path_rewrite(local_path, client_input_path_type=None)[source]
remote_output_path_rewrite(local_path)[source]
remote_version_path_rewrite(local_path)[source]

pulsar.client.server_interface module

class pulsar.client.server_interface.HttpPulsarInterface(destination_params, transport)[source]

Bases: PulsarInterface

execute(command, args=None, data=None, input_path=None, output_path=None)[source]

Execute the corresponding command against the configured Pulsar job manager. Arguments are method parameters, and data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.

class pulsar.client.server_interface.LocalPulsarInterface(destination_params, job_manager=None, pulsar_app=None, file_cache=None, object_store=None)[source]

Bases: PulsarInterface

execute(command, args=None, data=None, input_path=None, output_path=None)[source]

Execute the corresponding command against the configured Pulsar job manager. Arguments are method parameters, and data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.

class pulsar.client.server_interface.PulsarInterface[source]

Bases: object

Abstract base class describing how a synchronous client communicates with (potentially remote) Pulsar procedures. The obvious implementation is HTTP based, but Pulsar objects wrapped in routes can also be communicated with directly if they are in memory.

abstract execute(command, args=None, data=None, input_path=None, output_path=None)[source]

Execute the corresponding command against the configured Pulsar job manager. Arguments are method parameters, and data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.
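The split between an abstract execute contract and interchangeable HTTP or in-memory implementations can be illustrated with a small abstract base class. Everything below is a hypothetical sketch: CommandInterface, InMemoryInterface and the handler signature are assumptions made for the example, and only the execute parameter names are taken from the signatures above.

```python
from abc import ABC, abstractmethod


class CommandInterface(ABC):
    """Hypothetical stand-in for an execute-style interface."""

    @abstractmethod
    def execute(self, command, args=None, data=None, input_path=None, output_path=None):
        """Run `command`; `data` or `input_path` supplies the request body."""


class InMemoryInterface(CommandInterface):
    """Dispatches commands to plain Python callables instead of HTTP routes."""

    def __init__(self, handlers):
        self._handlers = handlers  # maps command name -> callable(args, body) -> bytes

    def execute(self, command, args=None, data=None, input_path=None, output_path=None):
        body = data
        if input_path is not None:
            # A file on disk takes the place of an in-memory body.
            with open(input_path, "rb") as handle:
                body = handle.read()
        result = self._handlers[command](args or {}, body)
        if output_path is not None:
            # File-producing commands write their result instead of returning it.
            with open(output_path, "wb") as handle:
                handle.write(result)
            return None
        return result


interface = InMemoryInterface({"echo": lambda args, body: body})
echoed = interface.execute("echo", data=b"ping")
```

Calling code depends only on execute, so an HTTP-backed subclass could be swapped in without changing callers.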

pulsar.client.setup_handler module

pulsar.client.setup_handler.build(client, destination_args)[source]

Build a SetupHandler object for client from destination parameters.

pulsar.client.setup_handler.build_job_config(job_id, job_directory, system_properties={}, tool_id=None, tool_version=None, preserve_galaxy_python_environment=None)[source]

pulsar.client.util module

class pulsar.client.util.ClientJsonEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: JSONEncoder

default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class pulsar.client.util.EventHolder(event, path, condition_manager)[source]

Bases: object

fail()[source]
release()[source]
class pulsar.client.util.ExternalId(external_id: str)[source]

Bases: object

external_id: str
class pulsar.client.util.MessageQueueUUIDStore(persistence_directory, subdirs=None)[source]

Bases: object

Persistent dict-like object for persisting message queue UUIDs that are awaiting acknowledgement or that have been operated on.

get_time(key)[source]
keys()[source]
set_time(key)[source]
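One simple way to get a persistent, dict-like record of UUIDs and timestamps is to keep one small file per key in a directory. The sketch below shows that idea only; DirectoryTimestampStore and its on-disk layout are assumptions for illustration and are not taken from the class documented above, apart from the get_time, set_time and keys method names.

```python
import os
import tempfile
import time


class DirectoryTimestampStore:
    """Hypothetical directory-backed mapping of key -> last recorded time."""

    def __init__(self, directory):
        self._directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, key):
        return os.path.join(self._directory, key)

    def set_time(self, key, now=None):
        # Each key is a file whose content is the recorded timestamp.
        with open(self._path(key), "w") as handle:
            handle.write(repr(time.time() if now is None else now))

    def get_time(self, key):
        with open(self._path(key)) as handle:
            return float(handle.read())

    def keys(self):
        return sorted(os.listdir(self._directory))


# Demo: a second store instance over the same directory sees earlier writes.
with tempfile.TemporaryDirectory() as tmp:
    DirectoryTimestampStore(tmp).set_time("uuid-1", now=12.5)
    reopened = DirectoryTimestampStore(tmp)
    stored_keys = reopened.keys()
    stored_time = reopened.get_time("uuid-1")
```

Because state lives on disk, unacknowledged entries would survive a process restart, which is the property a republish loop needs.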
class pulsar.client.util.MonitorStyle(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

BACKGROUND = 'background'
FOREGROUND = 'foreground'
NONE = 'none'
class pulsar.client.util.PathHelper(separator, local_path_module=<module 'posixpath' (frozen)>)[source]

Bases: object

>>> import posixpath
>>> # Forcing local path to posixpath because Pulsar designed to be used with
>>> # posix client.
>>> posix_path_helper = PathHelper("/", local_path_module=posixpath)
>>> windows_slash = "\\"
>>> len(windows_slash)
1
>>> nt_path_helper = PathHelper(windows_slash, local_path_module=posixpath)
>>> posix_path_helper.remote_name("moo/cow")
'moo/cow'
>>> nt_path_helper.remote_name("moo/cow")
'moo\\cow'
>>> posix_path_helper.local_name("moo/cow")
'moo/cow'
>>> nt_path_helper.local_name("moo\\cow")
'moo/cow'
>>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data/", "/work/galaxy/data")
'/work/galaxy/data/bowtie/hg19.fa'
>>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data")
'/work/galaxy/data/bowtie/hg19.fa'
>>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data/")
'/work/galaxy/data/bowtie/hg19.fa'
from_posix_with_new_base(posix_path, old_base, new_base)[source]
local_name(remote_name)[source]
remote_join(*args)[source]
remote_name(local_name)[source]
class pulsar.client.util.TransferEventManager[source]

Bases: object

acquire_event(path, force_clear=False)[source]
pulsar.client.util.copy(source, destination)[source]

Copy file from source to destination if needed (skip if source is destination).

pulsar.client.util.copy_to_path(object, path)[source]

Copy file-like object to path.

pulsar.client.util.directory_files(directory)[source]
>>> from tempfile import mkdtemp
>>> from shutil import rmtree
>>> from os.path import join
>>> from os import makedirs
>>> tempdir = mkdtemp()
>>> with open(join(tempdir, "moo"), "w") as f: pass
>>> directory_files(tempdir)
['moo']
>>> subdir = join(tempdir, "cow", "sub1")
>>> makedirs(subdir)
>>> with open(join(subdir, "subfile1"), "w") as f: pass
>>> with open(join(subdir, "subfile2"), "w") as f: pass
>>> sorted(directory_files(tempdir))
['cow/sub1/subfile1', 'cow/sub1/subfile2', 'moo']
>>> rmtree(tempdir)
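The recursive, relative listing shown in the doctest above can be produced with os.walk and os.path.relpath. This is a minimal sketch of that technique; relative_files is a hypothetical name and the library function may differ in ordering or edge cases.

```python
import os
import tempfile


def relative_files(directory):
    """Return every file under `directory` as a path relative to it."""
    found = []
    for root, _dirs, files in os.walk(directory):
        for name in files:
            found.append(os.path.relpath(os.path.join(root, name), directory))
    return found


# Demo: one top-level file and one nested file.
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "cow", "sub1"))
    for parts in (("moo",), ("cow", "sub1", "subfile1")):
        with open(os.path.join(tmp, *parts), "w"):
            pass
    listing = sorted(relative_files(tmp))
```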
pulsar.client.util.ensure_directory(file_path)[source]
pulsar.client.util.filter_destination_params(destination_params, prefix)[source]
pulsar.client.util.from_base64_json(data)[source]
pulsar.client.util.json_dumps(obj)[source]
pulsar.client.util.json_loads(obj)[source]
pulsar.client.util.to_base64_json(data)[source]
>>> enc = to_base64_json(dict(a=5))
>>> dec = from_base64_json(enc)
>>> dec["a"]
5
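The round trip in the doctest above is JSON serialization followed by base64 encoding, which yields a string that is safe to pass on a command line or in a message. A minimal sketch of that pairing, assuming the standard base64 alphabet and UTF-8 text (the library may use a different variant); encode_json_b64 and decode_json_b64 are hypothetical names.

```python
import base64
import json


def encode_json_b64(data):
    """Serialize `data` to JSON and wrap it in base64 text."""
    return base64.b64encode(json.dumps(data).encode("utf-8")).decode("ascii")


def decode_json_b64(text):
    """Reverse encode_json_b64."""
    return json.loads(base64.b64decode(text).decode("utf-8"))


encoded = encode_json_b64({"a": 5})
decoded = decode_json_b64(encoded)
```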
pulsar.client.util.unique_path_prefix(path)[source]

Module contents

pulsar client

This module contains logic for interfacing with an external Pulsar server.

Configuring Galaxy

Galaxy job runners are configured in Galaxy’s job_conf.xml file. See job_conf.xml.sample_advanced in your Galaxy code base or on GitHub for information on how to configure Galaxy to interact with Pulsar.

Galaxy also supports an older, less rich configuration of job runners directly in its main galaxy.ini file. The following section describes how to configure Galaxy to communicate with Pulsar in this legacy mode.

Legacy

A Galaxy tool can be configured to be executed remotely via Pulsar by adding a line to the galaxy.ini file under the galaxy:tool_runners section with the format:

<tool_id> = pulsar://http://<pulsar_host>:<pulsar_port>

As an example, if a host named remotehost is running the Pulsar server application on port 8913, then the tool with id test_tool can be configured to run remotely on remotehost by adding the following line to galaxy.ini:

test_tool = pulsar://http://remotehost:8913

Remember this must be added after the [galaxy:tool_runners] header in the galaxy.ini file.
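To make the file layout concrete, the sketch below parses a galaxy.ini fragment with the standard configparser module and recovers the Pulsar URL for test_tool. It only illustrates where the line belongs; it is not how Galaxy itself reads the file, and the GALAXY_INI text is an assumed minimal fragment built from the example above.

```python
import configparser

# Assumed minimal galaxy.ini fragment using the example values from the text.
GALAXY_INI = """
[galaxy:tool_runners]
test_tool = pulsar://http://remotehost:8913
"""

parser = configparser.ConfigParser()
parser.read_string(GALAXY_INI)

# The runner line lives under the [galaxy:tool_runners] header.
runner_url = parser["galaxy:tool_runners"]["test_tool"]

# Dropping the pulsar:// runner prefix leaves the HTTP endpoint of the server.
prefix = "pulsar://"
pulsar_url = runner_url[len(prefix):] if runner_url.startswith(prefix) else runner_url
print(pulsar_url)  # http://remotehost:8913
```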

class pulsar.client.ClientInput(path, input_type, object_store_ref=None)[source]

Bases: object

property action_source
class pulsar.client.ClientInputs(client_inputs)[source]

Bases: object

Abstraction describing input datasets for a job.

static for_simple_input_paths(input_files)[source]
class pulsar.client.ClientJobDescription(command_line, tool=None, config_files=None, input_files=None, client_inputs=None, client_outputs=None, working_directory=None, metadata_directory=None, dependencies_description=None, env=[], arbitrary_files=None, job_directory_files=None, tool_directory_required_files=None, rewrite_paths=True, touch_outputs=None, container=None, remote_pulsar_app_config=None, guest_ports=None)[source]

Bases: object

A description of how the client views a job: command_line, inputs, etc.

Parameters

command_line : str

The local command line to execute, this will be rewritten for the remote server.

config_files : list

List of Galaxy ‘configfile’s produced for this job. These will be rewritten and sent to remote server.

input_files : list

List of input files used by job. These will be transferred and references rewritten.

client_outputs : ClientOutputs

Description of outputs produced by the job (at least output files, along with an optional version string and working directory outputs).

tool_dir : str

Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server).

working_directory : str

Local path created by Galaxy for running this job (job_wrapper.tool_working_directory).

metadata_directory : str

Local path created by Galaxy for running this job (job_wrapper.working_directory).

dependencies_description : list

galaxy.tools.deps.dependencies.DependencyDescription object describing tool dependency context for remote dependency resolution.

env : list

List of dict objects describing environment variables to populate.

version_file : str

Path to version file expected on the client server.

arbitrary_files : dict

Additional non-input, non-tool, non-config, non-working directory files to transfer before staging job. This is most likely data indices but can be anything. For now these are copied into staging working directory but this will be reworked to find a better, more robust location.

rewrite_paths : boolean

Indicates whether paths should be rewritten in job inputs (command_line and config files) while staging files.

property input_files
property output_files
property tool_dependencies
property version_file
class pulsar.client.ClientOutputs(working_directory=None, output_files=[], work_dir_outputs=None, version_file=None, dynamic_outputs=None, metadata_directory=None, job_directory=None, dynamic_file_sources=None)[source]

Bases: object

Abstraction describing the output datasets EXPECTED by the Galaxy job runner client.

dynamic_match(filename)[source]
static from_dict(config_dict)[source]
to_dict()[source]
exception pulsar.client.OutputNotFoundException(path)[source]

Bases: Exception

class pulsar.client.PathMapper(client, remote_job_config, local_working_directory, action_mapper=None)[source]

Bases: object

Ties together a FileActionMapper and remote job configuration returned by the Pulsar setup method to pre-determine the location of files for staging on the remote Pulsar server.

This is not useful when rewrite_paths is enabled (as has traditionally been done with Pulsar), because in that case Pulsar determines the paths as files are uploaded. When rewrite_paths is disabled, however, the destination of files needs to be determined prior to transfer, so an object of this class can be used.

check_for_arbitrary_rewrite(local_path)[source]
remote_input_path_rewrite(local_path, client_input_path_type=None)[source]
remote_output_path_rewrite(local_path)[source]
remote_version_path_rewrite(local_path)[source]
exception pulsar.client.PulsarClientTransportError(code=None, message=None, transport_code=None, transport_message=None)[source]

Bases: Exception

CONNECTION_REFUSED = 'connection_refused'
INVALID_CODE_MESSAGE = 'Unknown transport error code: %s'
TIMEOUT = 'timeout'
UNKNOWN = 'unknown'
messages = {'connection_refused': 'Connection refused', 'timeout': 'Connection timed out', 'unknown': 'Unknown transport error'}
class pulsar.client.PulsarOutputs(working_directory_contents, output_directory_contents, metadata_directory_contents, job_directory_contents, remote_separator='/', realized_dynamic_file_sources=None)[source]

Bases: object

Abstraction describing the output files PRODUCED by the remote Pulsar server.

static from_status_response(complete_response)[source]
has_output_file(output_file)[source]
output_extras(output_file)[source]

Returns dict mapping local path to remote name.

pulsar.client.build_client_manager(**kwargs: Dict[str, Any]) ClientManagerInterface[source]
pulsar.client.finish_job(client, cleanup_job, job_completed_normally, client_outputs, pulsar_outputs)[source]

Process for “un-staging” a complete Pulsar job.

This function is responsible for downloading results from the remote server and cleaning up the Pulsar staging directory (if needed).

pulsar.client.submit_job(client, client_job_description, job_config=None)[source]
pulsar.client.url_to_destination_params(url)[source]

Convert a legacy runner URL to a job destination

>>> params_simple = url_to_destination_params("http://localhost:8913/")
>>> params_simple["url"]
'http://localhost:8913/'
>>> params_simple["private_token"] is None
True
>>> advanced_url = "https://1234x@example.com:8914/managers/longqueue"
>>> params_advanced = url_to_destination_params(advanced_url)
>>> params_advanced["url"]
'https://example.com:8914/managers/longqueue/'
>>> params_advanced["private_token"]
'1234x'
>>> runner_url = "pulsar://http://localhost:8913/"
>>> runner_params = url_to_destination_params(runner_url)
>>> runner_params['url']
'http://localhost:8913/'