Source code for pulsar.web.routes

import logging
import os
from json import loads

from webob import exc

from pulsar.client.action_mapper import path_type
from pulsar.client.job_directory import verify_is_in_directory
from pulsar.manager_endpoint_util import (
    setup_job,
    status_dict,
    submit_job,
)
from pulsar.manager_factory import DEFAULT_MANAGER_NAME
from pulsar.util import (
    copy_to_path,
    copy_to_temp,
)
from pulsar.web.framework import Controller

log = logging.getLogger(__name__)


class PulsarController(Controller):
    """Controller subclass used to decorate the route functions in this module.

    It adds two Pulsar-specific behaviors on top of ``Controller``: an
    optional shared-secret check (``_check_access``) and the injection of
    application-level objects into handler arguments (``_app_args``).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _check_access(self, req, environ, start_response):
        """Reject the request unless it carries the configured private token.

        When ``req.app.private_token`` is falsy no check is performed and
        ``None`` is returned.  Otherwise the ``private_token`` query parameter
        must equal the configured token; on mismatch (or when the parameter
        is absent) the result of invoking ``exc.HTTPUnauthorized`` as a WSGI
        application is returned.
        """
        # Local import keeps this block self-contained.
        import hmac

        expected_token = req.app.private_token
        if not expected_token:
            return None

        sent_token = req.GET.get("private_token", None)
        if isinstance(expected_token, str) and isinstance(sent_token, str):
            # Constant-time comparison so response timing does not reveal how
            # much of a guessed token was correct.  Comparing the UTF-8 bytes
            # avoids compare_digest's ASCII-only restriction on str inputs.
            authorized = hmac.compare_digest(
                expected_token.encode("utf-8", "surrogatepass"),
                sent_token.encode("utf-8", "surrogatepass"),
            )
        else:
            # Missing token or non-string configuration: keep the original
            # plain equality semantics.
            authorized = expected_token == sent_token

        if not authorized:
            return exc.HTTPUnauthorized()(environ, start_response)
        return None

    def _app_args(self, args, req):
        """Build the application-provided arguments for a handler.

        The manager is looked up in ``req.app.managers`` under
        ``args['manager_name']`` (falling back to ``DEFAULT_MANAGER_NAME``);
        an unknown name propagates whatever that lookup raises.
        ``file_cache`` and ``object_store`` are read from the app and default
        to ``None`` when the app does not define them.
        """
        app = req.app
        manager_name = args.get('manager_name', DEFAULT_MANAGER_NAME)
        return {
            'manager': app.managers[manager_name],
            'file_cache': getattr(app, 'file_cache', None),
            'object_store': getattr(app, 'object_store', None),
        }
@PulsarController(path="/jobs", method="POST", response_type='json')
def setup(manager, job_id, tool_id=None, tool_version=None, use_metadata='true'):
    """Set up a job on ``manager`` and return the resulting configuration.

    ``use_metadata`` is accepted but not forwarded to the setup helper.
    """
    return __setup(manager, job_id, tool_id=tool_id, tool_version=tool_version)


def __setup(manager, job_id, tool_id, tool_version):
    """Call ``setup_job``, log its response at debug level and return it."""
    job_config = setup_job(manager, job_id, tool_id, tool_version)
    log.debug("Setup job with configuration: %s" % job_config)
    return job_config


@PulsarController(path="/jobs/{job_id}", method="DELETE")
def clean(manager, job_id):
    """Delegate to ``manager.clean`` for ``job_id``."""
    manager.clean(job_id)


@PulsarController(path="/jobs/{job_id}/submit", method="POST")
def submit(manager, job_id, command_line, params='{}', dependencies_description='null',
           setup_params='{}', remote_staging='{}', env='[]', submit_extras='{}',
           dynamic_file_sources='null'):
    """Decode the JSON-encoded arguments and submit the job via ``submit_job``.

    Every keyword argument other than ``command_line`` arrives as a JSON
    string.  Entries of the decoded ``submit_extras`` are merged last and
    therefore override same-named keys of the base configuration.
    """
    # Decode in a fixed order so the first malformed argument is the one that
    # raises.
    decoded_params = loads(params)
    decoded_setup_params = loads(setup_params)
    decoded_dependencies = loads(dependencies_description)
    decoded_env = loads(env)
    decoded_remote_staging = loads(remote_staging)
    extras = loads(submit_extras)
    decoded_file_sources = loads(dynamic_file_sources)

    submit_config = {
        "job_id": job_id,
        "command_line": command_line,
        "setup_params": decoded_setup_params,
        "submit_params": decoded_params,
        "dependencies_description": decoded_dependencies,
        "env": decoded_env,
        "remote_staging": decoded_remote_staging,
        "dynamic_file_sources": decoded_file_sources,
    }
    submit_config.update(extras)
    submit_job(manager, submit_config)


@PulsarController(path="/jobs/{job_id}/status", response_type='json')
def status(manager, job_id):
    """Return ``status_dict(manager, job_id)``."""
    return status_dict(manager, job_id)


@PulsarController(path="/jobs/{job_id}/cancel", method="PUT")
def cancel(manager, job_id):
    """Cancel a job by calling ``manager.kill(job_id)``."""
    manager.kill(job_id)


@PulsarController(path="/jobs/{job_id}/files", method="POST", response_type='json')
def upload_file(manager, type, file_cache, job_id, name, body, cache_token=None):
    """Store an uploaded file in the job directory and return its path.

    The destination is computed by the job directory's ``calculate_path``
    from ``name`` and ``type``; the copy itself is done by ``_handle_upload``.
    """
    # Input type should be one of input, config, workdir, metadata, tool, or
    # unstructured (see action_mapper.path_type)
    destination = manager.job_directory(job_id).calculate_path(name, type)
    return _handle_upload(file_cache, destination, body, cache_token=cache_token)


@PulsarController(path="/jobs/{job_id}/files/path", method="GET", response_type='json')
def path(manager, type, job_id, name):
    """Return ``{'path': ...}`` for ``name`` without transferring anything.

    Output-like types are resolved through ``_output_path``; every other type
    goes through the job directory's ``calculate_path``.
    """
    output_types = (path_type.OUTPUT, path_type.OUTPUT_WORKDIR, path_type.OUTPUT_METADATA)
    if type in output_types:
        resolved = _output_path(manager, job_id, name, type)
    else:
        resolved = manager.job_directory(job_id).calculate_path(name, type)
    return {'path': resolved}


@PulsarController(path="/jobs/{job_id}/files", method="GET", response_type='file')
def download_output(manager, job_id, name, type=path_type.OUTPUT):
    """Return the path of the requested output for a ``'file'`` response."""
    return _output_path(manager, job_id, name, type)
def output_path(manager, job_id, name, type=path_type.OUTPUT):
    """Return ``{"path": ...}`` for an output of ``job_id``.

    Added for non-transfer downloading.  ``type`` is passed to
    ``_output_path`` as the output type (original note: "output_type should
    be one of... work_dir, direct").
    """
    resolved = _output_path(manager, job_id, name, type)
    return {"path": resolved}
def _output_path(manager, job_id, name, output_type):
    """Resolve ``name`` inside the job directory selected by ``output_type``.

    The outputs directory is the default; ``path_type.OUTPUT_WORKDIR`` selects
    the working directory and ``path_type.OUTPUT_METADATA`` the metadata
    directory.  The joined path is passed to ``verify_is_in_directory``
    together with the chosen directory before being returned.
    """
    base_directory = manager.job_directory(job_id).outputs_directory()
    if output_type == path_type.OUTPUT_WORKDIR:  # action_mapper.path_type.OUTPUT_WORKDIR
        base_directory = manager.job_directory(job_id).working_directory()
    elif output_type == path_type.OUTPUT_METADATA:
        base_directory = manager.job_directory(job_id).metadata_directory()

    resolved = os.path.join(base_directory, name)
    verify_is_in_directory(resolved, base_directory)
    return resolved


@PulsarController(path="/cache/status", method="GET", response_type='json')
def file_available(file_cache, ip, path):
    """Return ``{token: <token>, ready: <bool>}`` as reported by the file cache."""
    return file_cache.file_available(ip, path)


@PulsarController(path="/cache", method="PUT", response_type='json')
def cache_required(file_cache, ip, path):
    """Return a bool telling the client whether it should run ``cache_insert``.

    Either way the client should follow up with ``file_available``.
    """
    return file_cache.cache_required(ip, path)


@PulsarController(path="/cache", method="POST", response_type='json')
def cache_insert(file_cache, ip, path, body):
    """Copy the request body to a temporary file and hand it to the file cache."""
    staged_copy = copy_to_temp(body)
    file_cache.cache_file(staged_copy, ip, path)


# TODO: coerce booleans and None values into correct types - simplejson may
# do this already but need to check.
@PulsarController(path="/objects/{object_id}/exists", response_type='json')
def object_store_exists(object_store, object_id, base_dir=None, dir_only=False,
                        extra_dir=None, extra_dir_at_root=False, alt_name=None):
    """Forward to ``object_store.exists`` for a ``PulsarDataset(object_id)``."""
    options = dict(
        base_dir=base_dir,
        dir_only=dir_only,
        extra_dir=extra_dir,
        extra_dir_at_root=extra_dir_at_root,
        alt_name=alt_name,
    )
    return object_store.exists(PulsarDataset(object_id), **options)


@PulsarController(path="/objects/{object_id}/file_ready", response_type='json')
def object_store_file_ready(object_store, object_id, base_dir=None, dir_only=False,
                            extra_dir=None, extra_dir_at_root=False, alt_name=None):
    """Forward to ``object_store.file_ready`` for a ``PulsarDataset(object_id)``."""
    options = dict(
        base_dir=base_dir,
        dir_only=dir_only,
        extra_dir=extra_dir,
        extra_dir_at_root=extra_dir_at_root,
        alt_name=alt_name,
    )
    return object_store.file_ready(PulsarDataset(object_id), **options)


@PulsarController(path="/objects/{object_id}", method="POST", response_type='json')
def object_store_create(object_store, object_id, base_dir=None, dir_only=False,
                        extra_dir=None, extra_dir_at_root=False, alt_name=None):
    """Forward to ``object_store.create`` for a ``PulsarDataset(object_id)``."""
    options = dict(
        base_dir=base_dir,
        dir_only=dir_only,
        extra_dir=extra_dir,
        extra_dir_at_root=extra_dir_at_root,
        alt_name=alt_name,
    )
    return object_store.create(PulsarDataset(object_id), **options)


@PulsarController(path="/objects/{object_id}/empty", response_type='json')
def object_store_empty(object_store, object_id, base_dir=None, extra_dir=None,
                       extra_dir_at_root=False, alt_name=None):
    """Forward to ``object_store.empty`` for a ``PulsarDataset(object_id)``."""
    options = dict(
        base_dir=base_dir,
        extra_dir=extra_dir,
        extra_dir_at_root=extra_dir_at_root,
        alt_name=alt_name,
    )
    return object_store.empty(PulsarDataset(object_id), **options)


@PulsarController(path="/objects/{object_id}/size", response_type='json')
def object_store_size(object_store, object_id, extra_dir=None, extra_dir_at_root=False,
                      alt_name=None):
    """Forward to ``object_store.size`` for a ``PulsarDataset(object_id)``."""
    options = dict(
        extra_dir=extra_dir,
        extra_dir_at_root=extra_dir_at_root,
        alt_name=alt_name,
    )
    return object_store.size(PulsarDataset(object_id), **options)


@PulsarController(path="/objects/{object_id}", method="DELETE", response_type='json')
def object_store_delete(object_store, object_id, entire_dir=False, base_dir=None,
                        extra_dir=None, extra_dir_at_root=False, alt_name=None):
    """Forward to ``object_store.delete`` for a ``PulsarDataset(object_id)``.

    The ``entire_dir`` and ``base_dir`` arguments are accepted but not
    forwarded; fixed values are passed instead.
    """
    # NOTE(review): entire_dir/base_dir from the request are ignored here and
    # replaced by False/None - possibly a copy-paste slip.  Confirm how query
    # values are coerced before forwarding them to a destructive call.
    options = dict(
        entire_dir=False,
        base_dir=None,
        extra_dir=extra_dir,
        extra_dir_at_root=extra_dir_at_root,
        alt_name=alt_name,
    )
    return object_store.delete(PulsarDataset(object_id), **options)


@PulsarController(path="/objects/{object_id}", method="GET", response_type='json')
def object_store_get_data(object_store, object_id, start=0, count=-1, base_dir=None,
                          extra_dir=None, extra_dir_at_root=False, alt_name=None):
    """Forward to ``object_store.get_data`` for a ``PulsarDataset(object_id)``.

    ``start`` and ``count`` are converted with ``int`` before the call.  The
    ``base_dir`` argument is accepted but not forwarded.
    """
    # NOTE(review): base_dir is ignored (None is always passed) and a fixed
    # entire_dir=False is sent along - confirm whether that is intentional.
    dataset = PulsarDataset(object_id)
    options = dict(
        start=int(start),
        count=int(count),
        entire_dir=False,
        base_dir=None,
        extra_dir=extra_dir,
        extra_dir_at_root=extra_dir_at_root,
        alt_name=alt_name,
    )
    return object_store.get_data(dataset, **options)


@PulsarController(path="/objects/{object_id}/filename", response_type='json')
def object_store_get_filename(object_store, object_id, base_dir=None, dir_only=False,
                              extra_dir=None, extra_dir_at_root=False, alt_name=None):
    """Forward to ``object_store.get_filename`` for a ``PulsarDataset(object_id)``."""
    options = dict(
        base_dir=base_dir,
        dir_only=dir_only,
        extra_dir=extra_dir,
        extra_dir_at_root=extra_dir_at_root,
        alt_name=alt_name,
    )
    return object_store.get_filename(PulsarDataset(object_id), **options)


@PulsarController(path="/objects/{object_id}", method="PUT", response_type='json')
def object_store_update_from_file(object_store, object_id, base_dir=None, extra_dir=None,
                                  extra_dir_at_root=False, alt_name=None, file_name=None,
                                  create=False):
    """Forward to ``object_store.update_from_file`` for a ``PulsarDataset(object_id)``."""
    options = dict(
        base_dir=base_dir,
        extra_dir=extra_dir,
        extra_dir_at_root=extra_dir_at_root,
        alt_name=alt_name,
        file_name=file_name,
        create=create,
    )
    return object_store.update_from_file(PulsarDataset(object_id), **options)


@PulsarController(path="/object_store_usage_percent", response_type='json')
def object_store_get_store_usage_percent(object_store):
    """Return ``object_store.get_store_usage_percent()``."""
    return object_store.get_store_usage_percent()
class PulsarDataset:
    """Intermediary between Pulsar and objectstore.

    Holds the ``id`` it was constructed with and an ``object_store_id`` that
    starts out as ``None``.
    """

    def __init__(self, id):
        # ``id`` is the caller-visible parameter name and is kept as-is.
        self.id, self.object_store_id = id, None
def _handle_upload(file_cache, path, body, cache_token=None):
    """Copy an upload to ``path`` and return ``{"path": path}``.

    Without a ``cache_token`` the request ``body`` is the copy source.  With
    one, the source is the file located at
    ``file_cache.destination(cache_token)``, opened in binary mode; ``body``
    is not read in that case.
    """
    if not cache_token:
        copy_to_path(body, path)
        return {"path": path}

    cached_file = file_cache.destination(cache_token)
    # The context manager closes the cached file once the copy finishes (or
    # fails); previously the handle was left open.
    with open(cached_file, 'rb') as source:
        log.info("Copying cached file {} to {}".format(cached_file, path))
        copy_to_path(source, path)
    return {"path": path}