""" Composite actions over managers shared between HTTP endpoint (routes.py)
and message queue.
"""
import logging
import os
from pulsar import __version__ as pulsar_version
from pulsar.client.setup_handler import build_job_config
from pulsar.managers import (
PULSAR_UNKNOWN_RETURN_CODE,
status,
)
from pulsar.managers.staging import realized_dynamic_file_sources
log = logging.getLogger(__name__)
def status_dict(manager, job_id):
    """ Fetch the current status of ``job_id`` from ``manager`` and return
    the dictionary ``full_status`` builds for that status.
    """
    current_status = manager.get_status(job_id)
    return full_status(manager, current_status, job_id)
def full_status(manager, job_status, job_id):
    """ Return a dictionary describing the job.

    While ``status.is_job_done`` does not report ``job_status`` as done, this
    is a short record carrying only the status and job id. Once it does, the
    complete description built by ``__job_complete_dict`` is returned.
    """
    if not status.is_job_done(job_status):
        # Job is not done yet - nothing beyond the status itself to report.
        return {"complete": "false", "status": job_status, "job_id": job_id}
    return __job_complete_dict(job_status, manager, job_id)
def __job_complete_dict(complete_status, manager, job_id):
    """ Build final dictionary describing completed job for consumption by
    Pulsar client.

    The captured output streams are decoded as UTF-8. Bytes that are not
    valid UTF-8 are replaced with U+FFFD instead of raising
    ``UnicodeDecodeError``, so unexpected tool output cannot prevent the
    final status of the job from being reported.
    """
    def decoded(read_contents):
        # Job output is arbitrary bytes produced by the tool - decode leniently.
        return read_contents(job_id).decode("utf-8", errors="replace")

    return_code = manager.return_code(job_id)
    if return_code == PULSAR_UNKNOWN_RETURN_CODE:
        # An unknown return code is reported to the client as None.
        return_code = None
    stdout_contents = decoded(manager.stdout_contents)
    stderr_contents = decoded(manager.stderr_contents)
    job_stdout_contents = decoded(manager.job_stdout_contents)
    job_stderr_contents = decoded(manager.job_stderr_contents)
    job_directory = manager.job_directory(job_id)
    as_dict = dict(
        job_id=job_id,
        complete="true",  # Is this still used or is it legacy.
        status=complete_status,
        returncode=return_code,
        stdout=stdout_contents,
        stderr=stderr_contents,
        job_stdout=job_stdout_contents,
        job_stderr=job_stderr_contents,
        working_directory=job_directory.working_directory(),
        metadata_directory=job_directory.metadata_directory(),
        job_directory=job_directory.job_directory,
        working_directory_contents=job_directory.working_directory_contents(),
        metadata_directory_contents=job_directory.metadata_directory_contents(),
        outputs_directory_contents=job_directory.outputs_directory_contents(),
        job_directory_contents=job_directory.job_directory_contents(),
        system_properties=manager.system_properties(),
        pulsar_version=pulsar_version,
        realized_dynamic_file_sources=realized_dynamic_file_sources(job_directory)
    )
    return as_dict
def submit_job(manager, job_config):
    """ Launch the job described by ``job_config`` on ``manager``.

    When 'setup_params' in ``job_config`` is non-empty (or 'setup' is truthy)
    the job is set up here first; otherwise it may have been set up
    previously. Any exception raised while preparing or launching the job is
    reported through ``manager.handle_failure_before_launch`` and re-raised.
    """
    # job_config is raw dictionary from JSON (from MQ or HTTP endpoint).
    job_id = job_config.get('job_id')
    try:
        setup_params = job_config.get('setup_params', {})
        outputs_to_touch = job_config.get('touch_outputs', [])
        launch_config = {
            "remote_staging": job_config.get('remote_staging', {}),
            "command_line": job_config.get('command_line'),
            "dependencies_description": job_config.get('dependencies_description', None),
            "submit_params": job_config.get('submit_params', {}),
            "env": job_config.get('env', []),
            "setup_params": setup_params,
            "dynamic_file_sources": job_config.get("dynamic_file_sources", None),
            "token_endpoint": job_config.get("token_endpoint", None),
        }

        if setup_params or job_config.get('setup'):
            setup_config = setup_job(
                manager,
                setup_params.get("job_id", job_id),
                setup_params.get("tool_id", None),
                setup_params.get("tool_version", None),
            )
            if setup_config is not None:
                # The jobs directory is the parent of this job's directory;
                # substitute it for the placeholder in the command line.
                jobs_directory = os.path.abspath(
                    os.path.join(setup_config["job_directory"], os.pardir)
                )
                launch_config["command_line"] = launch_config["command_line"].replace(
                    '__PULSAR_JOBS_DIRECTORY__', jobs_directory
                )

        # TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc...
        manager.touch_outputs(job_id, outputs_to_touch)
        manager.preprocess_and_launch(job_id, launch_config)
    except Exception:
        manager.handle_failure_before_launch(job_id)
        raise
def setup_job(manager, job_id, tool_id, tool_version):
    """ Have ``manager`` set up a new job from these inputs and return the
    dict built by ``build_job_config`` summarizing its state (used to
    configure command line).

    The job id returned by ``manager.setup_job`` is the one used for the
    directory lookup and placed in the resulting config.
    """
    assigned_job_id = manager.setup_job(job_id, tool_id, tool_version)
    config_inputs = {
        "job_id": assigned_job_id,
        "job_directory": manager.job_directory(assigned_job_id),
        "system_properties": manager.system_properties(),
        "tool_id": tool_id,
        "tool_version": tool_version,
    }
    return build_job_config(**config_inputs)