import logging
from abc import (
ABCMeta,
abstractmethod,
)
from io import BytesIO
from string import Template
from urllib.parse import (
urlencode,
urljoin,
)
from galaxy.util import unicodify
from .util import copy_to_path
log = logging.getLogger(__name__)
[docs]
class PulsarInterface(metaclass=ABCMeta):
    """
    Abstract base class describing how a synchronous client communicates with
    (potentially remote) Pulsar procedures. The obvious implementation is HTTP
    based, but Pulsar objects wrapped in routes can also be communicated with
    directly when they live in memory.
    """

    @abstractmethod
    def execute(self, command, args=None, data=None, input_path=None, output_path=None):
        """
        Execute the corresponding command against the configured Pulsar job
        manager.

        :param command: name of the Pulsar command to run.
        :param args: method parameters for the command.
        :param data: in-memory request body (essentially a POST body).
        :param input_path: path of a file to use as the request body instead
            of ``data``.
        :param output_path: where to store the result if the command results
            in a file.
        """
# Relative URL path for every known Pulsar command. Each value is a
# string.Template; HttpPulsarInterface fills the ``${...}`` placeholders from
# the command arguments with ``safe_substitute`` and falls back to the bare
# command name for commands that are not listed here.
COMMAND_TO_PATH = {
    command: Template(pattern)
    for command, pattern in {
        # Job staging and lifecycle.
        "path": "jobs/${job_id}/files/path",
        "upload_file": "jobs/${job_id}/files",
        "download_output": "jobs/${job_id}/files",
        "setup": "jobs",
        "clean": "jobs/${job_id}",
        "status": "jobs/${job_id}/status",
        "cancel": "jobs/${job_id}/cancel",
        "submit": "jobs/${job_id}/submit",
        # File cache.
        "file_available": "cache/status",
        "cache_required": "cache",
        "cache_insert": "cache",
        # Object store.
        "object_store_exists": "objects/${object_id}/exists",
        "object_store_file_ready": "objects/${object_id}/file_ready",
        "object_store_update_from_file": "objects/${object_id}",
        "object_store_create": "objects/${object_id}",
        "object_store_empty": "objects/${object_id}/empty",
        "object_store_size": "objects/${object_id}/size",
        "object_store_delete": "objects/${object_id}",
        "object_store_get_data": "objects/${object_id}",
        "object_store_get_filename": "objects/${object_id}/filename",
        "object_store_get_store_usage_percent": "object_store_usage_percent",
    }.items()
}
# Explicit HTTP verb for the commands that need one. HttpPulsarInterface looks
# commands up here with ``.get(command, None)``, so any command that is not
# listed is handed to the transport with ``method=None``.
COMMAND_TO_METHOD = {
    # Job staging and lifecycle.
    "upload_file": "POST",
    "download_output": "GET",
    "setup": "POST",
    "submit": "POST",
    "clean": "DELETE",
    "cancel": "PUT",
    # Object store.
    "object_store_update_from_file": "PUT",
    "object_store_create": "POST",
    "object_store_delete": "DELETE",
    # File cache.
    "file_available": "GET",
    "cache_required": "PUT",
    "cache_insert": "POST",
}
[docs]
class HttpPulsarInterface(PulsarInterface):
    """
    Pulsar interface that turns each command into a URL under the configured
    remote host and hands the request to the supplied transport.
    """

    def __init__(self, destination_params, transport):
        """
        Work out the base URL of the remote Pulsar server.

        :param destination_params: mapping read for ``url`` (required),
            ``manager`` (optional) and ``private_token`` (optional).
        :param transport: object whose ``execute(url, method=..., data=...,
            input_path=..., output_path=...)`` performs the actual request.
        :raises AssertionError: if ``destination_params`` has no ``url``.
        """
        self.transport = transport
        remote_host = destination_params.get("url")
        assert remote_host is not None, "Failed to determine url for Pulsar client."
        if not remote_host.startswith("http"):
            remote_host = "http://%s" % remote_host
        manager = destination_params.get("manager", None)
        if manager:
            if "/managers/" in remote_host:
                log.warning(
                    "Ignoring manager tag '%s', Pulsar client URL already contains a \"/managers/\" path.",
                    manager,
                )
            else:
                # urljoin() replaces the last path segment of a base URL that
                # does not end in "/", which would silently drop a path prefix
                # such as "http://host/pulsar". Join against a slash-terminated
                # base so the manager path is appended instead.
                base = remote_host if remote_host.endswith("/") else "%s/" % remote_host
                remote_host = urljoin(base, "managers/%s" % manager)
        # Command paths are appended by plain concatenation, so the base URL
        # must always end with a slash.
        if not remote_host.endswith("/"):
            remote_host = "%s/" % remote_host
        self.remote_host = remote_host
        self.private_token = destination_params.get("private_token", None)

    def execute(self, command, args=None, data=None, input_path=None, output_path=None):
        """
        Build the URL for ``command`` and delegate the request to the
        transport.

        :param command: Pulsar command name.
        :param args: mapping of command parameters; used to fill the path
            template and sent as the query string.
        :param data: request body passed through to the transport.
        :param input_path: path of a file passed through to the transport.
        :param output_path: destination path passed through to the transport.
        :returns: whatever ``transport.execute`` returns.
        """
        url = self.__build_url(command, args)
        # None for unlisted commands; the transport then picks the verb
        # (presumably GET when there is no data and POST otherwise - confirm
        # against the transport implementation).
        method = COMMAND_TO_METHOD.get(command, None)
        response = self.transport.execute(url, method=method, data=data, input_path=input_path, output_path=output_path)
        return response

    def __build_url(self, command, args):
        """
        Return the full URL for ``command``: remote host, command path and a
        query string holding every argument (plus the private token, if set).
        """
        # Work on a copy so that adding the private token below never leaks
        # into the dictionary owned by the caller.
        args = dict(args) if args is not None else {}
        path = COMMAND_TO_PATH.get(command, Template(command)).safe_substitute(args)
        if self.private_token:
            args["private_token"] = self.private_token
        arg_bytes = {k: unicodify(v).encode('utf-8') for k, v in args.items()}
        data = urlencode(arg_bytes)
        url = self.remote_host + path + "?" + data
        return url
