pulsar.managers.util package
Subpackages
Submodules
pulsar.managers.util.aws_batch module
Interface layer for Boto’s batch library shared between Galaxy and Pulsar.
pulsar.managers.util.env module
- pulsar.managers.util.env.env_to_statement(env)[source]
Convert an abstract description of an environment variable definition into a statement for a shell script.
>>> env_to_statement(dict(name='X', value='Y'))
'X="Y"; export X'
>>> env_to_statement(dict(name='X', value='Y', raw=True))
'X=Y; export X'
>>> env_to_statement(dict(name='X', value='"A","B","C"'))
'X="\\"A\\",\\"B\\",\\"C\\""; export X'
>>> env_to_statement(dict(file="Y"))
'. "Y"'
>>> env_to_statement(dict(file="'RAW $FILE'", raw=True))
". 'RAW $FILE'"
>>> # Source file takes precedence
>>> env_to_statement(dict(name='X', value='"A","B","C"', file="S"))
'. "S"'
>>> env_to_statement(dict(execute="module load java/1.5.1"))
'module load java/1.5.1'
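The doctests above imply three cases: source a file, run a command verbatim, or assign and export a variable. The following is a minimal sketch that reproduces that documented behavior; it is not copied from the Pulsar source, and the names `escape_value` and `env_to_statement_sketch` as well as the path `/etc/profile.d/modules.sh` are hypothetical.

```python
def escape_value(value, raw=False):
    """Double-quote a value for the shell unless it is marked raw."""
    if raw:
        return value
    # Escape embedded double quotes, then wrap the whole value in quotes.
    return '"' + value.replace('"', '\\"') + '"'


def env_to_statement_sketch(env):
    """Mirror the documented behavior of env_to_statement (illustrative only)."""
    raw = env.get("raw", False)
    # A file to source takes precedence over every other key.
    source_file = env.get("file")
    if source_file:
        return ". " + escape_value(source_file, raw)
    # An arbitrary command is emitted verbatim.
    execute = env.get("execute")
    if execute:
        return execute
    # Otherwise assign and export a single variable.
    name = env["name"]
    value = escape_value(env["value"], raw)
    return f"{name}={value}; export {name}"


# Build a small script preamble from several definitions.
definitions = [
    dict(file="/etc/profile.d/modules.sh"),  # hypothetical path
    dict(execute="module load java/1.5.1"),
    dict(name="X", value="Y"),
]
preamble = "\n".join(env_to_statement_sketch(d) for d in definitions)
print(preamble)
```

Joining the statements with newlines gives a block that could be placed at the top of a generated job script.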
pulsar.managers.util.external module
- pulsar.managers.util.external.parse_external_id(output, type=None)[source]
Attempt to parse the output of job submission commands for an external id.
>>> parse_external_id("12345.pbsmanager")
'12345.pbsmanager'
>>> parse_external_id('Submitted batch job 185')
'185'
>>> parse_external_id('Submitted batch job 185', type='torque')
'Submitted batch job 185'
>>> parse_external_id('submitted to cluster 125.')
'125'
>>> parse_external_id('submitted to cluster 125.', type='slurm')
>>>
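One way to get the behavior shown in the doctests is an ordered table of per-scheduler regular expressions, optionally filtered by `type`. The sketch below is inferred from those doctests only; the pattern table, the `condor` and `pbs` labels, and the name `parse_external_id_sketch` are assumptions rather than Pulsar's actual implementation.

```python
import re

# Ordered (type, pattern) pairs; the first pattern that matches wins.
# The 'condor' and 'pbs' labels are assumed; only 'torque' and 'slurm'
# appear in the documented examples.
PATTERNS = [
    ("condor", r"submitted to cluster (\d+)\."),
    ("slurm", r"Submitted batch job (\w+)"),
    ("torque", r"(.+)"),
    ("pbs", r"(.+)"),
]


def parse_external_id_sketch(output, type=None):
    """Return the first captured id, or None when nothing matches."""
    for pattern_type, pattern in PATTERNS:
        # When a type is given, only that scheduler's pattern is tried.
        if type is not None and type != pattern_type:
            continue
        match = re.search(pattern, output)
        if match:
            return match.group(1)
    return None


print(parse_external_id_sketch("Submitted batch job 185"))
print(parse_external_id_sketch("submitted to cluster 125.", type="slurm"))
```

Placing the catch-all `(.+)` patterns last keeps them from shadowing the more specific ones when no `type` is supplied.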
pulsar.managers.util.gcp_util module
- pulsar.managers.util.gcp_util.delete_gcp_job(project_id: str, region: str, job_name: str, credentials_file: str | None = None) -> Any[source]
- pulsar.managers.util.gcp_util.get_gcp_job(project_id: str, region: str, job_name: str, credentials_file: str | None = None) -> Job[source]
Retrieve a GCP Batch job by its name.
- Args:
project_id: GCP project ID.
region: GCP region where the job is located.
job_name: Name of the job to retrieve.
credentials_file: Path to GCP service account credentials file (optional).
- Returns:
The GCP Batch job object.
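GCP Batch addresses a job by a fully qualified resource name built from the project, location, and job name. The sketch below shows how the three arguments documented above would combine into that name. The helper `batch_job_resource_name` is hypothetical, and whether `get_gcp_job` assembles the name this way internally is an assumption.

```python
def batch_job_resource_name(project_id: str, region: str, job_name: str) -> str:
    """Build a GCP Batch job resource name (hypothetical helper)."""
    parts = (("project_id", project_id), ("region", region), ("job_name", job_name))
    for label, value in parts:
        # Each component is a single path segment, so it must be non-empty
        # and must not contain a slash itself.
        if not value or "/" in value:
            raise ValueError(f"invalid {label}: {value!r}")
    return f"projects/{project_id}/locations/{region}/jobs/{job_name}"


# Placeholder values chosen for illustration only.
print(batch_job_resource_name("my-project", "us-central1", "pulsar-job-1"))
```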
pulsar.managers.util.kill module
pulsar.managers.util.process_groups module
pulsar.managers.util.pykube_util module
Interface layer for pykube library shared between Galaxy and Pulsar.
- pulsar.managers.util.pykube_util.find_job_object_by_name(pykube_api, job_name, namespace=None)[source]
- pulsar.managers.util.pykube_util.find_pod_object_by_name(pykube_api, pod_name, namespace=None)[source]
- pulsar.managers.util.pykube_util.galaxy_instance_id(params)[source]
Parse and validate the id of the Galaxy instance from supplied dict.
This id will be added to Job and Pod names, so it needs to be DNS friendly. The Internet standards (Requests for Comments) for protocols mandate that component hostname labels may contain only the ASCII letters ‘a’ through ‘z’ (in a case-insensitive manner), the digits ‘0’ through ‘9’, and the minus sign (‘-’).
It looks for the value set on params['k8s_galaxy_instance_id'], which might or might not be set. The idea behind this is to allow the Galaxy instance to trust (or not) existing k8s Jobs and Pods that match the setup of a Job that is being recovered or restarted after a downtime/reboot.
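The DNS-friendliness rule described above can be checked with a regular expression. The following is a minimal sketch under stated assumptions: it accepts lowercase letters only (Kubernetes object names are lowercase), forbids a leading or trailing minus sign, and caps the length at 20 characters. The cap, the name `instance_id_sketch`, and the choice to return `None` for invalid values are illustrative and not taken from the Pulsar source.

```python
import re

# Assumed rule: lowercase letters, digits and '-', neither starting nor
# ending with '-'. The 20-character cap is an illustrative assumption.
DNS_FRIENDLY = re.compile(r"(?!-)[a-z0-9-]{1,20}(?<!-)")


def instance_id_sketch(params):
    """Return a validated instance id, or None if absent or not DNS friendly."""
    raw_value = params.get("k8s_galaxy_instance_id")
    if raw_value is None:
        # The key is optional, so a missing value is not an error.
        return None
    if DNS_FRIENDLY.fullmatch(raw_value):
        return raw_value
    return None


print(instance_id_sketch({"k8s_galaxy_instance_id": "gxy-main"}))
print(instance_id_sketch({"k8s_galaxy_instance_id": "-bad-"}))
print(instance_id_sketch({}))
```

Using `fullmatch` rather than `match` with a trailing `$` avoids accepting a value that ends in a newline.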
pulsar.managers.util.retry module
- pulsar.managers.util.retry.DEFAULT_SHOULD_RETRY(_exc)
pulsar.managers.util.sudo module
pulsar.managers.util.tes module
- class pulsar.managers.util.tes.TesClient(url: str, headers: Dict | None = None)[source]
Bases: object
- class pulsar.managers.util.tes.TesExecutor(*, image: str, command: List[str], workdir: str | None = None, stdin: str | None = None, stdout: str | None = None, stderr: str | None = None, env: Dict[str, str] | None = None)[source]
Bases: BaseModel
- command: List[str]
- env: Dict[str, str] | None
- image: str
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- stderr: str | None
- stdin: str | None
- stdout: str | None
- workdir: str | None
- class pulsar.managers.util.tes.TesResources(*, cpu_cores: int | None = None, preemptible: bool | None = None, ram_gb: float | None = None, disk_gb: float | None = None, zones: List[str] | None = None, backend_parameters: Dict[str, str] | None = None, backend_parameters_strict: bool | None = False)[source]
Bases: BaseModel
- backend_parameters: Dict[str, str] | None
- backend_parameters_strict: bool | None
- cpu_cores: int | None
- disk_gb: float | None
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- preemptible: bool | None
- ram_gb: float | None
- zones: List[str] | None
- class pulsar.managers.util.tes.TesState(value)[source]
Bases: Enum
- CANCELED = 'CANCELED'
- COMPLETE = 'COMPLETE'
- EXECUTOR_ERROR = 'EXECUTOR_ERROR'
- INITIALIZING = 'INITIALIZING'
- PAUSED = 'PAUSED'
- QUEUED = 'QUEUED'
- RUNNING = 'RUNNING'
- SYSTEM_ERROR = 'SYSTEM_ERROR'
- UNKNOWN = 'UNKNOWN'
- class pulsar.managers.util.tes.TesTask(*, id: str | None = None, state: TesState | None = None, name: str | None = None, description: str | None = None, inputs: List[TesInput] | None = None, outputs: List[TesOutput] | None = None, resources: TesResources | None = None, executors: List[TesExecutor], volumes: List[str] | None = None, tags: Dict[str, str] | None = None, logs: List[TesTaskLog] | None = None, creation_time: str | None = None)[source]
Bases: BaseModel
- creation_time: str | None
- description: str | None
- executors: List[TesExecutor]
- id: str | None
- inputs: List[TesInput] | None
- logs: List[TesTaskLog] | None
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- name: str | None
- outputs: List[TesOutput] | None
- resources: TesResources | None
- tags: Dict[str, str] | None
- volumes: List[str] | None
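Going by the signatures above, the only fields without defaults are `executors` on `TesTask` and `image` and `command` on `TesExecutor`. The sketch below builds the corresponding task payload with plain dictionaries so that it runs without Pulsar or pydantic installed; the helper `missing_required` and all field values are hypothetical and for illustration only.

```python
import json


def missing_required(task):
    """List required fields absent from a task dict (hypothetical helper)."""
    executors = task.get("executors")
    if executors is None:
        return ["executors"]
    missing = []
    for index, executor in enumerate(executors):
        # image and command have no defaults in the TesExecutor signature.
        for field in ("image", "command"):
            if field not in executor:
                missing.append(f"executors[{index}].{field}")
    return missing


# Illustrative values; only the field names come from the models above.
task = {
    "name": "example-task",
    "executors": [
        {
            "image": "ubuntu:22.04",
            "command": ["echo", "hello"],
            "env": {"X": "Y"},
        }
    ],
    "resources": {"cpu_cores": 1, "ram_gb": 2.0, "disk_gb": 10.0},
    "tags": {"origin": "sketch"},
}

print(missing_required(task))
body = json.dumps(task, indent=2, sort_keys=True)
print(body)
```

With Pulsar installed, the same keys would presumably be passed as keyword arguments to `TesTask`, `TesExecutor`, and `TesResources`, with pydantic performing the validation.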
Module contents
This module and its submodules contain utilities for running external processes and interfacing with job managers. This module should contain functionality shared between Galaxy and Pulsar.