import hashlib
import json
import os.path
import shutil
from base64 import (
b64decode as _b64decode,
b64encode as _b64encode,
)
from enum import Enum
from errno import (
EEXIST,
ENOENT,
)
from functools import wraps
from os import (
curdir,
listdir,
makedirs,
unlink,
walk,
)
from os.path import (
abspath,
exists,
join,
relpath,
)
from threading import (
Event,
Lock,
)
from weakref import WeakValueDictionary
# TODO: move to galaxy.util so this code doesn't have to be duplicated
# in pulsar.
# Chunk size passed to each ``read()`` call when copying a file-like object
# (see ``_copy_and_close``); bytes for binary streams.
BUFFER_SIZE = 4096
[docs]
def copy_to_path(object, path):
    """
    Write everything readable from the file-like ``object`` into ``path``.

    ``path`` is opened in binary write mode (any existing file is truncated),
    so ``object.read`` must return ``bytes``. The destination is closed when
    copying finishes, even on error; ``object`` itself is left open.
    """
    _copy_and_close(object, open(path, 'wb'))
def _copy_and_close(object, output):
    """
    Stream ``object`` into ``output`` in ``BUFFER_SIZE`` chunks, then close
    ``output``.

    Copying stops at the first falsy chunk returned by ``object.read``.
    ``output`` is closed even if a read or write raises; ``object`` is not
    closed here.
    """
    try:
        chunk = object.read(BUFFER_SIZE)
        while chunk:
            output.write(chunk)
            chunk = object.read(BUFFER_SIZE)
    finally:
        output.close()
# Variant of base64 compat layer inspired by BSD code from Bcfg2
# https://github.com/Bcfg2/bcfg2/blob/maint/src/lib/Bcfg2/Compat.py
@wraps(_b64encode)
def b64encode(val, **kwargs):
    # ``wraps`` copies the stdlib docstring, so the contract lives here:
    # bytes-like input is encoded as-is and bytes are returned. ``str`` input
    # (which the stdlib rejects with TypeError) is UTF-8 encoded first, and
    # the base64 result is returned as ``str``.
    try:
        encoded = _b64encode(val, **kwargs)
    except TypeError:
        raw = val.encode('UTF-8')
        return _b64encode(raw, **kwargs).decode('UTF-8')
    return encoded
@wraps(_b64decode)
def b64decode(val, **kwargs):
    # ``wraps`` copies the stdlib docstring, so the contract lives here:
    # ``str`` input is UTF-8 encoded before decoding, and bytes-like input
    # (bytes, bytearray, ...) is passed through unchanged. Previously bytes
    # input crashed with AttributeError on ``.encode``, unlike ``b64encode``,
    # which accepts bytes. The decoded payload is always returned as UTF-8
    # text, so non-UTF-8 payloads raise UnicodeDecodeError.
    if isinstance(val, str):
        val = val.encode('UTF-8')
    return _b64decode(val, **kwargs).decode('UTF-8')
[docs]
def unique_path_prefix(path):
    """Return the MD5 hex digest of ``path`` encoded as UTF-8.

    The result is a deterministic 32-character lowercase hex string, so the
    same ``path`` always yields the same prefix.
    """
    return hashlib.md5(path.encode('utf-8')).hexdigest()
[docs]
def copy(source, destination):
    """ Copy file from source to destination if needed (skip if source
    is destination).

    Both paths are compared after ``os.path.abspath`` normalisation. Missing
    parent directories of ``destination`` are created. Only file contents are
    copied (``shutil.copyfile``), not permission bits or other metadata.
    """
    source = os.path.abspath(source)
    destination = os.path.abspath(destination)
    if source == destination:
        return
    # exist_ok=True removes the check-then-create race: another thread or
    # process may create the directory between an exists() test and makedirs().
    os.makedirs(os.path.dirname(destination), exist_ok=True)
    shutil.copyfile(source, destination)
