Source code for pulsar.managers.util.cli.job

"""
Abstract base class for cli job plugins.
"""
from abc import (
    ABCMeta,
    abstractmethod,
)
from enum import Enum

try:
    from galaxy.model import Job

    job_states = Job.states
except ImportError:

    # Not in Galaxy, map Galaxy job states to Pulsar ones.
[docs] class job_states(str, Enum): # type: ignore[no-redef] RUNNING = "running" OK = "complete" QUEUED = "queued" ERROR = "failed"
[docs]class BaseJobExec(metaclass=ABCMeta): def __init__(self, **params): """ Constructor for CLI job executor. """ self.params = params.copy()
[docs] def job_script_kwargs(self, ofile, efile, job_name): """Return extra keyword argument for consumption by job script module. """ return {}
[docs] @abstractmethod def submit(self, script_file): """ Given specified script_file path, yield command to submit it to external job manager. """
[docs] @abstractmethod def delete(self, job_id): """ Given job id, return command to stop execution or dequeue specified job. """
[docs] @abstractmethod def get_status(self, job_ids=None): """ Return command to get statuses of specified job ids. """
[docs] @abstractmethod def get_single_status(self, job_id): """ Return command to get the status of a single, specified job. """
[docs] @abstractmethod def parse_status(self, status, job_ids): """ Parse the statuses of output from get_status command. """
[docs] @abstractmethod def parse_single_status(self, status, job_id): """ Parse the status of output from get_single_status command. """
[docs] def get_failure_reason(self, job_id): """ Return the failure reason for the given job_id. """ return None
[docs] def parse_failure_reason(self, reason, job_id): """ Parses the failure reason, assigning it against a """ return None
__all__ = ( "BaseJobExec", "job_states", )