[docs]
class LocalPulsarInterface(PulsarInterface):
    """
    Pulsar interface that calls the functions behind ``pulsar.web.routes``
    directly, in process, instead of going over HTTP.
    """

    def __init__(self, destination_params, job_manager=None, pulsar_app=None, file_cache=None, object_store=None):
        """
        :param destination_params: mapping read for the optional ``manager``
            name when no ``job_manager`` is supplied.
        :param job_manager: job manager to use; when ``None`` it is looked up
            on ``pulsar_app`` (``only_manager`` if no manager name is
            configured, otherwise ``managers[name]``).
        :param pulsar_app: application object used for the manager lookup;
            only accessed when ``job_manager`` is ``None``.
        :param file_cache: passed to route functions as ``file_cache``.
        :param object_store: passed to route functions as ``object_store``.
        """
        if job_manager is None:
            job_manager_name = destination_params.get("manager", None)
            if job_manager_name is None:
                job_manager = pulsar_app.only_manager
            else:
                job_manager = pulsar_app.managers[job_manager_name]
        self.job_manager = job_manager
        self.file_cache = file_cache
        self.object_store = object_store

    def __app_args(self):
        # Arguments that would be specified from PulsarApp if running
        # in web server.
        return {
            'manager': self.job_manager,
            'file_cache': self.file_cache,
            'object_store': self.object_store,
            'ip': None
        }

    def execute(self, command, args=None, data=None, input_path=None, output_path=None):
        """
        Run the route function named ``command`` in process.

        :param command: attribute name of the controller in
            ``pulsar.web.routes``.
        :param args: mapping of command parameters (a copy is handed to
            ``build_func_args``).
        :param data: request body as ``bytes`` or ``str``; text is encoded
            as UTF-8.
        :param input_path: path of a file to use as the body when ``data`` is
            ``None``.
        :param output_path: destination for the result when the controller's
            response type is ``'file'``.
        :returns: ``controller.body(result)`` for non-file responses;
            ``None`` for file responses, whose content is copied to
            ``output_path``.
        """
        if args is None:
            args = {}
        # NOTE(review): imported here rather than at module level, presumably
        # so the Pulsar web layer is only needed when the local interface is
        # actually used - confirm before hoisting.
        from pulsar.web import routes
        from pulsar.web.framework import build_func_args
        controller = getattr(routes, command)
        action = controller.func
        body = self.__build_body(data, input_path)
        try:
            func_args = build_func_args(action, args.copy(), self.__app_args(), dict(body=body))
            result = action(**func_args)
            if controller.response_type != 'file':
                return controller.body(result)
            with open(result, 'rb') as result_file:
                copy_to_path(result_file, output_path)
        finally:
            # The body stream may be a file opened from input_path; always
            # release it once the action has finished with it.
            if body is not None:
                body.close()

    def __build_body(self, data, input_path):
        """
        Return a binary stream for the request body, or ``None`` when neither
        ``data`` nor ``input_path`` is given. The caller must close it.
        """
        if data is not None:
            # BytesIO only accepts bytes-like input; encode text first.
            if isinstance(data, str):
                data = data.encode('utf-8')
            return BytesIO(data)
        elif input_path is not None:
            return open(input_path, 'rb')
        else:
            return None