[docs]
def ensure_directory(file_path):
    """Make sure the directory that will contain ``file_path`` exists.

    Missing parent directories are created. A bare file name has no directory
    component (``os.path.dirname`` returns ``""``) and refers to the current
    directory, so nothing is created. Previously that case called
    ``os.makedirs("")`` and raised FileNotFoundError.
    """
    directory = os.path.dirname(file_path)
    if directory:
        # exist_ok=True avoids the race between an exists() check and makedirs().
        os.makedirs(directory, exist_ok=True)
[docs]
def directory_files(directory):
    """
    List every file beneath ``directory`` as a path relative to it.

    Files directly inside ``directory`` are returned as bare names; deeper
    files keep their relative sub-directory prefix (e.g.
    ``dataset_1_files/image.png``). Order follows ``os.walk``, so sort the
    result if a stable order is needed.

    >>> from tempfile import mkdtemp
    >>> from shutil import rmtree
    >>> from os.path import join
    >>> from os import makedirs
    >>> tempdir = mkdtemp()
    >>> with open(join(tempdir, "moo"), "w") as f: pass
    >>> directory_files(tempdir)
    ['moo']
    >>> subdir = join(tempdir, "cow", "sub1")
    >>> makedirs(subdir)
    >>> with open(join(subdir, "subfile1"), "w") as f: pass
    >>> with open(join(subdir, "subfile2"), "w") as f: pass
    >>> sorted(directory_files(tempdir))
    ['cow/sub1/subfile1', 'cow/sub1/subfile2', 'moo']
    >>> rmtree(tempdir)
    """
    found = []
    for root, _subdirs, names in walk(directory):
        prefix = relpath(root, directory)
        # relpath() of the top directory is "."; don't prepend it.
        at_top_level = prefix == curdir
        found.extend(name if at_top_level else join(prefix, name) for name in names)
    return found
[docs]
def filter_destination_params(destination_params, prefix):
    """Select the params whose key starts with ``prefix``, with the prefix removed.

    ``destination_params`` may be ``None`` or empty, in which case ``{}`` is
    returned. A key equal to ``prefix`` maps to the empty-string key.
    """
    params = destination_params or {}
    matching = (key for key in params if key.startswith(prefix))
    return {key[len(prefix):]: params[key] for key in matching}
[docs]
def to_base64_json(data):
    """
    Serialise ``data`` to JSON (via ``json_dumps``) and base64-encode it.

    ``json_dumps`` yields ``str``, so ``b64encode`` takes its text path and the
    return value is a base64 ``str``. Inverse of ``from_base64_json``.

    >>> enc = to_base64_json(dict(a=5))
    >>> dec = from_base64_json(enc)
    >>> dec["a"]
    5
    """
    return b64encode(json_dumps(data))
[docs]
def from_base64_json(data):
    """Base64-decode ``data`` to text and parse the result as JSON.

    Inverse of ``to_base64_json``.
    """
    decoded = b64decode(data)
    return json.loads(decoded)
