pulsar.managers.util package¶
Subpackages¶
- pulsar.managers.util.cli package
- Subpackages
- pulsar.managers.util.cli.job package
- pulsar.managers.util.cli.shell package
- Submodules
- pulsar.managers.util.cli.factory module
- Module contents
- Subpackages
- pulsar.managers.util.condor package
- pulsar.managers.util.drmaa package
- pulsar.managers.util.job_script package
Submodules¶
pulsar.managers.util.aws_batch module¶
Interface layer for Boto’s batch library shared between Galaxy and Pulsar.
pulsar.managers.util.env module¶
- pulsar.managers.util.env.env_to_statement(env)[source]¶
Convert the abstract description of an environment variable definition into a statement for a shell script.
>>> env_to_statement(dict(name='X', value='Y'))
'X="Y"; export X'
>>> env_to_statement(dict(name='X', value='Y', raw=True))
'X=Y; export X'
>>> env_to_statement(dict(name='X', value='"A","B","C"'))
'X="\\"A\\",\\"B\\",\\"C\\""; export X'
>>> env_to_statement(dict(file="Y"))
'. "Y"'
>>> env_to_statement(dict(file="'RAW $FILE'", raw=True))
". 'RAW $FILE'"
>>> # Source file takes precedence
>>> env_to_statement(dict(name='X', value='"A","B","C"', file="S"))
'. "S"'
>>> env_to_statement(dict(execute="module load java/1.5.1"))
'module load java/1.5.1'
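The doctests above imply a simple precedence: a `file` entry is sourced, otherwise an `execute` entry is emitted verbatim, otherwise a `name`/`value` pair is exported, with values double-quoted unless `raw` is set. The following is a minimal sketch reconstructed from those doctests only. It is not Pulsar's source: `env_to_statement_sketch` and `_quote` are hypothetical names, and the sample definitions (including the file path) are made up for illustration.

```python
def _quote(value, raw=False):
    """Wrap value in double quotes, escaping embedded quotes, unless raw."""
    if raw:
        return value
    return '"' + value.replace('"', '\\"') + '"'


def env_to_statement_sketch(env):
    """Hypothetical stand-in whose behavior mirrors the documented doctests."""
    raw = env.get("raw", False)
    source_file = env.get("file")
    if source_file:
        # A file to source takes precedence over any name/value pair.
        return ". " + _quote(source_file, raw)
    execute = env.get("execute")
    if execute:
        # An arbitrary command is emitted unchanged.
        return execute
    name = env["name"]
    return "{0}={1}; export {0}".format(name, _quote(env["value"], raw))


# Illustrative input: build a script preamble from several definitions.
definitions = [
    dict(name="JAVA_OPTS", value="-Xmx2g"),
    dict(file="/etc/profile.d/modules.sh"),  # hypothetical path
    dict(execute="module load java/1.5.1"),
]
preamble = "\n".join(env_to_statement_sketch(d) for d in definitions)
print(preamble)
```

Joining one statement per line, as in the last lines of the sketch, is one plausible way to assemble the environment section of a job script.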
pulsar.managers.util.external module¶
- pulsar.managers.util.external.parse_external_id(output, type=None)[source]¶
Attempt to parse the output of job submission commands for an external id.
>>> parse_external_id("12345.pbsmanager")
'12345.pbsmanager'
>>> parse_external_id('Submitted batch job 185')
'185'
>>> parse_external_id('Submitted batch job 185', type='torque')
'Submitted batch job 185'
>>> parse_external_id('submitted to cluster 125.')
'125'
>>> parse_external_id('submitted to cluster 125.', type='slurm')
>>>
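One way to read the doctests above is as an ordered list of per-scheduler patterns, optionally filtered by `type`, with a catch-all that returns the whole output. The sketch below follows that reading. The patterns, the `condor` label, and the name `parse_external_id_sketch` are assumptions inferred from the examples, not taken from Pulsar's source.

```python
import re

# (type, pattern) pairs tried in order. Inferred from the doctests; the
# "condor" label in particular is an assumption.
_PATTERNS = [
    ("condor", r"submitted to cluster (\d+)\."),
    ("slurm", r"Submitted batch job (\w+)"),
    ("torque", r"(.+)"),  # catch-all: the whole output is the id
]


def parse_external_id_sketch(output, type=None):
    """Return the first captured id, or None if no allowed pattern matches."""
    for pattern_type, pattern in _PATTERNS:
        # When a type is given, only that scheduler's pattern is tried.
        if type is not None and type != pattern_type:
            continue
        match = re.search(pattern, output)
        if match:
            return match.group(1)
    return None


print(parse_external_id_sketch("Submitted batch job 185"))
```

Restricting `type` matters because the catch-all pattern matches almost any output: without a type, an unrecognized message is returned whole, while with `type='slurm'` the same message yields `None`.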
pulsar.managers.util.kill module¶
pulsar.managers.util.process_groups module¶
pulsar.managers.util.pykube_util module¶
Interface layer for pykube library shared between Galaxy and Pulsar.
- pulsar.managers.util.pykube_util.find_job_object_by_name(pykube_api, job_name, namespace=None)[source]¶
- pulsar.managers.util.pykube_util.find_pod_object_by_name(pykube_api, pod_name, namespace=None)[source]¶
- pulsar.managers.util.pykube_util.galaxy_instance_id(params)[source]¶
Parse and validate the id of the Galaxy instance from supplied dict.
This id is added to Job and Pod names, so it must be DNS-friendly: the Internet standards (Requests for Comments) for protocols mandate that component hostname labels may contain only the ASCII letters ‘a’ through ‘z’ (in a case-insensitive manner), the digits ‘0’ through ‘9’, and the minus sign (‘-’).
It looks for the value in params['k8s_galaxy_instance_id'], which may or may not be set. The idea is to let the Galaxy instance decide whether to trust existing k8s Jobs and Pods that match the setup of a Job being recovered or restarted after downtime or a reboot.
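The DNS-friendliness rule described above can be checked with a short regular expression. This is a sketch under stated assumptions, not the function's actual implementation: the name `instance_id_sketch`, the 63-character cap (the general DNS label limit), the ban on leading or trailing hyphens, and the choice to raise `ValueError` or return `None` are all illustrative.

```python
import re

# Letters, digits, and hyphens only; no leading or trailing hyphen.
# The 63-character cap is the general DNS label limit, assumed here.
_DNS_LABEL = re.compile(r"(?!-)[A-Za-z0-9-]{1,63}(?<!-)")


def instance_id_sketch(params):
    """Return the validated id from params, or None when it is not set."""
    raw_value = params.get("k8s_galaxy_instance_id")
    if raw_value is None:
        return None
    if not _DNS_LABEL.fullmatch(raw_value):
        raise ValueError(
            "k8s_galaxy_instance_id must contain only letters, digits, "
            "and hyphens, and must not start or end with a hyphen: %r" % raw_value
        )
    return raw_value


print(instance_id_sketch({"k8s_galaxy_instance_id": "galaxy-prod-1"}))
print(instance_id_sketch({}))
```

Failing loudly on an invalid id, rather than silently sanitizing it, keeps the generated Job and Pod names predictable when they are later matched against existing objects.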
pulsar.managers.util.retry module¶
pulsar.managers.util.sudo module¶
pulsar.managers.util.tes module¶
- class pulsar.managers.util.tes.TesExecutor(*, image: str, command: List[str], workdir: str | None = None, stdin: str | None = None, stdout: str | None = None, stderr: str | None = None, env: Dict[str, str] | None = None)[source]¶
Bases: BaseModel
- command: List[str]¶
- env: Dict[str, str] | None¶
- image: str¶
- stderr: str | None¶
- stdin: str | None¶
- stdout: str | None¶
- workdir: str | None¶
- class pulsar.managers.util.tes.TesResources(*, cpu_cores: int | None = None, preemptible: bool | None = None, ram_gb: float | None = None, disk_gb: float | None = None, zones: List[str] | None = None, backend_parameters: Dict[str, str] | None = None, backend_parameters_strict: bool | None = False)[source]¶
Bases: BaseModel
- backend_parameters: Dict[str, str] | None¶
- backend_parameters_strict: bool | None¶
- cpu_cores: int | None¶
- disk_gb: float | None¶
- preemptible: bool | None¶
- ram_gb: float | None¶
- zones: List[str] | None¶
- class pulsar.managers.util.tes.TesState(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases: Enum
- CANCELED = 'CANCELED'¶
- COMPLETE = 'COMPLETE'¶
- EXECUTOR_ERROR = 'EXECUTOR_ERROR'¶
- INITIALIZING = 'INITIALIZING'¶
- PAUSED = 'PAUSED'¶
- QUEUED = 'QUEUED'¶
- RUNNING = 'RUNNING'¶
- SYSTEM_ERROR = 'SYSTEM_ERROR'¶
- UNKNOWN = 'UNKNOWN'¶
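A typical use of such a state enum is deciding when to stop polling a task. The sketch below mirrors the members listed above in a stand-alone enum so it runs without Pulsar installed. `TesStateSketch`, `TERMINAL_STATES`, and `is_terminal` are hypothetical names, and treating exactly these four states as terminal is an assumption.

```python
from enum import Enum


class TesStateSketch(Enum):
    """Stand-alone mirror of the members documented for TesState."""
    UNKNOWN = "UNKNOWN"
    QUEUED = "QUEUED"
    INITIALIZING = "INITIALIZING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    COMPLETE = "COMPLETE"
    EXECUTOR_ERROR = "EXECUTOR_ERROR"
    SYSTEM_ERROR = "SYSTEM_ERROR"
    CANCELED = "CANCELED"


# Assumption: a task in one of these states will not change state again.
TERMINAL_STATES = frozenset({
    TesStateSketch.COMPLETE,
    TesStateSketch.EXECUTOR_ERROR,
    TesStateSketch.SYSTEM_ERROR,
    TesStateSketch.CANCELED,
})


def is_terminal(state):
    """Return True if polling can stop for a task in this state."""
    return state in TERMINAL_STATES


# Enum lookup by value turns a server-reported string into a member.
print(is_terminal(TesStateSketch("COMPLETE")))
print(is_terminal(TesStateSketch("RUNNING")))
```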
- class pulsar.managers.util.tes.TesTask(*, id: str | None = None, state: TesState | None = None, name: str | None = None, description: str | None = None, inputs: List[TesInput] | None = None, outputs: List[TesOutput] | None = None, resources: TesResources | None = None, executors: List[TesExecutor], volumes: List[str] | None = None, tags: Dict[str, str] | None = None, logs: List[TesTaskLog] | None = None, creation_time: str | None = None)[source]¶
Bases: BaseModel
- creation_time: str | None¶
- description: str | None¶
- executors: List[TesExecutor]¶
- id: str | None¶
- inputs: List[TesInput] | None¶
- logs: List[TesTaskLog] | None¶
- name: str | None¶
- outputs: List[TesOutput] | None¶
- resources: TesResources | None¶
- tags: Dict[str, str] | None¶
- volumes: List[str] | None¶
Module contents¶
This module and its submodules contain utilities for running external processes and interfacing with job managers. This module should contain functionality shared between Galaxy and Pulsar.