pulsar.managers.util package

Submodules

pulsar.managers.util.aws_batch module

Interface layer for Boto’s batch library shared between Galaxy and Pulsar.

pulsar.managers.util.aws_batch.ensure_client_available() → None[source]
pulsar.managers.util.aws_batch.get_client(access_key_id=None, secret_access_key=None, region_name=None)[source]

pulsar.managers.util.env module

pulsar.managers.util.env.env_to_statement(env)[source]

Convert the abstract description of an environment variable definition into a statement for a shell script.

>>> env_to_statement(dict(name='X', value='Y'))
'X="Y"; export X'
>>> env_to_statement(dict(name='X', value='Y', raw=True))
'X=Y; export X'
>>> env_to_statement(dict(name='X', value='"A","B","C"'))
'X="\\"A\\",\\"B\\",\\"C\\""; export X'
>>> env_to_statement(dict(file="Y"))
'. "Y"'
>>> env_to_statement(dict(file="'RAW $FILE'", raw=True))
". 'RAW $FILE'"
>>> # Source file takes precedence
>>> env_to_statement(dict(name='X', value='"A","B","C"', file="S"))
'. "S"'
>>> env_to_statement(dict(execute="module load java/1.5.1"))
'module load java/1.5.1'
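
The doctests above pin down the output for each key. A minimal stand-in that reproduces them is sketched below. It is written only from those examples and is not taken from Pulsar's source; `env_to_statement_sketch`, `_quote` and `build_preamble` are hypothetical names, and the relative priority of `file` and `execute` when both are given is an assumption.

```python
def _quote(value, raw):
    """Wrap value in double quotes, escaping embedded ones, unless raw is set."""
    if raw:
        return value
    return '"' + value.replace('"', '\\"') + '"'


def env_to_statement_sketch(env):
    """Approximate the documented behaviour of env_to_statement."""
    raw = env.get("raw", False)
    if env.get("file"):
        # A file to source takes precedence over name/value (see doctest).
        # Assumption: it also wins over "execute"; the docs do not say.
        return ". " + _quote(env["file"], raw)
    if env.get("execute"):
        # Commands to execute are passed through unchanged.
        return env["execute"]
    name = env["name"]
    return "%s=%s; export %s" % (name, _quote(env["value"], raw), name)


def build_preamble(envs):
    """Join one statement per definition into a script fragment."""
    return "\n".join(env_to_statement_sketch(e) for e in envs)
```

Calling `build_preamble` with a list of such dicts yields one statement per line, ready to be placed at the top of a job script.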

pulsar.managers.util.external module

pulsar.managers.util.external.parse_external_id(output, type=None)[source]

Attempt to parse the output of job submission commands for an external id.

>>> parse_external_id("12345.pbsmanager")
'12345.pbsmanager'
>>> parse_external_id('Submitted batch job 185')
'185'
>>> parse_external_id('Submitted batch job 185', type='torque')
'Submitted batch job 185'
>>> parse_external_id('submitted to cluster 125.')
'125'
>>> parse_external_id('submitted to cluster 125.', type='slurm')
>>>
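
The examples above suggest a table of per-scheduler patterns that is tried in order, optionally restricted to one type. The following is a rough sketch under that assumption. The `PATTERNS` table, its regular expressions and the name `parse_external_id_sketch` are inferred from the doctests and are hypothetical, not Pulsar's actual code.

```python
import re

# Hypothetical pattern table inferred from the doctests above.
# Order matters: the catch-all "torque" pattern must come last.
PATTERNS = [
    ("condor", re.compile(r"submitted to cluster (\d+)\.")),
    ("slurm", re.compile(r"Submitted batch job (\w+)")),
    ("torque", re.compile(r"(.+)")),
]


def parse_external_id_sketch(output, type=None):
    """Return the first captured id, or None if no pattern matches.

    The parameter is called ``type`` only to mirror the documented signature.
    """
    for pattern_type, pattern in PATTERNS:
        if type is not None and pattern_type != type:
            continue  # caller asked for a specific scheduler type
        match = pattern.search(output)
        if match:
            return match.group(1)
    return None
```

Restricting the search to a type that does not match the output returns `None`, which is why the last doctest prints nothing.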

pulsar.managers.util.kill module

pulsar.managers.util.kill.kill_pid(pid: int, use_psutil: bool = True)[source]

pulsar.managers.util.process_groups module

pulsar.managers.util.process_groups.check_pg(pgid)[source]

Check whether processes in process group pgid are still alive.

pulsar.managers.util.process_groups.kill_pg(pgid)[source]

Kill all processes in process group pgid.
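
On POSIX systems, both operations can be expressed with `os.killpg`. The sketch below is an illustration of that approach, not a copy of this module: `check_pg_sketch` and `kill_pg_sketch` are hypothetical names, and the use of `SIGKILL` as the default signal is an assumption.

```python
import os
import signal


def check_pg_sketch(pgid):
    """Return True if at least one process in group pgid still exists."""
    try:
        os.killpg(pgid, 0)  # signal 0 only performs the existence check
    except ProcessLookupError:
        return False  # no such process group
    except PermissionError:
        return True  # the group exists but we may not signal it
    return True


def kill_pg_sketch(pgid, sig=signal.SIGKILL):
    """Send sig to every process in group pgid; ignore an already-gone group."""
    try:
        os.killpg(pgid, sig)
    except ProcessLookupError:
        pass
```

A child started with `subprocess.Popen(..., start_new_session=True)` leads its own process group, so its pid can be passed as `pgid` to either function.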

pulsar.managers.util.pykube_util module

Interface layer for pykube library shared between Galaxy and Pulsar.

pulsar.managers.util.pykube_util.ensure_pykube()[source]
pulsar.managers.util.pykube_util.find_job_object_by_name(pykube_api, job_name, namespace=None)[source]
pulsar.managers.util.pykube_util.find_pod_object_by_name(pykube_api, pod_name, namespace=None)[source]
pulsar.managers.util.pykube_util.galaxy_instance_id(params)[source]

Parse and validate the id of the Galaxy instance from supplied dict.

This id will be added to Job and Pod names, so it needs to be DNS friendly: the Internet standards (Requests for Comments) for protocols mandate that component hostname labels may contain only the ASCII letters 'a' through 'z' (case-insensitively), the digits '0' through '9', and the minus sign ('-').

The function looks for the value set in params['k8s_galaxy_instance_id'], which might or might not be set. The idea behind this is to allow the Galaxy instance to trust (or not) existing k8s Jobs and Pods that match the setup of a Job that is being recovered or restarted after a downtime/reboot.
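
The DNS-friendliness rule described above can be checked with a single regular expression. This is a minimal sketch, assuming a 63-character limit (the general DNS label limit) and a `ValueError` on failure; the real function may enforce a different length or report errors differently. `instance_id_sketch` and `_DNS_LABEL` are hypothetical names.

```python
import re

# Letters, digits and '-' only; no leading or trailing '-'; 1 to 63 characters.
_DNS_LABEL = re.compile(r"(?!-)[A-Za-z0-9-]{1,63}(?<!-)")


def instance_id_sketch(params):
    """Return the validated instance id, or None when the key is not set."""
    value = params.get("k8s_galaxy_instance_id")
    if not value:
        return None
    if not _DNS_LABEL.fullmatch(value):
        raise ValueError(
            "k8s_galaxy_instance_id is not a valid DNS label: %r" % value
        )
    return value
```

Returning `None` for a missing key keeps the setting optional, matching the note that the value might not be set.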

pulsar.managers.util.pykube_util.job_object_dict(params, job_name, spec)[source]
pulsar.managers.util.pykube_util.produce_unique_k8s_job_name(app_prefix=None, instance_id=None, job_id=None)[source]
pulsar.managers.util.pykube_util.pull_policy(params)[source]
pulsar.managers.util.pykube_util.pykube_client_from_dict(params)[source]
pulsar.managers.util.pykube_util.stop_job(job, cleanup='always')[source]

pulsar.managers.util.retry module

class pulsar.managers.util.retry.RetryActionExecutor(**kwds)[source]

Bases: object

execute(action, description=None)[source]
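
The keyword arguments accepted by the constructor are not listed here. As an illustration of the general pattern only, a retry executor with the same `execute(action, description=None)` shape might look like the sketch below. The class name `RetrySketch` and the parameters `max_retries`, `interval_start`, `interval_step` and `catch` are hypothetical and are not confirmed to match `RetryActionExecutor`.

```python
import logging
import time

log = logging.getLogger(__name__)


class RetrySketch:
    """Hypothetical retry helper; all parameter names are assumptions."""

    def __init__(self, max_retries=3, interval_start=0.1, interval_step=2.0,
                 catch=(Exception,)):
        self.max_retries = max_retries
        self.interval_start = interval_start
        self.interval_step = interval_step
        self.catch = tuple(catch)

    def execute(self, action, description=None):
        """Call action() until it succeeds or the retries are exhausted."""
        interval = self.interval_start
        for attempt in range(self.max_retries + 1):
            try:
                return action()
            except self.catch:
                if attempt == self.max_retries:
                    raise  # out of retries: propagate the last error
                log.info("Retrying %s after failure (attempt %d)",
                         description or "action", attempt + 1)
                time.sleep(interval)
                interval *= self.interval_step  # multiplicative back-off
```

The `description` argument is used only for log messages in this sketch, so callers can tell which action is being retried.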

pulsar.managers.util.sudo module

pulsar.managers.util.sudo.sudo_popen(*args, **kwargs)[source]

Helper method for building and executing a Popen command. This is potentially sensitive code, so it should probably be centralized.

pulsar.managers.util.tes module

pulsar.managers.util.tes.ensure_tes_client() → None[source]
class pulsar.managers.util.tes.TesClient(url: str)[source]

Bases: object

cancel_task(has_task_id: Union[pydantictes.models.TesCreateTaskResponse, pydantictes.models.TesTask, str]) → pydantictes.models.TesCancelTaskResponse[source]
create_task(task: pydantictes.models.TesTask) → pydantictes.models.TesCreateTaskResponse[source]
get_task(has_task_id: Union[pydantictes.models.TesCreateTaskResponse, pydantictes.models.TesTask, str], view) → pydantictes.models.TesTask[source]
list_tasks() → pydantictes.models.TesListTasksResponse[source]
service_info() → pydantictes.models.TesServiceInfo[source]
class pulsar.managers.util.tes.TesExecutor[source]

Bases: pydantic.main.BaseModel

class pulsar.managers.util.tes.TesResources[source]

Bases: pydantic.main.BaseModel

class pulsar.managers.util.tes.TesState[source]

Bases: enum.Enum

An enumeration.

CANCELED = 'CANCELED'
COMPLETE = 'COMPLETE'
EXECUTOR_ERROR = 'EXECUTOR_ERROR'
INITIALIZING = 'INITIALIZING'
PAUSED = 'PAUSED'
QUEUED = 'QUEUED'
RUNNING = 'RUNNING'
SYSTEM_ERROR = 'SYSTEM_ERROR'
UNKNOWN = 'UNKNOWN'
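
When polling a task, it is often useful to know whether a state is final. The sketch below redefines the enumeration locally so that it runs on its own. The `TERMINAL_STATES` grouping and the `is_terminal` helper are assumptions based on the state names and are not part of this module.

```python
from enum import Enum


class TesState(Enum):
    # Members copied from the listing above.
    CANCELED = "CANCELED"
    COMPLETE = "COMPLETE"
    EXECUTOR_ERROR = "EXECUTOR_ERROR"
    INITIALIZING = "INITIALIZING"
    PAUSED = "PAUSED"
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    SYSTEM_ERROR = "SYSTEM_ERROR"
    UNKNOWN = "UNKNOWN"


# Assumption: states after which a task is not expected to change again.
TERMINAL_STATES = frozenset({
    TesState.CANCELED,
    TesState.COMPLETE,
    TesState.EXECUTOR_ERROR,
    TesState.SYSTEM_ERROR,
})


def is_terminal(state):
    """Accept either a TesState member or its string value."""
    return TesState(state) in TERMINAL_STATES
```

Because each member's value equals its name, `TesState("RUNNING")` converts a state string returned by a server into the matching member.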
class pulsar.managers.util.tes.TesTask[source]

Bases: pydantic.main.BaseModel

Module contents

This module and its submodules contain utilities for running external processes and interfacing with job managers. This module should contain functionality shared between Galaxy and Pulsar.

pulsar.managers.util.kill_pid(pid: int, use_psutil: bool = True)[source]