[docs]
class PathHelper:
    '''
    Translate relative paths between the local path flavour (given by
    ``local_path_module``) and a remote flavour that uses ``separator``.

    >>> import posixpath
    >>> # Forcing local path to posixpath because Pulsar is designed to be used with
    >>> # posix client.
    >>> posix_path_helper = PathHelper("/", local_path_module=posixpath)
    >>> windows_slash = "\\\\"
    >>> len(windows_slash)
    1
    >>> nt_path_helper = PathHelper(windows_slash, local_path_module=posixpath)
    >>> posix_path_helper.remote_name("moo/cow")
    'moo/cow'
    >>> nt_path_helper.remote_name("moo/cow")
    'moo\\\\cow'
    >>> posix_path_helper.local_name("moo/cow")
    'moo/cow'
    >>> nt_path_helper.local_name("moo\\\\cow")
    'moo/cow'
    >>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data/", "/work/galaxy/data")
    '/work/galaxy/data/bowtie/hg19.fa'
    >>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data")
    '/work/galaxy/data/bowtie/hg19.fa'
    >>> posix_path_helper.from_posix_with_new_base("/galaxy/data/bowtie/hg19.fa", "/galaxy/data", "/work/galaxy/data/")
    '/work/galaxy/data/bowtie/hg19.fa'
    >>> posix_path_helper.from_posix_with_new_base("/galaxy/database/x.fa", "/galaxy/data", "/work")
    Traceback (most recent call last):
    ...
    Exception: Cannot compute new path for file /galaxy/database/x.fa, does not start with /galaxy/data.
    '''

    def __init__(self, separator, local_path_module=os.path):
        # ``separator`` is the remote separator; local splitting/joining uses
        # ``local_path_module`` (``os.path`` unless overridden).
        self.separator = separator
        self.local_join = local_path_module.join
        self.local_sep = local_path_module.sep

    def remote_name(self, local_name):
        """Convert a local relative path to remote form (split on the local
        separator, re-join with the remote one)."""
        return self.remote_join(*local_name.split(self.local_sep))

    def local_name(self, remote_name):
        """Convert a remote relative path to local form."""
        return self.local_join(*remote_name.split(self.separator))

    def remote_join(self, *args):
        """Join ``args`` with the remote separator."""
        return self.separator.join(args)

    def from_posix_with_new_base(self, posix_path, old_base, new_base):
        """Rebase ``posix_path`` from ``old_base`` onto the remote ``new_base``.

        ``posix_path`` and ``old_base`` are '/'-separated POSIX paths; the
        result is joined with the remote separator. Trailing separators on
        either base are ignored.

        :raises Exception: if ``posix_path`` is neither ``old_base`` itself
            nor located beneath it. Matching is on whole path components, so
            ``/galaxy/data`` is not a prefix of ``/galaxy/database/...``.
        """
        # TODO: Test with new_base as a windows path against nt_path_helper.
        old_base = old_base.rstrip("/")
        # An empty base (originally "" or "/") matches every path, as before.
        under_base = (
            not old_base
            or posix_path == old_base
            or posix_path.startswith(old_base + "/")
        )
        if not under_base:
            message_template = "Cannot compute new path for file %s, does not start with %s."
            message = message_template % (posix_path, old_base)
            raise Exception(message)
        stripped_path = posix_path[len(old_base):].lstrip("/")
        # The remainder is still a POSIX path, so split on "/" rather than on
        # the remote separator before re-joining with the remote separator.
        path_parts = stripped_path.split("/")
        if new_base.endswith(self.separator):
            new_base = new_base[:-len(self.separator)]
        return self.remote_join(new_base, *path_parts)
[docs]
class TransferEventManager:
    """Hand out one shared ``EventHolder`` per path.

    Holders are stored in a ``WeakValueDictionary``, so an entry disappears as
    soon as no caller holds a strong reference to its holder.
    """

    def __init__(self):
        self.events = WeakValueDictionary()
        self.events_lock = Lock()

    def acquire_event(self, path, force_clear=False):
        """Return the ``EventHolder`` for ``path``, creating it if needed.

        :param path: key identifying the transfer.
        :param force_clear: if true, clear the holder's event before returning.
        :returns: the (possibly newly created) ``EventHolder``.
        """
        with self.events_lock:
            # One lookup with .get(): the lock does not stop another thread
            # from dropping the last strong reference, so a value can be
            # collected between an ``in`` test and a ``[]`` lookup (KeyError).
            # .get() instead returns a live, strongly-held value or None.
            event_holder = self.events.get(path)
            if event_holder is None:
                event_holder = EventHolder(Event(), path, self)
                self.events[path] = event_holder
        if force_clear:
            event_holder.event.clear()
        return event_holder
[docs]
class EventHolder:
    """Pair an event object with the path it belongs to and a failure flag.

    ``condition_manager`` is stored as given; ``failed`` starts out False.
    """

    def __init__(self, event, path, condition_manager):
        self.event = event
        self.path = path
        self.condition_manager = condition_manager
        self.failed = False

    def release(self):
        """Set the wrapped event."""
        self.event.set()

    def fail(self):
        """Mark this holder as failed; the wrapped event is not touched."""
        self.failed = True
