import logging
import os
from enum import Enum
from typing import (
Any,
Callable,
Dict,
List,
NamedTuple,
Optional,
)
from typing_extensions import Protocol
from pulsar.managers.util.tes import (
ensure_tes_client,
TesClient,
TesExecutor,
TesState,
TesTask,
tes_client_from_dict,
tes_galaxy_instance_id,
tes_resources,
)
from pulsar.managers.util.pykube_util import (
ensure_pykube,
find_job_object_by_name,
find_pod_object_by_name,
galaxy_instance_id,
Job,
job_object_dict,
produce_unique_k8s_job_name,
pull_policy,
pykube_client_from_dict,
stop_job,
)
from pulsar.managers import status as manager_status
from .action_mapper import (
actions,
path_type,
)
from .amqp_exchange import ACK_FORCE_NOACK_KEY
from .decorators import (
parseJson,
retry,
)
from .destination import submit_params
from .job_directory import RemoteJobDirectory
from .setup_handler import build as build_setup_handler
from .util import (
copy,
ensure_directory,
ExternalId,
json_dumps,
json_loads,
MonitorStyle,
to_base64_json,
)
log = logging.getLogger(__name__)
CACHE_WAIT_SECONDS = 3
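# Shell snippet run inside the tool container of a co-execution job: it polls the
# shared job directory until the Pulsar submit container has staged the
# 'command_line' script, then executes it.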
TOOL_EXECUTION_CONTAINER_COMMAND_TEMPLATE = """
path='%s/command_line';
while [ ! -e $path ];
do sleep 1; echo "waiting for job script $path";
done;
echo 'running script';
sh $path;
echo 'ran script'"""
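# Defaults for co-execution clients; the Pulsar container image can be overridden
# per destination via the 'pulsar_container_image' param, and the staging directory
# is injected into the app config at launch time if not otherwise configured.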
PULSAR_CONTAINER_IMAGE = "galaxy/pulsar-pod-staging:0.15.0.0"
CONTAINER_STAGING_DIRECTORY = "/pulsar_staging/"
class OutputNotFoundException(Exception):
def __init__(self, path):
self.path = path
def __str__(self):
return "No remote output found for path %s" % self.path
class ClientManagerProtocol(Protocol):
manager_name: str
class BaseJobClient:
ensure_library_available: Optional[Callable[[], None]] = None
def __init__(self, destination_params, job_id):
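# Run the optional class-level library availability check (e.g. ensure_pykube or
# ensure_tes_client) before doing any other setup.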
precondition = self.__class__.ensure_library_available
precondition and precondition()
destination_params = destination_params or {}
self.destination_params = destination_params
self.assign_job_id(job_id)
for attr in ["ssh_key", "ssh_user", "ssh_host", "ssh_port"]:
setattr(self, attr, destination_params.get(attr, None))
self.env = destination_params.get("env", [])
self.files_endpoint = destination_params.get("files_endpoint", None)
self.token_endpoint = destination_params.get("token_endpoint", None)
default_file_action = self.destination_params.get("default_file_action", "transfer")
if default_file_action not in actions:
raise Exception("Unknown Pulsar default file action type %s" % default_file_action)
self.default_file_action = default_file_action
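# File actions come either from an external config file (file_action_config) or
# inline from the destination params (file_actions), not both.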
self.action_config_path = self.destination_params.get("file_action_config", None)
if self.action_config_path is None:
self.file_actions = self.destination_params.get("file_actions", {})
else:
self.file_actions = None
self.setup_handler = build_setup_handler(self, destination_params)
def assign_job_id(self, job_id):
self.job_id = job_id
self._set_job_directory()
def _set_job_directory(self):
if "jobs_directory" in self.destination_params:
pulsar_staging = self.destination_params["jobs_directory"]
sep = self.destination_params.get("remote_sep", os.sep)
job_directory = RemoteJobDirectory(
remote_staging_directory=pulsar_staging,
remote_id=self.job_id,
remote_sep=sep,
)
else:
job_directory = None
self.job_directory = job_directory
def setup(self, tool_id=None, tool_version=None, preserve_galaxy_python_environment=None):
"""
Set up the remote Pulsar server to run this job.
"""
setup_args = {"job_id": self.job_id}
if tool_id:
setup_args["tool_id"] = tool_id
if tool_version:
setup_args["tool_version"] = tool_version
if preserve_galaxy_python_environment:
setup_args["preserve_galaxy_python_environment"] = preserve_galaxy_python_environment
return self.setup_handler.setup(**setup_args)
@property
def prefer_local_staging(self):
# If a remote job directory is defined, paths are calculated here and staged
# remotely; otherwise prefer local staging.
return self.job_directory is None
class JobClient(BaseJobClient):
"""
Objects of this client class perform low-level communication with a remote Pulsar server.
**Parameters**
destination_params : dict or str
connection parameters, either a url or a dict containing a url (and optionally `private_token`).
job_id : str
Galaxy job/task id.
"""
def __init__(self, destination_params, job_id, job_manager_interface):
super().__init__(destination_params, job_id)
self.job_manager_interface = job_manager_interface
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
"""
Queue up the execution of the supplied `command_line` on the remote
server. Called launch for historical reasons, should be renamed to
enqueue or something like that.
**Parameters**
command_line : str
Command to execute.
"""
launch_params = dict(command_line=command_line, job_id=self.job_id)
submit_params_dict = submit_params(self.destination_params)
if submit_params_dict:
launch_params['params'] = json_dumps(submit_params_dict)
if dependencies_description:
launch_params['dependencies_description'] = json_dumps(dependencies_description.to_dict())
if env:
launch_params['env'] = json_dumps(env)
if remote_staging:
launch_params['remote_staging'] = json_dumps(remote_staging)
if job_config and 'touch_outputs' in job_config:
# message clients pass the entire job config
launch_params['submit_extras'] = json_dumps({'touch_outputs': job_config['touch_outputs']})
if token_endpoint is not None:
launch_params["token_endpoint"] = json_dumps({'token_endpoint': token_endpoint})
if job_config and self.setup_handler.local:
# Setup not yet called, job properties were inferred from
# destination arguments. Hence, must have Pulsar setup job
# before queueing.
setup_params = _setup_params_from_job_config(job_config)
launch_params['setup_params'] = json_dumps(setup_params)
if dynamic_file_sources is not None:
launch_params["dynamic_file_sources"] = json_dumps(dynamic_file_sources)
return self._raw_execute("submit", launch_params)
def full_status(self):
""" Return a dictionary summarizing final state of job.
"""
return self.raw_check_complete()
def kill(self):
"""
Cancel remote job, either removing from the queue or killing it.
"""
return self._raw_execute("cancel", {"job_id": self.job_id})
@retry()
@parseJson()
def raw_check_complete(self):
"""
Get check_complete response from the remote server.
"""
check_complete_response = self._raw_execute("status", {"job_id": self.job_id})
return check_complete_response
def get_status(self):
check_complete_response = self.raw_check_complete()
# Older Pulsar instances won't set status so use 'complete', at some
# point drop backward compatibility.
status = check_complete_response.get("status", None)
return status
def clean(self):
"""
Cleanup the remote job.
"""
self._raw_execute("clean", {"job_id": self.job_id})
@parseJson()
def remote_setup(self, **setup_args):
"""
Set up the remote Pulsar server to run this job.
"""
return self._raw_execute("setup", setup_args)
def put_file(self, path, input_type, name=None, contents=None, action_type='transfer'):
if not name:
name = os.path.basename(path)
args = {"job_id": self.job_id, "name": name, "type": input_type}
input_path = path
if contents:
input_path = None
# action_type == 'message' should either copy or transfer depending on the
# default, not just fall back to transfer.
if action_type in ['transfer', 'message']:
if isinstance(contents, str):
contents = contents.encode("utf-8")
message = "Uploading path [%s] (action_type: [%s])"
log.debug(message, path, action_type)
return self._upload_file(args, contents, input_path)
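# For 'copy', ask the Pulsar server where the file should live and copy it there
# directly (assumes a filesystem shared with the Pulsar server).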
elif action_type == 'copy':
path_response = self._raw_execute('path', args)
pulsar_path = json_loads(path_response)['path']
_copy(path, pulsar_path)
return {'path': pulsar_path}
def fetch_output(self, path, name, working_directory, action_type, output_type):
"""
Fetch (transfer, copy, etc...) an output from the remote Pulsar server.
**Parameters**
path : str
Local path of the dataset.
name : str
Remote name of file (i.e. path relative to remote staging output
or working directory).
working_directory : str
Local working_directory for the job.
action_type : str
Where to find the file on Pulsar (output_workdir or output). 'legacy' is also
an option; in that case Pulsar is asked for the location - this will only be
used if targeting an older Pulsar server that didn't return statuses
allowing this to be inferred.
"""
if output_type in ['output_workdir', 'output_metadata']:
self._populate_output_path(name, path, action_type, output_type)
elif output_type == 'output':
self._fetch_output(path=path, name=name, action_type=action_type)
else:
raise Exception("Unknown output_type %s" % output_type)
def _raw_execute(self, command, args=None, data=None, input_path=None, output_path=None):
if args is None:
args = {}
return self.job_manager_interface.execute(command, args, data, input_path, output_path)
def _fetch_output(self, path, name=None, check_exists_remotely=False, action_type='transfer'):
if not name:
# Extra files are sent in by path, so derive the name from its basename.
name = os.path.basename(path)
self._populate_output_path(name, path, action_type, path_type.OUTPUT)
def _populate_output_path(self, name, output_path, action_type, path_type):
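# 'transfer' downloads the output via the Pulsar API; 'copy' resolves the remote
# path and copies it locally (again assuming a shared filesystem).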
ensure_directory(output_path)
if action_type == 'transfer':
self.__raw_download_output(name, self.job_id, path_type, output_path)
elif action_type == 'copy':
pulsar_path = self._output_path(name, self.job_id, path_type)['path']
_copy(pulsar_path, output_path)
@parseJson()
def _upload_file(self, args, contents, input_path):
return self._raw_execute("upload_file", args, contents, input_path)
@parseJson()
def _output_path(self, name, job_id, output_type):
return self._raw_execute("path",
{"name": name,
"job_id": self.job_id,
"type": output_type})
@retry()
def __raw_download_output(self, name, job_id, output_type, output_path):
output_params = {
"name": name,
"job_id": self.job_id,
"type": output_type
}
self._raw_execute("download_output", output_params, output_path=output_path)
def job_ip(self):
"""Return a entry point ports dict (if applicable)."""
return None
class MessagingClientManagerProtocol(ClientManagerProtocol):
status_cache: Dict[str, Dict[str, Any]]
class BaseMessageJobClient(BaseRemoteConfiguredJobClient):
client_manager: MessagingClientManagerProtocol
def clean(self):
del self.client_manager.status_cache[self.job_id]
def full_status(self):
job_id = self.job_id
full_status = self.client_manager.status_cache.get(job_id, None)
if full_status is None:
raise Exception("full_status() called for [%s] before a final status was properly cached with cilent manager." % job_id)
return full_status
def _build_status_request_message(self):
# Because this is used to poll, status requests will not be resent if we do not receive an acknowledgement
update_params = {
'request': 'status',
'job_id': self.job_id,
ACK_FORCE_NOACK_KEY: True,
}
return update_params
class MessageJobClient(BaseMessageJobClient):
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
"""
"""
launch_params = self._build_setup_message(
command_line,
dependencies_description=dependencies_description,
env=env,
remote_staging=remote_staging,
job_config=job_config,
dynamic_file_sources=dynamic_file_sources,
token_endpoint=token_endpoint,
)
self.client_manager.exchange.publish("setup", launch_params)
log.info("Job published to setup message queue: %s", self.job_id)
return None
def get_status(self):
status_params = self._build_status_request_message()
response = self.client_manager.exchange.publish("setup", status_params)
log.info("Job status request published to setup message queue: %s", self.job_id)
return response
def kill(self):
self.client_manager.exchange.publish("kill", dict(job_id=self.job_id))
class MessageCLIJobClient(BaseMessageJobClient):
def __init__(self, destination_params, job_id, client_manager, shell):
super().__init__(destination_params, job_id, client_manager)
self.remote_pulsar_path = destination_params["remote_pulsar_path"]
self.shell = shell
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
"""
"""
launch_params = self._build_setup_message(
command_line,
dependencies_description=dependencies_description,
env=env,
remote_staging=remote_staging,
job_config=job_config,
dynamic_file_sources=dynamic_file_sources,
token_endpoint=token_endpoint,
)
base64_message = to_base64_json(launch_params)
submit_command = os.path.join(self.remote_pulsar_path, "scripts", "submit.bash")
# TODO: Allow configuration of manager, app, and ini path...
self.shell.execute("nohup {} --base64 {} &".format(submit_command, base64_message))
def kill(self):
# TODO
pass
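# Describes one container of a co-execution job: the image to run, the command and
# its arguments, the working directory, and optional ports to expose (e.g. for
# entry points).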
class CoexecutionContainerCommand(NamedTuple):
image: str
command: str
args: List[str]
working_directory: str
ports: Optional[List[int]] = None
class ExecutionType(str, Enum):
# containers run one after the other with similar configuration,
# as in TES or AWS Batch
SEQUENTIAL = "sequential"
# containers run concurrently sharing the same file system, as in Kubernetes (K8S)
PARALLEL = "parallel"
class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient):
execution_type: ExecutionType
pulsar_container_image: str
def launch(
self,
command_line,
dependencies_description=None,
env=None,
remote_staging=None,
job_config=None,
dynamic_file_sources=None,
container_info=None,
token_endpoint=None,
pulsar_app_config=None
) -> Optional[ExternalId]:
"""
"""
launch_params = self._build_setup_message(
command_line,
dependencies_description=dependencies_description,
env=env,
remote_staging=remote_staging,
job_config=job_config,
dynamic_file_sources=dynamic_file_sources,
token_endpoint=token_endpoint,
)
container = None
guest_ports = None
if container_info is not None:
container = container_info.get("container_id")
guest_ports = container_info.get("guest_ports")
wait_after_submission = not (container and self.execution_type == ExecutionType.SEQUENTIAL)
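# Sequential backends (e.g. TES) running a separate tool container submit without
# waiting; a dedicated pulsar-finish container runs after the tool container to
# complete the job instead.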
manager_name = self.client_manager.manager_name
manager_type = "coexecution" if container is not None else "unqueued"
pulsar_app_config = pulsar_app_config or {}
manager_config = self._ensure_manager_config(
pulsar_app_config, manager_name, manager_type,
)
if "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config:
pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY
if self.amqp_key_prefix:
pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix
if "monitor" not in manager_config:
manager_config["monitor"] = MonitorStyle.BACKGROUND.value if wait_after_submission else MonitorStyle.NONE.value
if "persistence_directory" not in pulsar_app_config:
pulsar_app_config["persistence_directory"] = os.path.join(CONTAINER_STAGING_DIRECTORY, "persisted_data")
elif "manager" in pulsar_app_config and manager_name != '_default_':
log.warning(
"'manager' set in app config but client has non-default manager '%s'; this will cause communication"
" failures. Remove `manager` from the app or client config to fix this.", manager_name)
using_dependencies = container is None and dependencies_description is not None
if using_dependencies and "dependency_resolution" not in pulsar_app_config:
# Setup default dependency resolution for container above...
dependency_resolution = {
"cache": False,
"use": True,
"default_base_path": "/pulsar_dependencies",
"cache_dir": "/pulsar_dependencies/_cache",
"resolvers": [{ # TODO: add CVMFS resolution...
"type": "conda",
"auto_init": True,
"auto_install": True,
"prefix": '/pulsar_dependencies/conda',
}, {
"type": "conda",
"auto_init": True,
"auto_install": True,
"prefix": '/pulsar_dependencies/conda',
"versionless": True,
}]
}
pulsar_app_config["dependency_resolution"] = dependency_resolution
base64_message = to_base64_json(launch_params)
base64_app_conf = to_base64_json(pulsar_app_config)
pulsar_container_image = self.pulsar_container_image
wait_arg = "--wait" if wait_after_submission else "--no-wait"
pulsar_container = CoexecutionContainerCommand(
pulsar_container_image,
"pulsar-submit",
self._pulsar_script_args(manager_name, base64_message, base64_app_conf, wait_arg=wait_arg),
"/",
None,
)
tool_container = None # Default to just use dependency resolution in Pulsar container
if container:
job_directory = self.job_directory
command = TOOL_EXECUTION_CONTAINER_COMMAND_TEMPLATE % job_directory.job_directory
ports = None
if guest_ports:
ports = [int(p) for p in guest_ports]
tool_container = CoexecutionContainerCommand(
container,
"sh",
["-c", command],
"/",
ports,
)
pulsar_finish_container: Optional[CoexecutionContainerCommand] = None
if not wait_after_submission:
pulsar_finish_container = CoexecutionContainerCommand(
pulsar_container_image,
"pulsar-finish",
self._pulsar_script_args(manager_name, base64_message, base64_app_conf),
"/",
None,
)
return self._launch_containers(pulsar_container, tool_container, pulsar_finish_container)
def _pulsar_script_args(self, manager_name, base64_job, base64_app_conf, wait_arg=None):
manager_args = []
if manager_name != "_default_":
manager_args.append("--manager")
manager_args.append(manager_name)
if wait_arg:
manager_args.append(wait_arg)
manager_args.extend(["--base64", base64_job, "--app_conf_base64", base64_app_conf])
return manager_args
def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type):
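# Find or create the configuration block for this manager, supporting both the
# single-'manager' and per-name 'managers' layouts, and default its type.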
if "manager" in pulsar_app_config:
manager_config = pulsar_app_config["manager"]
elif "managers" in pulsar_app_config:
managers_config = pulsar_app_config["managers"]
if manager_name not in managers_config:
managers_config[manager_name] = {}
manager_config = managers_config[manager_name]
else:
manager_config = {}
pulsar_app_config["manager"] = manager_config
if "type" not in manager_config:
manager_config["type"] = manager_type
return manager_config
def _launch_containers(
self,
pulsar_submit_container: CoexecutionContainerCommand,
tool_container: Optional[CoexecutionContainerCommand],
pulsar_finish_container: Optional[CoexecutionContainerCommand]
) -> Optional[ExternalId]:
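# Implemented by the backend-specific mixins below (TES, Kubernetes, AWS Batch).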
...
class BaseMessageCoexecutionJobClient(BaseMessageJobClient):
pulsar_container_image: str
def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)
self.pulsar_container_image = destination_params.get("pulsar_container_image", PULSAR_CONTAINER_IMAGE)
class BasePollingCoexecutionJobClient(BaseRemoteConfiguredJobClient):
pulsar_container_image: str
def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)
self.pulsar_container_image = destination_params.get("pulsar_container_image", PULSAR_CONTAINER_IMAGE)
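# Helpers translating GA4GH TES task states into Pulsar manager statuses and a
# job-complete flag.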
def tes_state_to_pulsar_status(state: Optional[TesState]) -> str:
state = state or TesState.UNKNOWN
state_map = {
TesState.UNKNOWN: manager_status.FAILED,
TesState.INITIALIZING: manager_status.PREPROCESSING,
TesState.RUNNING: manager_status.RUNNING,
TesState.PAUSED: manager_status.RUNNING,
TesState.COMPLETE: manager_status.COMPLETE,
TesState.EXECUTOR_ERROR: manager_status.FAILED,
TesState.SYSTEM_ERROR: manager_status.FAILED,
TesState.CANCELED: manager_status.CANCELLED,
}
if state not in state_map:
log.warning(f"Unknown tes state encountered [{state}]")
return manager_status.FAILED
else:
return state_map[state]
def tes_state_is_complete(state: Optional[TesState]) -> bool:
state = state or TesState.UNKNOWN
state_map = {
TesState.UNKNOWN: True,
TesState.INITIALIZING: False,
TesState.RUNNING: False,
TesState.PAUSED: False,
TesState.COMPLETE: True,
TesState.EXECUTOR_ERROR: True,
TesState.SYSTEM_ERROR: True,
TesState.CANCELED: True,
}
if state not in state_map:
log.warning(f"Unknown tes state encountered [{state}]")
return True
else:
return state_map[state]
class LaunchesTesContainersMixin(CoexecutionLaunchMixin):
""""""
ensure_library_available = ensure_tes_client
execution_type = ExecutionType.SEQUENTIAL
def _launch_containers(
self,
pulsar_submit_container: CoexecutionContainerCommand,
tool_container: Optional[CoexecutionContainerCommand],
pulsar_finish_container: Optional[CoexecutionContainerCommand]
) -> ExternalId:
volumes = [
CONTAINER_STAGING_DIRECTORY,
]
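# The staging directory is declared as a shared TES volume so the sequential
# executors all see the same staged files.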
pulsar_container_executor = self._container_to_executor(pulsar_submit_container)
executors = [pulsar_container_executor]
if tool_container:
tool_container_executor = self._container_to_executor(tool_container)
executors.append(tool_container_executor)
assert pulsar_finish_container
pulsar_finish_executor = self._container_to_executor(pulsar_finish_container)
executors.append(pulsar_finish_executor)
name = self._tes_job_name
tes_task = TesTask(
name=name,
executors=executors,
volumes=volumes,
resources=tes_resources(self.destination_params)
)
created_task = self._tes_client.create_task(tes_task)
return ExternalId(created_task.id)
def _container_to_executor(self, container: CoexecutionContainerCommand) -> TesExecutor:
if container.ports:
raise Exception("exposing container ports not possible via TES")
return TesExecutor(
image=container.image,
command=[container.command] + container.args,
workdir=container.working_directory,
)
@property
def _tes_client(self) -> TesClient:
return tes_client_from_dict(self.destination_params)
@property
def _tes_job_name(self):
# currently just _k8s_job_prefix... which might be fine?
job_id = self.job_id
job_name = produce_unique_k8s_job_name(app_prefix="pulsar", job_id=job_id, instance_id=self.instance_id)
return job_name
def _setup_tes_client_properties(self, destination_params):
self.instance_id = tes_galaxy_instance_id(destination_params)
def kill(self):
self._tes_client.cancel_task(self.job_id)
def raw_check_complete(self) -> Dict[str, Any]:
tes_task: TesTask = self._tes_client.get_task(self.job_id, "FULL")
tes_state = tes_task.state
return {
"status": tes_state_to_pulsar_status(tes_state),
"complete": "true" if tes_state_is_complete(tes_state) else "false", # Ancient John, what were you thinking?
}
class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin):
"""A client that co-executes pods via GA4GH TES and depends on amqp for status updates."""
def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)
self._setup_tes_client_properties(destination_params)
class TesMessageCoexecutionJobClient(BaseMessageCoexecutionJobClient, LaunchesTesContainersMixin):
"""A client that co-executes pods via GA4GH TES and doesn't depend on amqp for status updates."""
def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)
self._setup_tes_client_properties(destination_params)
class LaunchesK8ContainersMixin(CoexecutionLaunchMixin):
"""Mixin to provide K8 launch and kill interaction."""
ensure_library_available = ensure_pykube
execution_type = ExecutionType.PARALLEL
def _launch_containers(
self,
pulsar_submit_container: CoexecutionContainerCommand,
tool_container: Optional[CoexecutionContainerCommand],
pulsar_finish_container: Optional[CoexecutionContainerCommand]
) -> None:
assert pulsar_finish_container is None
volumes = [
{"name": "staging-directory", "emptyDir": {}},
]
volume_mounts = [
{"mountPath": CONTAINER_STAGING_DIRECTORY, "name": "staging-directory"},
]
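# An emptyDir volume shared by the Pulsar and tool containers holds the staging
# directory.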
pulsar_container_dict = self._container_command_to_dict("pulsar-container", pulsar_submit_container)
pulsar_container_resources = self._pulsar_container_resources
if pulsar_container_resources:
pulsar_container_dict["resources"] = pulsar_container_resources
pulsar_container_dict["volumeMounts"] = volume_mounts
container_dicts = [pulsar_container_dict]
if tool_container:
tool_container_dict = self._container_command_to_dict("tool-container", tool_container)
tool_container_resources = self._tool_container_resources
if tool_container_resources:
tool_container_dict["resources"] = tool_container_resources
tool_container_dict["volumeMounts"] = volume_mounts
container_dicts.append(tool_container_dict)
for container_dict in container_dicts:
if self._default_pull_policy:
container_dict["imagePullPolicy"] = self._default_pull_policy
job_name = self._k8s_job_name
template = {
"metadata": {
"labels": {"app": job_name},
},
"spec": {
"volumes": volumes,
"restartPolicy": "Never",
"containers": container_dicts,
}
}
spec = {"template": template}
params = self.destination_params
spec.update(self._job_spec_params(params))
k8s_job_obj = job_object_dict(params, job_name, spec)
pykube_client = self._pykube_client
job = Job(pykube_client, k8s_job_obj)
job.create()
def _container_command_to_dict(self, name: str, container: CoexecutionContainerCommand) -> Dict[str, Any]:
container_dict: Dict[str, Any] = {
"name": name,
"image": container.image,
"command": [container.command],
"args": container.args,
"workingDir": container.working_directory,
}
ports = container.ports
if ports:
container_dict["ports"] = [{"containerPort": p} for p in ports]
return container_dict
def kill(self):
job_name = self._k8s_job_name
pykube_client = self._pykube_client
job = find_job_object_by_name(pykube_client, job_name)
if job:
log.info("Kill k8s job with name %s" % job_name)
stop_job(job)
else:
log.info("Attempted to kill k8s job but it is unavailable.")
def clean(self):
self.kill() # pretty much the same here right?
def job_ip(self):
job_name = self._k8s_job_name
pykube_client = self._pykube_client
pod = find_pod_object_by_name(pykube_client, job_name)
if pod:
status = pod.obj['status']
else:
status = {}
if 'podIP' in status:
pod_ip = status['podIP']
return pod_ip
else:
log.debug("Attempted to get ports dict but k8s pod unavailable")
@property
def _pykube_client(self):
return pykube_client_from_dict(self.destination_params)
@property
def _k8s_job_name(self):
job_id = self.job_id
job_name = produce_unique_k8s_job_name(app_prefix="pulsar", job_id=job_id, instance_id=self.instance_id)
return job_name
def _job_spec_params(self, params):
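# Optional Job spec fields driven by destination params: a walltime limit maps to
# activeDeadlineSeconds and the TTL param to ttlSecondsAfterFinished.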
spec = {}
if "k8s_walltime_limit" in params:
spec["activeDeadlineSeconds"] = int(params["k8s_walltime_limit"])
if "k8s_job_ttl_secs_after_finished" in params and params.get("k8s_cleanup_job") != "never":
spec["ttlSecondsAfterFinished"] = int(params["k8s_job_ttl_secs_after_finished"])
return spec
@property
def _pulsar_container_resources(self):
params = self.destination_params
return self._container_resources(params, container='pulsar')
@property
def _tool_container_resources(self):
params = self.destination_params
return self._container_resources(params, container='tool')
def _container_resources(self, params, container=None):
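# Build a Kubernetes resources dict ({'requests': ..., 'limits': ...}) from params
# such as 'requests_cpu'; container-prefixed params (e.g. 'tool_requests_cpu')
# override the generic ones for that container.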
resources = {}
for resource_param in ('requests_cpu', 'requests_memory', 'limits_cpu', 'limits_memory'):
subkey, resource = resource_param.split('_', 1)
if resource_param in params:
if subkey not in resources:
resources[subkey] = {}
resources[subkey][resource] = params[resource_param]
if container is not None and container + '_' + resource_param in params:
if subkey not in resources:
resources[subkey] = {}
resources[subkey][resource] = params[container + '_' + resource_param]
return resources
def _setup_k8s_client_properties(self, destination_params):
self.instance_id = galaxy_instance_id(destination_params)
self._default_pull_policy = pull_policy(destination_params)
class K8sMessageCoexecutionJobClient(BaseMessageCoexecutionJobClient, LaunchesK8ContainersMixin):
"""A client that co-executes pods via Kubernetes and depends on amqp for status updates."""
def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)
self._setup_k8s_client_properties(destination_params)
class K8sPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesK8ContainersMixin):
"""A client that co-executes pods via Kubernetes and doesn't depend on amqp."""
def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)
self._setup_k8s_client_properties(destination_params)
def full_status(self):
status = self._raw_check_complete()
return status
def raw_check_complete(self):
return self._raw_check_complete()
def _raw_check_complete(self):
job_name = self._k8s_job_name
pykube_client = self._pykube_client
job = find_job_object_by_name(pykube_client, job_name)
job_failed = (job.obj['status']['failed'] > 0
if 'failed' in job.obj['status'] else False)
job_active = (job.obj['status']['active'] > 0
if 'active' in job.obj['status'] else False)
job_succeeded = (job.obj['status']['succeeded'] > 0
if 'succeeded' in job.obj['status'] else False)
if job_failed:
status = manager_status.FAILED
elif job_succeeded and not job_active:
status = manager_status.COMPLETE
else:
# Job still has active (or pending) pods; report it as running.
status = manager_status.RUNNING
return {
"status": status,
"complete": "true" if manager_status.is_job_done(status) else "false", # Ancient John, what were you thinking?
}
class LaunchesAwsBatchContainersMixin(CoexecutionLaunchMixin):
"""..."""
execution_type = ExecutionType.SEQUENTIAL
class AwsBatchPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesAwsBatchContainersMixin):
"""A client that co-executes pods via AWS Batch and doesn't depend on amqp for status updates."""
def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)
raise NotImplementedError()
class AwsBatchMessageCoexecutionJobClient(BaseMessageCoexecutionJobClient, LaunchesAwsBatchContainersMixin):
"""A client that co-executes containers via AWS Batch and depends on amqp for status updates."""
def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)
raise NotImplementedError()
def _copy(from_path, to_path):
message = "Copying path [%s] to [%s]"
log.debug(message, from_path, to_path)
copy(from_path, to_path)
def _setup_params_from_job_config(job_config):
job_id = job_config.get("job_id", None)
tool_id = job_config.get("tool_id", None)
tool_version = job_config.get("tool_version", None)
preserve_galaxy_python_environment = job_config.get("preserve_galaxy_python_environment", None)
# use_metadata is ignored by Pulsar 0.14.12+ but keep setting it for older Pulsars
# that had hacks for pre-2017 Galaxies.
return dict(
job_id=job_id,
tool_id=tool_id,
tool_version=tool_version,
use_metadata=True,
preserve_galaxy_python_environment=preserve_galaxy_python_environment,
)