pulsar.managers.util package

Subpackages

Submodules

pulsar.managers.util.aws_batch module

Interface layer for Boto’s batch library shared between Galaxy and Pulsar.

pulsar.managers.util.aws_batch.ensure_client_available() None[source]
pulsar.managers.util.aws_batch.get_client(access_key_id=None, secret_access_key=None, region_name=None)[source]

pulsar.managers.util.env module

pulsar.managers.util.env.env_to_statement(env)[source]

Convert the abstract description of an environment variable definition into a statement for a shell script.

>>> env_to_statement(dict(name='X', value='Y'))
'X="Y"; export X'
>>> env_to_statement(dict(name='X', value='Y', raw=True))
'X=Y; export X'
>>> env_to_statement(dict(name='X', value='"A","B","C"'))
'X="\\"A\\",\\"B\\",\\"C\\""; export X'
>>> env_to_statement(dict(file="Y"))
'. "Y"'
>>> env_to_statement(dict(file="'RAW $FILE'", raw=True))
". 'RAW $FILE'"
>>> # Source file takes precedence
>>> env_to_statement(dict(name='X', value='"A","B","C"', file="S"))
'. "S"'
>>> env_to_statement(dict(execute="module load java/1.5.1"))
'module load java/1.5.1'
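
The doctests above suggest a simple precedence: a `file` entry is sourced, an `execute` entry is emitted verbatim, and otherwise a `name`/`value` pair becomes an assignment followed by `export`. The following is a minimal local sketch of that behaviour, not Pulsar's source. The names `sketch_env_to_statement`, `_quote`, and `build_preamble` are hypothetical, and checking `file` before `execute` is an assumption the doctests do not confirm.

```python
def _quote(value, raw):
    # Inferred from the doctests: unless raw is set, wrap the value in
    # double quotes and backslash-escape any embedded double quotes.
    if raw:
        return value
    return '"' + value.replace('"', '\\"') + '"'


def sketch_env_to_statement(env):
    raw = env.get("raw", False)
    if env.get("file"):
        # A source file takes precedence over name/value.
        return ". " + _quote(env["file"], raw)
    if env.get("execute"):
        # Emitted verbatim, with no quoting applied.
        return env["execute"]
    return "{0}={1}; export {0}".format(env["name"], _quote(env["value"], raw))


def build_preamble(env_list):
    # Hypothetical helper: one shell statement per line.
    return "\n".join(sketch_env_to_statement(e) for e in env_list)
```

A list of such dicts can then be joined into the top of a job script, for example `build_preamble([dict(execute="module load java/1.5.1"), dict(name="X", value="Y")])`.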

pulsar.managers.util.external module

pulsar.managers.util.external.parse_external_id(output, type=None)[source]

Attempt to parse the output of job submission commands for an external id.

>>> parse_external_id("12345.pbsmanager")
'12345.pbsmanager'
>>> parse_external_id('Submitted batch job 185')
'185'
>>> parse_external_id('Submitted batch job 185', type='torque')
'Submitted batch job 185'
>>> parse_external_id('submitted to cluster 125.')
'125'
>>> parse_external_id('submitted to cluster 125.', type='slurm')
>>>
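
One way to read the doctests above is as an ordered table of per-scheduler patterns in which the first match wins and `type` restricts the table to a single entry. The sketch below reproduces the documented results under that reading. It is not Pulsar's source: the name `sketch_parse_external_id`, the pattern table, and the `"condor"` label for the "submitted to cluster" format are assumptions.

```python
import re

# Hypothetical pattern table; order matters because the first match wins.
_PATTERNS = [
    ("condor", re.compile(r"submitted to cluster (\d+)\.")),
    ("slurm", re.compile(r"Submitted batch job (\w+)")),
    ("torque", re.compile(r"(.+)")),  # catch-all: the whole output is the id
]


def sketch_parse_external_id(output, type=None):
    for pattern_type, pattern in _PATTERNS:
        # When a type is given, skip every pattern registered for another type.
        if type is not None and type != pattern_type:
            continue
        match = pattern.search(output)
        if match:
            return match.group(1)
    # No pattern matched, which is why the last doctest prints nothing.
    return None
```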

pulsar.managers.util.kill module

pulsar.managers.util.kill.kill_pid(pid: int, use_psutil: bool = True)[source]

pulsar.managers.util.process_groups module

pulsar.managers.util.process_groups.check_pg(pgid)[source]

Check whether processes in process group pgid are still alive.

pulsar.managers.util.process_groups.kill_pg(pgid)[source]

Kill all processes in process group pgid.
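
On POSIX systems, both operations can be expressed with `os.killpg`. The sketch below shows the general technique and is not Pulsar's implementation: the function names are hypothetical, and the choice of `SIGTERM` as the default signal is an assumption.

```python
import os
import signal


def sketch_check_pg(pgid):
    """Return True if process group pgid still has a signallable member."""
    try:
        os.killpg(pgid, 0)  # signal 0 only performs existence/permission checks
    except ProcessLookupError:
        return False
    except PermissionError:
        # The group exists but is owned by another user.
        return True
    return True


def sketch_kill_pg(pgid, sig=signal.SIGTERM):
    """Send sig to every process in group pgid; return False if the group is gone."""
    try:
        os.killpg(pgid, sig)
    except ProcessLookupError:
        return False
    return True
```

A child started with `subprocess.Popen(..., start_new_session=True)` gets its own process group, whose id can be read with `os.getpgid(child.pid)` and passed to these helpers.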

pulsar.managers.util.pykube_util module

Interface layer for pykube library shared between Galaxy and Pulsar.

pulsar.managers.util.pykube_util.ensure_pykube()[source]
pulsar.managers.util.pykube_util.find_job_object_by_name(pykube_api, job_name, namespace=None)[source]
pulsar.managers.util.pykube_util.find_pod_object_by_name(pykube_api, pod_name, namespace=None)[source]
pulsar.managers.util.pykube_util.galaxy_instance_id(params)[source]

Parse and validate the id of the Galaxy instance from supplied dict.

This id will be added to Job and Pod names, so it needs to be DNS friendly. The Internet standards (Requests for Comments) for protocols mandate that component hostname labels may contain only the ASCII letters ‘a’ through ‘z’ (in a case-insensitive manner), the digits ‘0’ through ‘9’, and the minus sign (‘-’).

It looks for the value set on params['k8s_galaxy_instance_id'], which might or might not be set. The idea behind this is to allow the Galaxy instance to trust (or not) existing k8s Jobs and Pods that match the setup of a Job that is being recovered or restarted after a downtime/reboot.
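
The DNS-friendliness rule described above can be checked with a regular expression. The following is a minimal sketch, not the library's code: `sketch_instance_id` is a hypothetical name, and the 63-character limit and the ban on leading or trailing hyphens come from the general hostname-label rules (RFC 1123). The real function may enforce different limits.

```python
import re

# Assumed rule: 1 to 63 characters, letters/digits/hyphen only,
# starting and ending with a letter or digit.
_DNS_LABEL = re.compile(r"[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?")


def sketch_instance_id(params):
    raw = params.get("k8s_galaxy_instance_id")
    if raw is None:
        # The key is optional; no instance id was supplied.
        return None
    if not _DNS_LABEL.fullmatch(raw):
        raise ValueError("Instance id %r is not a valid DNS label" % raw)
    return raw
```

For instance, `sketch_instance_id({"k8s_galaxy_instance_id": "galaxy-prod-1"})` returns the id unchanged, while a value containing an underscore raises `ValueError`.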

pulsar.managers.util.pykube_util.job_object_dict(params, job_name, spec)[source]
pulsar.managers.util.pykube_util.produce_unique_k8s_job_name(app_prefix=None, instance_id=None, job_id=None)[source]
pulsar.managers.util.pykube_util.pull_policy(params)[source]
pulsar.managers.util.pykube_util.pykube_client_from_dict(params)[source]
pulsar.managers.util.pykube_util.stop_job(job, cleanup='always')[source]

pulsar.managers.util.retry module

class pulsar.managers.util.retry.RetryActionExecutor(**kwds)[source]

Bases: object

execute(action, description=None)[source]

pulsar.managers.util.sudo module

pulsar.managers.util.sudo.sudo_popen(*args, **kwargs)[source]

Helper method for building and executing a Popen command. This is potentially sensitive code, so it should probably be centralized.

pulsar.managers.util.tes module

class pulsar.managers.util.tes.TesClient(url: str)[source]

Bases: object

cancel_task(has_task_id: TesCreateTaskResponse | TesTask | str) TesCancelTaskResponse[source]
create_task(task: TesTask) TesCreateTaskResponse[source]
get_task(has_task_id: TesCreateTaskResponse | TesTask | str, view) TesTask[source]
list_tasks() TesListTasksResponse[source]
service_info() TesServiceInfo[source]
class pulsar.managers.util.tes.TesExecutor(*, image: str, command: List[str], workdir: str | None = None, stdin: str | None = None, stdout: str | None = None, stderr: str | None = None, env: Dict[str, str] | None = None)[source]

Bases: BaseModel

command: List[str]
env: Dict[str, str] | None
image: str
stderr: str | None
stdin: str | None
stdout: str | None
workdir: str | None
class pulsar.managers.util.tes.TesResources(*, cpu_cores: int | None = None, preemptible: bool | None = None, ram_gb: float | None = None, disk_gb: float | None = None, zones: List[str] | None = None, backend_parameters: Dict[str, str] | None = None, backend_parameters_strict: bool | None = False)[source]

Bases: BaseModel

backend_parameters: Dict[str, str] | None
backend_parameters_strict: bool | None
cpu_cores: int | None
disk_gb: float | None
preemptible: bool | None
ram_gb: float | None
zones: List[str] | None
class pulsar.managers.util.tes.TesState(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

CANCELED = 'CANCELED'
COMPLETE = 'COMPLETE'
EXECUTOR_ERROR = 'EXECUTOR_ERROR'
INITIALIZING = 'INITIALIZING'
PAUSED = 'PAUSED'
QUEUED = 'QUEUED'
RUNNING = 'RUNNING'
SYSTEM_ERROR = 'SYSTEM_ERROR'
UNKNOWN = 'UNKNOWN'
class pulsar.managers.util.tes.TesTask(*, id: str | None = None, state: TesState | None = None, name: str | None = None, description: str | None = None, inputs: List[TesInput] | None = None, outputs: List[TesOutput] | None = None, resources: TesResources | None = None, executors: List[TesExecutor], volumes: List[str] | None = None, tags: Dict[str, str] | None = None, logs: List[TesTaskLog] | None = None, creation_time: str | None = None)[source]

Bases: BaseModel

creation_time: str | None
description: str | None
executors: List[TesExecutor]
id: str | None
inputs: List[TesInput] | None
logs: List[TesTaskLog] | None
name: str | None
outputs: List[TesOutput] | None
resources: TesResources | None
state: TesState | None
tags: Dict[str, str] | None
volumes: List[str] | None
pulsar.managers.util.tes.ensure_tes_client() None[source]

Module contents

This module and its submodules contain utilities for running external processes and interfacing with job managers. This module should contain functionality shared between Galaxy and Pulsar.

pulsar.managers.util.kill_pid(pid: int, use_psutil: bool = True)[source]