pulsar.client package
Subpackages
- pulsar.client.staging package
- pulsar.client.test package
- pulsar.client.transport package
Submodules
pulsar.client.action_mapper module
- class pulsar.client.action_mapper.FileActionMapper(client=None, config=None)[source]
Bases:
object
Objects of this class define how paths are mapped to actions.
>>> json_string = r'''{"paths": [ {"path": "/opt/galaxy", "action": "none"}, {"path": "/galaxy/data", "action": "transfer"}, {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"}, {"path": ".*/dataset_\\d+.dat", "action": "copy", "match_type": "regex"} ]}'''
>>> from tempfile import NamedTemporaryFile
>>> from os import unlink
>>> def mapper_for(default_action, config_contents):
...     f = NamedTemporaryFile(delete=False)
...     f.write(config_contents.encode('UTF-8'))
...     f.close()
...     mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None)
...     mapper = FileActionMapper(mock_client)
...     as_dict = config=mapper.to_dict()
...     mapper = FileActionMapper(config=as_dict) # Serialize and deserialize it to make sure still works
...     unlink(f.name)
...     return mapper
>>> mapper = mapper_for(default_action='none', config_contents=json_string)
>>> # Test first config line above, implicit path prefix mapper
>>> action = mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input')
>>> action.action_type == u'none'
True
>>> action.staging_needed
False
>>> # Test another (2nd) mapper, this one with a different action
>>> action = mapper.action({'path': '/galaxy/data/files/000/dataset_1.dat'}, 'input')
>>> action.action_type == u'transfer'
True
>>> action.staging_needed
True
>>> # Always at least copy work_dir outputs.
>>> action = mapper.action({'path': '/opt/galaxy/database/working_directory/45.sh'}, 'workdir')
>>> action.action_type == u'copy'
True
>>> action.staging_needed
True
>>> # Always at least copy input metadata files.
>>> action = mapper.action({'path': '/opt/galaxy/database/working_directory/metadata'}, 'metadata')
>>> action.action_type == u'copy'
True
>>> action.staging_needed
True
>>> # Test glob mapper (matching test)
>>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam'}, 'input').action_type == u'copy'
True
>>> # Test glob mapper (non-matching test)
>>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam.bai'}, 'input').action_type == u'none'
True
>>> # Regex mapper test.
>>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'input').action_type == u'copy'
True
>>> # Doesn't map unstructured paths by default
>>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'none'
True
>>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ {"path": "/", "action": "transfer", "path_types": "input"} ] }''')
>>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'input').action_type == u'transfer'
True
>>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'output').action_type == u'none'
True
>>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ {"path": "/", "action": "transfer", "path_types": "*any*"} ] }''')
>>> unstructured_mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'transfer'
True
>>> match_type_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ {"action": "transfer", "path_types": "input"}, {"action": "remote_copy", "path_types": "output"} ] }''')
>>> input_action = match_type_only_mapper.action({}, 'input')
>>> input_action.action_type
'transfer'
>>> output_action = match_type_only_mapper.action({}, 'output')
>>> output_action.action_type
'remote_copy'
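The three match types used in the configuration above (implicit prefix, glob, and regex) can be illustrated with a standard-library sketch. This is not Pulsar's implementation: rule_matches and first_matching_action are hypothetical helpers, the "first matching rule wins" ordering is an assumption, and the sketch ignores path_types and the forced copy of workdir and metadata paths.

```python
import re
from fnmatch import fnmatchcase

# Rules copied from the example configuration above; order matters here.
RULES = [
    {"path": "/opt/galaxy", "action": "none"},
    {"path": "/galaxy/data", "action": "transfer"},
    {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"},
    {"path": r".*/dataset_\d+.dat", "action": "copy", "match_type": "regex"},
]


def rule_matches(rule, path):
    """Return True if path satisfies the rule's match_type (prefix by default)."""
    match_type = rule.get("match_type", "prefix")
    if match_type == "glob":
        return fnmatchcase(path, rule["path"])
    if match_type == "regex":
        return re.match(rule["path"], path) is not None
    return path.startswith(rule["path"])


def first_matching_action(path, rules=RULES, default_action="none"):
    """Hypothetical helper: the first matching rule wins, else the default."""
    for rule in rules:
        if rule_matches(rule, path):
            return rule["action"]
    return default_action
```

Under these assumptions the sketch agrees with the 'input' cases of the doctest: a .bam file under /cool/bamfiles maps to copy, while the neighbouring .bam.bai file falls through to the default action.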
- class pulsar.client.action_mapper.MessageAction(contents, client=None)[source]
Bases:
object
A pseudo-action describing “files” stored in memory and transferred via message (HTTP, Python call, MQ, etc.).
- action_type = 'message'
- staging = 'default'
- property staging_action_local
- property staging_needed
- class pulsar.client.action_mapper.RemoteTransferAction(source, file_lister=None, url=None)[source]
Bases:
BaseAction
This action indicates that the Pulsar server should transfer the file before execution via one of the remote transfer implementations. It is like a TransferAction, but it indicates that the action requires network access to the staging server and should be executed via ssh, rsync, etc.
- action_type: str = 'remote_transfer'
- inject_url = True
- staging = 'remote'
pulsar.client.amqp_exchange module
- class pulsar.client.amqp_exchange.PulsarExchange(url, manager_name, amqp_key_prefix=None, connect_ssl=None, timeout=0.2, publish_kwds={}, publish_uuid_store=None, consume_uuid_store=None, republish_time=30, durable=False)[source]
Bases:
object
Utility for publishing and consuming structured Pulsar queues using kombu. This is shared between the server and the client: an exchange should be set up for each manager (or, in the case of the client, for each manager it wishes to communicate with).
Each Pulsar manager is defined solely by name in the scheme, so either only one Pulsar server should target each AMQP endpoint, or care should be taken that manager names are unique across Pulsar servers targeting the same AMQP endpoint. In particular, only one such Pulsar server should define a default manager with the name _default_.
- property acks_enabled
- property url
pulsar.client.amqp_exchange_factory module
pulsar.client.client module
- class pulsar.client.client.AwsBatchMessageCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BasePollingCoexecutionJobClient, LaunchesAwsBatchContainersMixin
A client that co-executes pods via AWS Batch and depends on amqp for status updates.
- class pulsar.client.client.AwsBatchPollingCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BasePollingCoexecutionJobClient, LaunchesAwsBatchContainersMixin
A client that co-executes pods via AWS Batch and doesn’t depend on amqp for status updates.
- class pulsar.client.client.BaseJobClient(destination_params, job_id)[source]
Bases:
object
- ensure_library_available: Callable[[], None] | None = None
- property prefer_local_staging
- class pulsar.client.client.BaseMessageCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BaseMessageJobClient
- pulsar_container_image: str
- class pulsar.client.client.BaseMessageJobClient(destination_params, job_id, client_manager)[source]
Bases:
BaseRemoteConfiguredJobClient
- client_manager: MessagingClientManagerProtocol
- class pulsar.client.client.BasePollingCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BaseRemoteConfiguredJobClient
- pulsar_container_image: str
- class pulsar.client.client.BaseRemoteConfiguredJobClient(destination_params, job_id, client_manager)[source]
Bases:
BaseJobClient
- client_manager: ClientManagerProtocol
- class pulsar.client.client.ClientManagerProtocol(*args, **kwargs)[source]
Bases:
Protocol
- manager_name: str
- class pulsar.client.client.CoexecutionLaunchMixin(destination_params, job_id, client_manager)[source]
Bases:
BaseRemoteConfiguredJobClient
- execution_type: ExecutionType
- launch(command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None, container_info=None, token_endpoint=None, pulsar_app_config=None) ExternalId | None[source]
- pulsar_container_image: str
- class pulsar.client.client.ExecutionType(value)[source]
Bases:
str, Enum
- PARALLEL = 'parallel'
- SEQUENTIAL = 'sequential'
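Because ExecutionType derives from both str and Enum, its members behave like plain strings, which is why the mixins below can list a default such as 'sequential'. The following is a minimal stand-in with the same bases and members, not an import from Pulsar; the variable names are illustrative.

```python
from enum import Enum


class ExecutionType(str, Enum):
    """Stand-in with the same bases and members as the class documented above."""
    PARALLEL = "parallel"
    SEQUENTIAL = "sequential"


# A raw string (for example, one read from configuration) converts to a member
# by value lookup, and members compare equal to plain strings.
from_config = ExecutionType("sequential")
is_sequential = from_config is ExecutionType.SEQUENTIAL
equals_plain_string = ExecutionType.PARALLEL == "parallel"
```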
- class pulsar.client.client.GcpMessageCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BaseMessageCoexecutionJobClient, LaunchesGcpContainersMixin
A client that co-executes pods via GCP and depends on amqp for status updates.
- class pulsar.client.client.GcpPollingCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BasePollingCoexecutionJobClient, LaunchesGcpContainersMixin
A client that co-executes pods via GCP and doesn’t depend on amqp.
- class pulsar.client.client.InputCachingJobClient(destination_params, job_id, job_manager_interface, client_cacher)[source]
Bases:
JobClient
Beta client that caches staged files to prevent duplication.
- cache_insert(**kwargs)
- cache_required(**kwargs)
- file_available(**kwargs)
- class pulsar.client.client.JobClient(destination_params, job_id, job_manager_interface)[source]
Bases:
BaseJobClient
Objects of this client class perform low-level communication with a remote Pulsar server.
Parameters
- destination_params : dict or str
Connection parameters: either a URL string or a dict containing url (and optionally private_token).
- job_id : str
Galaxy job/task id.
- fetch_output(path, name, working_directory, action_type, output_type)[source]
Fetch (transfer, copy, etc…) an output from the remote Pulsar server.
Parameters
- path : str
Local path of the dataset.
- name : str
Remote name of the file (i.e. path relative to the remote staging output or working directory).
- working_directory : str
Local working_directory for the job.
- action_type : str
Where to find the file on Pulsar (output_workdir or output). legacy is also an option; in this case Pulsar is asked for the location. This will only be used if targeting an older Pulsar server that didn’t return statuses allowing this to be inferred.
- launch(command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None, token_endpoint=None)[source]
Queue up the execution of the supplied command_line on the remote server. Called launch for historical reasons, should be renamed to enqueue or something like that.
Parameters
- command_line : str
Command to execute.
- raw_check_complete(**kwargs)
- remote_setup(**kwargs)
- class pulsar.client.client.K8sMessageCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BaseMessageCoexecutionJobClient, LaunchesK8ContainersMixin
A client that co-executes pods via Kubernetes and depends on amqp for status updates.
- class pulsar.client.client.K8sPollingCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BasePollingCoexecutionJobClient, LaunchesK8ContainersMixin
A client that co-executes pods via Kubernetes and doesn’t depend on amqp.
- class pulsar.client.client.LaunchesAwsBatchContainersMixin(destination_params, job_id, client_manager)[source]
Bases:
CoexecutionLaunchMixin…
- execution_type: ExecutionType = 'sequential'
- class pulsar.client.client.LaunchesGcpContainersMixin(destination_params, job_id, client_manager)[source]
Bases:
CoexecutionLaunchMixin
- ensure_library_available()
- execution_type: ExecutionType = 'parallel'
- class pulsar.client.client.LaunchesK8ContainersMixin(destination_params, job_id, client_manager)[source]
Bases:
CoexecutionLaunchMixin
Mixin to provide K8 launch and kill interaction.
- ensure_library_available()
- execution_type: ExecutionType = 'parallel'
- class pulsar.client.client.LaunchesTesContainersMixin(destination_params, job_id, client_manager)[source]
Bases:
CoexecutionLaunchMixin
- ensure_library_available() None
- execution_type: ExecutionType = 'sequential'
- class pulsar.client.client.MessageCLIJobClient(destination_params, job_id, client_manager, shell)[source]
Bases:
BaseMessageJobClient
- class pulsar.client.client.MessageJobClient(destination_params, job_id, client_manager)[source]
Bases:
BaseMessageJobClient
- class pulsar.client.client.MessagingClientManagerProtocol(*args, **kwargs)[source]
Bases:
ClientManagerProtocol
- status_cache: Dict[str, Dict[str, Any]]
- class pulsar.client.client.RelayJobClient(destination_params, job_id, client_manager)[source]
Bases:
BaseMessageJobClient
Client that communicates with Pulsar via pulsar-relay.
This client posts control messages (setup, status, kill) to the relay, which are then consumed by the Pulsar server. File transfers happen directly between Pulsar and Galaxy via HTTP.
- get_status()[source]
Request job status by posting a status request message to the relay.
- Returns:
Cached status if available, None otherwise
- launch(command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None, token_endpoint=None)[source]
Submit a job by posting a setup message to the relay.
- Args:
command_line: Command to execute on Pulsar
dependencies_description: Tool dependencies
env: Environment variables
remote_staging: Remote staging configuration
job_config: Job configuration
dynamic_file_sources: Dynamic file sources
token_endpoint: Token endpoint for file access
- Returns:
None (async operation)
- class pulsar.client.client.TesMessageCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BaseMessageCoexecutionJobClient, LaunchesTesContainersMixin
A client that co-executes pods via GA4GH TES and depends on amqp for status updates.
- class pulsar.client.client.TesPollingCoexecutionJobClient(destination_params, job_id, client_manager)[source]
Bases:
BasePollingCoexecutionJobClient, LaunchesTesContainersMixin
A client that co-executes pods via GA4GH TES and doesn’t depend on amqp for status updates.
pulsar.client.config_util module
Generic interface for reading YAML/INI/JSON config files into nested dictionaries.
pulsar.client.container_job_config module
Setup config objects for Pulsar client container jobs.
In a traditional batch Pulsar setup, job configuration is configured per destination by configuring the manager the Pulsar client connects to. In a container job setup, there is no Pulsar server component and the Pulsar client is responsible for configuring the job configuration. This module provides the necessary configuration objects and documents how to map Galaxy job environment configuration objects to the container scheduling infrastructure.
- class pulsar.client.container_job_config.BasicAuth(*, username: str, password: str)[source]
Bases:
BaseModel
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- password: str
- username: str
- class pulsar.client.container_job_config.CoexecutionContainerCommand(image, command, args, working_directory, ports)[source]
Bases:
NamedTuple
- args: List[str]
Alias for field number 2
- command: str
Alias for field number 1
- image: str
Alias for field number 0
- ports: List[int] | None
Alias for field number 4
- working_directory: str
Alias for field number 3
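The "Alias for field number N" notes reflect the positional order of the NamedTuple fields (image, command, args, working_directory, ports). A stand-in with the same field order shows how positional and named access line up; it is a sketch rather than an import from Pulsar, and every value in it is made up for illustration.

```python
from typing import List, NamedTuple, Optional


class CoexecutionContainerCommand(NamedTuple):
    """Stand-in mirroring the documented field order (indices 0 through 4)."""
    image: str
    command: str
    args: List[str]
    working_directory: str
    ports: Optional[List[int]]


# All values below are invented for illustration.
cmd = CoexecutionContainerCommand(
    image="example/tool:1.0",
    command="bash",
    args=["-c", "echo hello"],
    working_directory="/work",
    ports=None,
)
```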
- class pulsar.client.container_job_config.GcpJobParams(*, project_id: str, credentials_file: str | None = None, region: str = 'us-central1', walltime_limit: int = 86400, retry_count: int = 2, ssd_name: str | None = None, disk_size: int = 375, machine_type: str = 'n1-standard-1', labels: Dict[str, str] | None = None)[source]
Bases:
BaseModel
- credentials_file: str | None
- disk_size: int
- labels: Dict[str, str] | None
- machine_type: str
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- project_id: str
- region: str
- retry_count: int
- ssd_name: str | None
- walltime_limit: int
- class pulsar.client.container_job_config.TesJobParams(*, cpu_cores: int | None = None, preemptible: bool | None = None, ram_gb: float | None = None, disk_gb: float | None = None, zones: List[str] | None = None, backend_parameters: Dict[str, str] | None = None, backend_parameters_strict: bool | None = False, tes_url: str, authorization: Literal['none', 'basic'] = 'none', basic_auth: BasicAuth | None = None)[source]
Bases:
TesResources
- authorization: Literal['none', 'basic']
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- tes_url: str
- pulsar.client.container_job_config.attribute_docs(gcp_class_name: str, attribute: str) str | None[source]
Extracts the documentation string for a given attribute from a class docstring.
- Args:
gcp_class_name: Name of the class whose docstring is searched.
attribute: The attribute name to extract documentation for.
- Returns:
A string containing the attribute’s documentation, or None if not found.
- pulsar.client.container_job_config.container_command_to_gcp_runnable(name: str, container: CoexecutionContainerCommand) Runnable[source]
- pulsar.client.container_job_config.gcp_galaxy_instance_id(destination_params: Dict[str, str]) str | None[source]
- pulsar.client.container_job_config.gcp_job_request(params: GcpJobParams, job: Job, job_name: str) CreateJobRequest[source]
- pulsar.client.container_job_config.gcp_job_template(params: GcpJobParams) Job[source]
- pulsar.client.container_job_config.parse_gcp_job_params(params: dict) GcpJobParams[source]
Parse GCP job parameters from a dictionary (e.g., Galaxy’s job destination/environment params).
- pulsar.client.container_job_config.parse_tes_job_params(params: dict) TesJobParams[source]
Parse TES job parameters from a dictionary (e.g., Galaxy’s job destination/environment params).
- pulsar.client.container_job_config.tes_client_from_params(tes_params: TesJobParams) TesClient[source]
- pulsar.client.container_job_config.tes_resources(tes_params: TesJobParams) TesResources[source]
pulsar.client.decorators module
pulsar.client.destination module
- pulsar.client.destination.submit_params(destination_params)[source]
>>> destination_params = {"private_token": "12345", "submit_native_specification": "-q batch"}
>>> result = submit_params(destination_params)
>>> result
{'native_specification': '-q batch'}
- pulsar.client.destination.url_to_destination_params(url)[source]
Convert a legacy runner URL to a job destination.
>>> params_simple = url_to_destination_params("http://localhost:8913/")
>>> params_simple["url"]
'http://localhost:8913/'
>>> params_simple["private_token"] is None
True
>>> advanced_url = "https://1234x@example.com:8914/managers/longqueue"
>>> params_advanced = url_to_destination_params(advanced_url)
>>> params_advanced["url"]
'https://example.com:8914/managers/longqueue/'
>>> params_advanced["private_token"]
'1234x'
>>> runner_url = "pulsar://http://localhost:8913/"
>>> runner_params = url_to_destination_params(runner_url)
>>> runner_params['url']
'http://localhost:8913/'
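The doctest implies three transformations: dropping a leading pulsar:// prefix, ensuring a trailing slash, and moving the userinfo portion of the URL into private_token. A standard-library sketch that reproduces those outputs is shown below. sketch_url_to_destination_params is a hypothetical re-implementation for illustration only; the real function may differ in details not covered by the doctest.

```python
from urllib.parse import urlsplit, urlunsplit


def sketch_url_to_destination_params(url):
    """Hypothetical re-implementation; not Pulsar's actual code."""
    # Strip the legacy runner prefix, if present.
    if url.startswith("pulsar://"):
        url = url[len("pulsar://"):]
    # Normalize to a trailing slash.
    if not url.endswith("/"):
        url += "/"
    parts = urlsplit(url)
    private_token = parts.username  # None when there is no "token@" userinfo
    if private_token is not None:
        # Rebuild the netloc without the userinfo portion.
        netloc = parts.netloc.rpartition("@")[2]
        parts = parts._replace(netloc=netloc)
    return {"url": urlunsplit(parts), "private_token": private_token}
```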
pulsar.client.exceptions module
Pulsar client exceptions
- exception pulsar.client.exceptions.PulsarClientTransportError(code=None, message=None, transport_code=None, transport_message=None)[source]
Bases:
Exception
- CONNECTION_REFUSED = 'connection_refused'
- INVALID_CODE_MESSAGE = 'Unknown transport error code: %s'
- TIMEOUT = 'timeout'
- UNKNOWN = 'unknown'
- messages = {'connection_refused': 'Connection refused', 'timeout': 'Connection timed out', 'unknown': 'Unknown transport error'}
pulsar.client.job_directory module
- class pulsar.client.job_directory.RemoteJobDirectory(remote_staging_directory, remote_id, remote_sep)[source]
Bases:
object
Representation of a (potentially) remote Pulsar-style staging directory.
- calculate_path(remote_relative_path, input_type)[source]
Only for use by the Pulsar client; managers should override this to enforce security and create the directory if needed.
- property path
- property separator
- pulsar.client.job_directory.get_mapped_file(directory, remote_path, allow_nested_files=False, local_path_module=<module 'posixpath' (frozen)>, mkdir=True, allow_globs=False)[source]
>>> import ntpath
>>> get_mapped_file(r'C:\pulsar\staging\101', 'dataset_1_files/moo/cow', allow_nested_files=True, local_path_module=ntpath, mkdir=False)
'C:\\pulsar\\staging\\101\\dataset_1_files\\moo\\cow'
>>> get_mapped_file(r'C:\pulsar\staging\101', 'dataset_1_files/moo/cow', allow_nested_files=False, local_path_module=ntpath)
'C:\\pulsar\\staging\\101\\cow'
>>> get_mapped_file(r'C:\pulsar\staging\101', '../cow', allow_nested_files=True, local_path_module=ntpath, mkdir=False)
Traceback (most recent call last):
Exception: Attempt to read or write file outside an authorized directory.
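The behaviour in this doctest (flattening to the base name unless nested files are allowed, and rejecting paths that escape the staging directory) can be sketched with the standard path modules. sketch_mapped_path is a hypothetical helper, not Pulsar's code: it omits the mkdir and allow_globs options and raises ValueError rather than a bare Exception.

```python
import ntpath
import posixpath


def sketch_mapped_path(directory, remote_path, allow_nested_files=False,
                       path_module=posixpath):
    """Hypothetical helper: resolve remote_path inside directory or raise."""
    if not allow_nested_files:
        # Keep only the final component, discarding any directories.
        remote_path = path_module.basename(remote_path)
    candidate = path_module.normpath(path_module.join(directory, remote_path))
    root = path_module.normpath(directory)
    # commonpath equals root only when candidate is root or lies beneath it.
    if path_module.commonpath([root, candidate]) != root:
        raise ValueError(
            "Attempt to read or write file outside an authorized directory."
        )
    return candidate
```

Normalizing before comparing is the important step: a naive string-prefix check on the unnormalized join would accept '../cow'.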
pulsar.client.manager module
Entry point for client creation.
build_client_manager in particular is the abstraction that should be used
to create a ClientManager, which in turn can create Pulsar clients for
specific actions.
- class pulsar.client.manager.ClientManager(job_manager: ManagerInterface | None = None, **kwds: Dict[str, Any])[source]
Bases:
ClientManagerInterface
Factory class to create Pulsar clients.
This class was introduced for classes of clients that need to potentially share state between multiple client connections.
- client_class: Type[BaseJobClient]
- get_client(destination_params, job_id, **kwargs)[source]
Build a client given specific destination parameters and job_id.
- job_manager_interface_class: Type[PulsarInterface]
- class pulsar.client.manager.HttpPulsarInterface(destination_params, transport)[source]
Bases:
PulsarInterface
- execute(command, args=None, data=None, input_path=None, output_path=None)[source]
Execute the corresponding command against the configured Pulsar job manager. Arguments are method parameters, and data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.
pulsar.client.object_client module
- class pulsar.client.object_client.ObjectStoreClient(pulsar_interface)[source]
Bases:
object
- create(**kwargs)
- delete(**kwargs)
- empty(**kwargs)
- exists(**kwargs)
- file_ready(**kwargs)
- get_data(**kwargs)
- get_filename(**kwargs)
- get_store_usage_percent(**kwargs)
- size(**kwargs)
- update_from_file(**kwargs)
pulsar.client.path_mapper module
- class pulsar.client.path_mapper.PathMapper(client, remote_job_config, local_working_directory, action_mapper=None)[source]
Bases:
object
Ties together a FileActionMapper and remote job configuration returned by the Pulsar setup method to pre-determine the location of files for staging on the remote Pulsar server.
This is not useful when rewrite_paths is enabled (as has traditionally been done with Pulsar), because in that case Pulsar determines the paths as files are uploaded. When rewrite_paths is disabled, however, the destination of files needs to be determined prior to transfer, so an object of this class can be used.
pulsar.client.relay_auth module
JWT authentication manager for pulsar-relay.
Handles token acquisition, caching, and automatic refresh.
- class pulsar.client.relay_auth.RelayAuthManager(relay_url: str, username: str, password: str)[source]
Bases:
object
Manages JWT authentication tokens for pulsar-relay communication.
Features:
- Thread-safe token caching
- Automatic token refresh before expiry
- Lazy authentication (only authenticates when needed)
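The three features listed for RelayAuthManager can be illustrated with a small standard-library sketch. CachedToken, fetch_token, refresh_margin, and clock are all hypothetical names introduced here; the sketch does not call pulsar-relay and makes no claim about how RelayAuthManager is implemented.

```python
import threading
import time


class CachedToken:
    """Hypothetical sketch of lazy, thread-safe token caching with early refresh."""

    def __init__(self, fetch_token, refresh_margin=30.0, clock=time.monotonic):
        # fetch_token is an assumed callable returning (token, lifetime_seconds).
        self._fetch_token = fetch_token
        self._refresh_margin = refresh_margin
        self._clock = clock
        self._lock = threading.Lock()
        self._token = None
        self._expires_at = 0.0

    def get(self):
        """Return a valid token, fetching only when missing or near expiry."""
        with self._lock:
            now = self._clock()
            near_expiry = now >= self._expires_at - self._refresh_margin
            if self._token is None or near_expiry:
                token, lifetime = self._fetch_token()
                self._token = token
                self._expires_at = now + lifetime
            return self._token
```

Nothing is fetched at construction time (lazy authentication), the lock serializes concurrent callers, and the margin triggers a refresh shortly before the token would expire.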
pulsar.client.server_interface module
- class pulsar.client.server_interface.HttpPulsarInterface(destination_params, transport)[source]
Bases:
PulsarInterface
- execute(command, args=None, data=None, input_path=None, output_path=None)[source]
Execute the corresponding command against the configured Pulsar job manager. Arguments are method parameters, and data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.
- class pulsar.client.server_interface.LocalPulsarInterface(destination_params, job_manager=None, pulsar_app=None, file_cache=None, object_store=None)[source]
Bases:
PulsarInterface
- execute(command, args=None, data=None, input_path=None, output_path=None)[source]
Execute the corresponding command against the configured Pulsar job manager. Arguments are method parameters, and data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.
- class pulsar.client.server_interface.PulsarInterface[source]
Bases:
object
Abstract base class describing how a synchronous client communicates with (potentially remote) Pulsar procedures. The obvious implementation is HTTP based, but Pulsar objects wrapped in routes can also be communicated with directly if they are in memory.
- abstractmethod execute(command, args=None, data=None, input_path=None, output_path=None)[source]
Execute the corresponding command against the configured Pulsar job manager. Arguments are method parameters, and data or input_path essentially describe POST bodies. If the command results in a file, the resulting path should be specified as output_path.
pulsar.client.setup_handler module
pulsar.client.util module
- class pulsar.client.util.ClientJsonEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]
Bases:
JSONEncoder
- default(obj)[source]
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
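To show how such a default override is actually used, the snippet below places the iterator example in a complete subclass and passes it to json.dumps through the cls argument. IterableEncoder is an illustrative name and is not part of Pulsar; what ClientJsonEncoder itself serializes is not stated in this page.

```python
import json


class IterableEncoder(json.JSONEncoder):
    """Example subclass; the name is illustrative, not part of Pulsar."""

    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class raise TypeError for unsupported objects.
        return super().default(o)


# An iterator is not serializable by the stock encoder, so default() is called.
encoded = json.dumps({"ids": iter([1, 2, 3])}, cls=IterableEncoder)
```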
- class pulsar.client.util.MessageQueueUUIDStore(persistence_directory, subdirs=None)[source]
Bases:
object
Persistent dict-like object for persisting message queue UUIDs that are awaiting acknowledgement or that have been operated on.
- class pulsar.client.util.MonitorStyle(value)[source]
Bases:
str, Enum
- BACKGROUND = 'background'
- FOREGROUND = 'foreground'
- NONE = 'none'
- class pulsar.client.util.PathHelper(separator, local_path_module=<module 'posixpath' (frozen)>)[source]
Bases:
object
>>> import posixpath
>>> # Forcing local path to posixpath because Pulsar designed to be used with
>>> # posix client.
>>> posix_path_helper = PathHelper("/", local_path_module=posixpath)
>>> windows_slash = "\\"
>>> len(windows_slash)
1
>>> nt_path_helper = PathHelper(windows_slash, local_path_module=posixpath)
>>> posix_path_helper.remote_name("moo/cow")
'moo/cow'
>>> nt_path_helper.remote_name("moo/cow")
'moo\\cow'
>>> posix_path_helper.local_name("moo/cow")
'moo/cow'
>>> nt_path_helper.local_name("moo\\cow")
'moo/cow'
>>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data/", "/work/galaxy/data")
'/work/galaxy/data/bowtie/hg19.fa'
>>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data")
'/work/galaxy/data/bowtie/hg19.fa'
>>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data/")
'/work/galaxy/data/bowtie/hg19.fa'
- pulsar.client.util.copy(source, destination)[source]
Copy file from source to destination if needed (skip if source is destination).
- pulsar.client.util.directory_files(directory)[source]
>>> from tempfile import mkdtemp
>>> from shutil import rmtree
>>> from os.path import join
>>> from os import makedirs
>>> tempdir = mkdtemp()
>>> with open(join(tempdir, "moo"), "w") as f: pass
>>> directory_files(tempdir)
['moo']
>>> subdir = join(tempdir, "cow", "sub1")
>>> makedirs(subdir)
>>> with open(join(subdir, "subfile1"), "w") as f: pass
>>> with open(join(subdir, "subfile2"), "w") as f: pass
>>> sorted(directory_files(tempdir))
['cow/sub1/subfile1', 'cow/sub1/subfile2', 'moo']
>>> rmtree(tempdir)
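The doctest shows directory_files returning paths relative to the given directory, including files in nested subdirectories. One way to obtain that result is a recursive walk, sketched below with os.walk. sketch_directory_files is a hypothetical stand-in; whether Pulsar uses os.walk internally is an assumption.

```python
import os


def sketch_directory_files(directory):
    """Hypothetical helper: list files under directory as relative paths."""
    found = []
    for root, _dirs, files in os.walk(directory):
        for name in files:
            full_path = os.path.join(root, name)
            found.append(os.path.relpath(full_path, directory))
    return found
```

As in the doctest, callers that need a stable order should sort the result, since the walk order depends on the filesystem.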
Module contents
pulsar client
This module contains logic for interfacing with an external Pulsar server.
Configuring Galaxy
Galaxy job runners are configured in Galaxy’s job_conf.xml file. See job_conf.xml.sample_advanced
in your Galaxy code base or on
GitHub
for information on how to configure Galaxy to interact with the Pulsar.
Galaxy also supports an older, less rich configuration of job runners directly
in its main galaxy.ini file. The following section describes how to
configure Galaxy to communicate with the Pulsar in this legacy mode.
Legacy
A Galaxy tool can be configured to be executed remotely via Pulsar by
adding a line to the galaxy.ini file under the galaxy:tool_runners
section with the format:
<tool_id> = pulsar://http://<pulsar_host>:<pulsar_port>
As an example, if a host named remotehost is running the Pulsar server
application on port 8913, then the tool with id test_tool can
be configured to run remotely on remotehost by adding the following
line to galaxy.ini:
test_tool = pulsar://http://remotehost:8913
Remember this must be added after the [galaxy:tool_runners] header
in the galaxy.ini file.
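As a quick sanity check of the legacy line format, the sketch below parses the example entry with Python's configparser and strips the pulsar:// prefix to recover the server URL. This is only an illustration of the file layout; it is not how Galaxy reads galaxy.ini, and the variable names are made up.

```python
from configparser import ConfigParser

# The example entry from above, placed under its required section header.
LEGACY_INI = """
[galaxy:tool_runners]
test_tool = pulsar://http://remotehost:8913
"""

parser = ConfigParser()
parser.read_string(LEGACY_INI)
runner_url = parser.get("galaxy:tool_runners", "test_tool")

# Strip the "pulsar://" runner prefix to recover the Pulsar server URL.
prefix = "pulsar://"
server_url = runner_url[len(prefix):] if runner_url.startswith(prefix) else runner_url
```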
- class pulsar.client.ClientInput(path, input_type, object_store_ref=None)[source]
Bases:
object
- property action_source
- class pulsar.client.ClientInputs(client_inputs)[source]
Bases:
object
Abstraction describing input datasets for a job.
- class pulsar.client.ClientJobDescription(command_line, tool=None, config_files=None, input_files=None, client_inputs=None, client_outputs=None, working_directory=None, metadata_directory=None, dependencies_description=None, env=[], arbitrary_files=None, job_directory_files=None, tool_directory_required_files: RequiredFiles | None = None, rewrite_paths=True, touch_outputs=None, container=None, remote_pulsar_app_config=None, guest_ports=None)[source]
Bases:
object
A description of how the client views a job: command_line, inputs, etc.
Parameters
- command_line : str
The local command line to execute; this will be rewritten for the remote server.
- tool :
The Galaxy tool to execute.
- config_files : list
List of Galaxy ‘configfile’s produced for this job. These will be rewritten and sent to the remote server.
- input_files : list
List of input files used by the job. These will be transferred and references rewritten.
- client_outputs : ClientOutputs
Description of outputs produced by the job (at least output files, along with an optional version string and working directory outputs).
- working_directory : str
Local path created by Galaxy for running this job (job_wrapper.tool_working_directory).
- metadata_directory : str
Local path created by Galaxy for running this job (job_wrapper.working_directory).
- dependencies_description : list
galaxy.tools.deps.dependencies.DependencyDescription object describing tool dependency context for remote dependency resolution.
- env : list
List of dict objects describing environment variables to populate.
- arbitrary_files : dict()
Additional non-input, non-tool, non-config, non-working directory files to transfer before staging the job. This is most likely data indices but can be anything. For now these are copied into the staging working directory, but this will be reworked to find a better, more robust location.
- rewrite_paths : boolean
Indicates whether paths should be rewritten in job inputs (command_line and config files) while staging files.
- property input_files
- property output_files
- property tool_dependencies
- property version_file
- class pulsar.client.ClientOutputs(working_directory=None, output_files=[], work_dir_outputs=None, version_file=None, dynamic_outputs=None, metadata_directory=None, job_directory=None, dynamic_file_sources=None, dataset_collector_descriptions=None)[source]
Bases:
object
Abstraction describing the output datasets EXPECTED by the Galaxy job runner client.
- class pulsar.client.PathMapper(client, remote_job_config, local_working_directory, action_mapper=None)[source]
Bases:
object
Ties together a FileActionMapper and remote job configuration returned by the Pulsar setup method to pre-determine the location of files for staging on the remote Pulsar server.
This is not useful when rewrite_paths is enabled (as has traditionally been done with Pulsar), because in that case Pulsar determines the paths as files are uploaded. When rewrite_paths is disabled, however, the destination of files needs to be determined prior to transfer, so an object of this class can be used.
- exception pulsar.client.PulsarClientTransportError(code=None, message=None, transport_code=None, transport_message=None)[source]
Bases:
Exception
- CONNECTION_REFUSED = 'connection_refused'
- INVALID_CODE_MESSAGE = 'Unknown transport error code: %s'
- TIMEOUT = 'timeout'
- UNKNOWN = 'unknown'
- messages = {'connection_refused': 'Connection refused', 'timeout': 'Connection timed out', 'unknown': 'Unknown transport error'}
- class pulsar.client.PulsarOutputs(working_directory_contents, output_directory_contents, metadata_directory_contents, job_directory_contents, remote_separator='/', realized_dynamic_file_sources=None)[source]
Bases:
object
Abstraction describing the output files PRODUCED by the remote Pulsar server.
- pulsar.client.build_client_manager(job_manager: ManagerInterface | None = None, relay_url: str | None = None, relay_username: str | None = None, relay_password: str | None = None, relay_topic_prefix: str | None = None, amqp_url: str | None = None, k8s_enabled: bool | None = None, tes_enabled: bool | None = None, gcp_batch_enabled: bool | None = None, **kwargs) ClientManagerInterface[source]
- pulsar.client.finish_job(client, cleanup_job, job_completed_normally, client_outputs, pulsar_outputs)[source]
Process for “un-staging” a complete Pulsar job.
This function is responsible for downloading results from remote server and cleaning up Pulsar staging directory (if needed.)
- pulsar.client.submit_job(client, client_job_description: ClientJobDescription, job_config=None)[source]
- pulsar.client.url_to_destination_params(url)[source]
Convert a legacy runner URL to a job destination.
>>> params_simple = url_to_destination_params("http://localhost:8913/")
>>> params_simple["url"]
'http://localhost:8913/'
>>> params_simple["private_token"] is None
True
>>> advanced_url = "https://1234x@example.com:8914/managers/longqueue"
>>> params_advanced = url_to_destination_params(advanced_url)
>>> params_advanced["url"]
'https://example.com:8914/managers/longqueue/'
>>> params_advanced["private_token"]
'1234x'
>>> runner_url = "pulsar://http://localhost:8913/"
>>> runner_params = url_to_destination_params(runner_url)
>>> runner_params['url']
'http://localhost:8913/'