[docs]
def json_loads(obj):
    """Parse the JSON document ``obj``; ``bytes`` input is decoded as UTF-8 first."""
    text = obj.decode("utf-8") if isinstance(obj, bytes) else obj
    return json.loads(text)
[docs]
def json_dumps(obj):
    """Serialise ``obj`` to a JSON string using ``ClientJsonEncoder``.

    A top-level ``bytes`` value is decoded as UTF-8 first, so it is emitted
    as a JSON string. Nested ``bytes`` are handled by the encoder.
    """
    payload = obj.decode("utf-8") if isinstance(obj, bytes) else obj
    return json.dumps(payload, cls=ClientJsonEncoder)
[docs]
class ClientJsonEncoder(json.JSONEncoder):
    """``json.JSONEncoder`` that additionally serialises ``bytes`` as UTF-8 text."""

    def default(self, obj):
        """Return ``obj`` decoded as UTF-8 if it is ``bytes``; otherwise defer
        to ``json.JSONEncoder.default``, which raises TypeError."""
        if not isinstance(obj, bytes):
            return json.JSONEncoder.default(self, obj)
        return obj.decode("utf-8")
[docs]
class MessageQueueUUIDStore:
    """Persistent dict-like object for persisting message queue UUIDs that are
    awaiting acknowledgement or that have been operated on.

    Each key is one file inside ``persistence_directory/<subdirs...>``
    (default subdir ``acknowledge_uuids``). The file holds the JSON-encoded
    value and its modification time is exposed via ``get_time``/``set_time``.
    """

    def __init__(self, persistence_directory, subdirs=None):
        if subdirs is None:
            subdirs = ['acknowledge_uuids']
        self.__store = abspath(join(persistence_directory, *subdirs))
        try:
            makedirs(self.__store)
        except OSError as exc:
            # Re-opening an existing store is expected; anything else is fatal.
            if exc.errno != EEXIST:
                raise

    def __path(self, item):
        # NOTE(review): ``item`` is joined unchecked, so a key containing path
        # separators or ``..`` would resolve outside the store. Confirm keys
        # are always plain UUIDs.
        return join(self.__store, item)

    def __contains__(self, item):
        return exists(self.__path(item))

    def __setitem__(self, key, value):
        # ``with`` guarantees the JSON is flushed and the handle closed
        # instead of relying on garbage collection.
        with open(self.__path(key), 'w') as handle:
            handle.write(json.dumps(value))

    def __getitem__(self, key):
        # A missing key surfaces as FileNotFoundError (OSError), not KeyError.
        with open(self.__path(key)) as handle:
            return json.loads(handle.read())

    def __delitem__(self, key):
        try:
            unlink(self.__path(key))
        except OSError as exc:
            if exc.errno == ENOENT:
                raise KeyError(key) from exc
            raise

    def keys(self):
        """Return an iterator over stored keys (file names, in ``listdir`` order)."""
        return iter(listdir(self.__store))

    def get_time(self, key):
        """Return the modification time of ``key``'s file.

        :raises KeyError: if ``key`` is not stored.
        """
        try:
            return os.stat(self.__path(key)).st_mtime
        except OSError as exc:
            if exc.errno == ENOENT:
                raise KeyError(key) from exc
            raise

    def set_time(self, key):
        """Update ``key``'s file access/modification times to now.

        :raises KeyError: if ``key`` is not stored.
        """
        try:
            os.utime(self.__path(key), None)
        except OSError as exc:
            if exc.errno == ENOENT:
                raise KeyError(key) from exc
            raise
[docs]
class ExternalId:
    """Wrap a single ``external_id`` string in its own type."""

    external_id: str

    def __init__(self, external_id: str):
        self.external_id = external_id
[docs]
class MonitorStyle(str, Enum):
    """Enumeration of monitor styles.

    Because of the ``str`` mixin, each member compares equal to its plain
    string value (e.g. ``MonitorStyle.FOREGROUND == "foreground"``).
    """
    FOREGROUND = "foreground"
    BACKGROUND = "background"
    # The string "none", not Python's ``None``.
    NONE = "none"