Source code for pulsar.managers.util.tes

from typing import (
    Any,
    cast,
    Dict,
    List,
    Optional,
)

from galaxy.util import (
    asbool,
    listify,
)

IMPORT_MESSAGE = None
try:
    from pydantictes.api import TesClient
    from pydantictes.models import (
        TesExecutor,
        TesResources,
        TesState,
        TesTask,
    )
except ImportError as exc:
    TesClient = None  # type: ignore
    TesExecutor = None  # type: ignore
    TesResources = None  # type: ignore
    TesState = None  # type: ignore
    TesTask = None  # type: ignore
    IMPORT_MESSAGE = (
        "The Python pydantic-tes package is required to use "
        "this feature, please install it or correct the "
        "following error:\nImportError %s" % str(exc)
    )


[docs] def ensure_tes_client() -> None: if TesClient is None: assert IMPORT_MESSAGE raise Exception(IMPORT_MESSAGE)
def tes_client_from_dict(destination_params: Dict[str, Any]) -> TesClient: # TODO: implement funnel's basic auth in pydantic-tes and expose it here. tes_url = destination_params["tes_url"] return TesClient( url=tes_url, ) def tes_resources(destination_params: Dict[str, Any]) -> TesResources: cpu_cores: Optional[int] preemptible: Optional[bool] ram_gb: Optional[float] disk_gb: Optional[float] zones: Optional[List[str]] backend_parameters: Optional[Dict[str, str]] = None backend_parameters_strict: Optional[bool] raw_cpu_cores = destination_params.get("tes_cpu_cores") cpu_cores = int(raw_cpu_cores) if raw_cpu_cores is not None else None raw_preemptible = destination_params.get("tes_preemptible") preemptible = asbool(raw_preemptible) if raw_preemptible is not None else None raw_ram_gb = destination_params.get("tes_ram_gb") ram_gb = float(raw_ram_gb) if raw_ram_gb is not None else None raw_disk_gb = destination_params.get("tes_disk_gb") disk_gb = float(raw_disk_gb) if raw_disk_gb is not None else None raw_zones = destination_params.get("tes_zones") zones = listify(raw_zones) if raw_zones is not None else None raw_backend_parameters = destination_params.get("tes_backend_parameters") if raw_backend_parameters is not None: backend_parameters = {} for k, v in cast(dict, raw_backend_parameters).items(): backend_parameters[str(k)] = str(v) raw_backend_parameters_strict = destination_params.get("tes_backend_parameters_strict") if raw_backend_parameters_strict is not None: backend_parameters_strict = asbool(raw_backend_parameters_strict) else: backend_parameters_strict = None return TesResources( cpu_cores=cpu_cores, preemptible=preemptible, ram_gb=ram_gb, disk_gb=disk_gb, zones=zones, backend_parameters=backend_parameters, backend_parameters_strict=backend_parameters_strict, ) def tes_galaxy_instance_id(destinaton_params: Dict[str, Any]) -> Optional[str]: return destinaton_params.get("tes_galaxy_instance_id") __all__ = ( "ensure_tes_client", "TesClient", "TesExecutor", "TesResources", "TesState", "TesTask", )