import fnmatch
import tempfile
from contextlib import contextmanager
from os import (
makedirs,
unlink,
)
from os.path import (
abspath,
basename,
dirname,
exists,
join,
sep,
)
from re import (
compile,
escape,
)
from typing import (
Any,
Dict,
List,
Type,
)
from urllib.parse import urlencode
from galaxy.util.bunch import Bunch
from .config_util import read_file
from .transport import (
get_file,
post_file,
rsync_get_file,
rsync_post_file,
scp_get_file,
scp_post_file,
)
from .util import (
copy_to_path,
directory_files,
unique_path_prefix,
)
# Action applied when a path mapper entry omits an explicit "action".
DEFAULT_MAPPED_ACTION = 'transfer'  # Not really clear to me what this should be, exception?
# Mapper used when a "paths" entry supplies a "path" but no "match_type".
DEFAULT_PATH_MAPPER_TYPE = 'prefix'
# Where staging work happens: on the remote Pulsar server, on the local
# client (Galaxy), nowhere at all, or wherever the client prefers.
STAGING_ACTION_REMOTE = "remote"
STAGING_ACTION_LOCAL = "local"
STAGING_ACTION_NONE = None
STAGING_ACTION_DEFAULT = "default"
# Poor man's enum.
path_type = Bunch(
    # Galaxy input datasets and extra files.
    INPUT="input",
    # Galaxy config and param files.
    CONFIG="config",
    # Files from tool's tool_dir (for now just wrapper if available).
    TOOL="tool",
    # Input tool work dir files - e.g. task-split input file
    WORKDIR="workdir",
    # Job directory files (e.g. tool standard input/output and containerized command).
    JOBDIR="jobdir",
    # Input metadata dir files - e.g. metadata files, etc..
    METADATA="metadata",
    # Galaxy output datasets in their final home.
    OUTPUT="output",
    # Galaxy from_work_dir output paths and other files (e.g. galaxy.json)
    OUTPUT_WORKDIR="output_workdir",
    # Meta job and data files (e.g. Galaxy metadata generation files and
    # metric instrumentation files)
    OUTPUT_METADATA="output_metadata",
    # Job directory files output.
    OUTPUT_JOBDIR="output_jobdir",
    # Other fixed tool parameter paths (likely coming from tool data, but not
    # necessarily).
    UNSTRUCTURED="unstructured",
)
# Path types a mapper matches when its config says "*defaults*" (every
# type except UNSTRUCTURED).
ACTION_DEFAULT_PATH_TYPES = [
    path_type.INPUT,
    path_type.CONFIG,
    path_type.TOOL,
    path_type.WORKDIR,
    path_type.JOBDIR,
    path_type.METADATA,
    path_type.OUTPUT,
    path_type.OUTPUT_WORKDIR,
    path_type.OUTPUT_METADATA,
    path_type.OUTPUT_JOBDIR,
]
# Path types matched by the "*any*" shorthand in "path_types" config.
ALL_PATH_TYPES = ACTION_DEFAULT_PATH_TYPES + [path_type.UNSTRUCTURED]
MISSING_FILES_ENDPOINT_ERROR = "Attempted to use remote_transfer action without defining a files_endpoint."
MISSING_SSH_KEY_ERROR = "Attempt to use file transfer action requiring an SSH key without specifying a ssh_key."
class FileActionMapper:
    """
    Objects of this class define how paths are mapped to actions.

    >>> json_string = r'''{"paths": [ \
    {"path": "/opt/galaxy", "action": "none"}, \
    {"path": "/galaxy/data", "action": "transfer"}, \
    {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"}, \
    {"path": ".*/dataset_\\\\d+.dat", "action": "copy", "match_type": "regex"} \
    ]}'''
    >>> from tempfile import NamedTemporaryFile
    >>> from os import unlink
    >>> def mapper_for(default_action, config_contents):
    ...     f = NamedTemporaryFile(delete=False)
    ...     f.write(config_contents.encode('UTF-8'))
    ...     f.close()
    ...     mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None)
    ...     mapper = FileActionMapper(mock_client)
    ...     as_dict = mapper.to_dict()
    ...     mapper = FileActionMapper(config=as_dict)  # Serialize and deserialize it to make sure still works
    ...     unlink(f.name)
    ...     return mapper
    >>> mapper = mapper_for(default_action='none', config_contents=json_string)
    >>> # Test first config line above, implicit path prefix mapper
    >>> action = mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input')
    >>> action.action_type == u'none'
    True
    >>> action.staging_needed
    False
    >>> # Test another (2nd) mapper, this one with a different action
    >>> action = mapper.action({'path': '/galaxy/data/files/000/dataset_1.dat'}, 'input')
    >>> action.action_type == u'transfer'
    True
    >>> action.staging_needed
    True
    >>> # Always at least copy work_dir outputs.
    >>> action = mapper.action({'path': '/opt/galaxy/database/working_directory/45.sh'}, 'workdir')
    >>> action.action_type == u'copy'
    True
    >>> action.staging_needed
    True
    >>> # Test glob mapper (matching test)
    >>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam'}, 'input').action_type == u'copy'
    True
    >>> # Test glob mapper (non-matching test)
    >>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam.bai'}, 'input').action_type == u'none'
    True
    >>> # Regex mapper test.
    >>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'input').action_type == u'copy'
    True
    >>> # Doesn't map unstructured paths by default
    >>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'none'
    True
    >>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \
    {"path": "/", "action": "transfer", "path_types": "input"} \
    ] }''')
    >>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'input').action_type == u'transfer'
    True
    >>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'output').action_type == u'none'
    True
    >>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \
    {"path": "/", "action": "transfer", "path_types": "*any*"} \
    ] }''')
    >>> unstructured_mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'transfer'
    True
    >>> match_type_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \
    {"action": "transfer", "path_types": "input"}, \
    {"action": "remote_copy", "path_types": "output"} \
    ] }''')
    >>> input_action = match_type_only_mapper.action({}, 'input')
    >>> input_action.action_type
    'transfer'
    >>> output_action = match_type_only_mapper.action({}, 'output')
    >>> output_action.action_type
    'remote_copy'
    """

    def __init__(self, client=None, config=None):
        """Build a mapper from a Pulsar ``client`` or a ``config`` dict.

        At least one of the two must be supplied; an explicit ``config``
        takes precedence over deriving one from ``client``.
        """
        if config is None and client is None:
            message = "FileActionMapper must be constructed from either a client or a config dictionary."
            raise Exception(message)
        if config is None:
            config = self.__client_to_config(client)
        self.default_action = config.get("default_action", "transfer")
        self.ssh_key = config.get("ssh_key", None)
        self.ssh_user = config.get("ssh_user", None)
        self.ssh_host = config.get("ssh_host", None)
        self.ssh_port = config.get("ssh_port", None)
        self.mappers = mappers_from_dicts(config.get("paths", []))
        self.files_endpoint = config.get("files_endpoint", None)

    def action(self, source, type, mapper=None):
        """Map a ``source`` dict (optionally containing a "path") and a path
        type to a concrete action instance.
        """
        path = source.get("path", None)
        mapper = self.__find_mapper(path, type, mapper)
        action_class = self.__action_class(path, type, mapper)
        file_lister = DEFAULT_FILE_LISTER
        action_kwds = {}
        if mapper:
            file_lister = mapper.file_lister
            action_kwds = mapper.action_kwds
        action = action_class(source, file_lister=file_lister, **action_kwds)
        self.__process_action(action, type)
        return action

    def unstructured_mappers(self):
        """ Return mappers that will map 'unstructured' files (i.e. go beyond
        mapping inputs, outputs, and config files).
        """
        return filter(lambda m: path_type.UNSTRUCTURED in m.path_types, self.mappers)

    def to_dict(self):
        """Serialize so the mapper can be rebuilt via FileActionMapper(config=...)."""
        return dict(
            default_action=self.default_action,
            files_endpoint=self.files_endpoint,
            ssh_key=self.ssh_key,
            ssh_user=self.ssh_user,
            ssh_port=self.ssh_port,
            ssh_host=self.ssh_host,
            paths=list(map(lambda m: m.to_dict(), self.mappers))
        )

    def __client_to_config(self, client):
        # Prefer an on-disk action config file when the client names one,
        # otherwise fall back to the client's in-memory file_actions dict.
        action_config_path = client.action_config_path
        if action_config_path:
            config = read_file(action_config_path)
        else:
            config = getattr(client, "file_actions", {})
        config["default_action"] = client.default_file_action
        config["files_endpoint"] = client.files_endpoint
        for attr in ['ssh_key', 'ssh_user', 'ssh_port', 'ssh_host']:
            if hasattr(client, attr):
                config[attr] = getattr(client, attr)
        return config

    def __find_mapper(self, path, type, mapper=None):
        # First configured mapper that matches wins.
        if not mapper:
            if path is not None:
                normalized_path = abspath(path)
            else:
                normalized_path = None
            for query_mapper in self.mappers:
                if query_mapper.matches(normalized_path, type):
                    mapper = query_mapper
                    break
        return mapper

    def __action_class(self, path, type, mapper):
        action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none"
        if mapper:
            action_type = mapper.action_type
        if type in ["workdir", "jobdir", "output_workdir", "output_metadata", "output_jobdir"] and action_type == "none":
            # We are changing the working_directory/job_directory relative to what
            # Galaxy would use, these need to be copied over.
            action_type = "copy"
        action_class = actions.get(action_type, None)
        if action_class is None:
            message_template = "Unknown action_type encountered %s while trying to map path %s"
            message_args = (action_type, path)
            raise Exception(message_template % message_args)
        return action_class

    def __process_action(self, action, file_type):
        """ Extension point to populate extra action information after an
        action has been created.
        """
        if getattr(action, "inject_url", False):
            self.__inject_url(action, file_type)
        if getattr(action, "inject_ssh_properties", False):
            self.__inject_ssh_properties(action)

    def __inject_url(self, action, file_type):
        url_base = self.files_endpoint
        if not url_base:
            raise Exception(MISSING_FILES_ENDPOINT_ERROR)
        # Append query parameters with the correct separator.
        if "?" not in url_base:
            url_base = "%s?" % url_base
        else:
            url_base = "%s&" % url_base
        url_params = urlencode({"path": action.path, "file_type": file_type})
        action.url = f"{url_base}{url_params}"

    def __inject_ssh_properties(self, action):
        # Fill any SSH kwds the action left unset from client-level defaults.
        for attr in ["ssh_key", "ssh_host", "ssh_port", "ssh_user"]:
            action_attr = getattr(action, attr)
            if action_attr == UNSET_ACTION_KWD:
                client_default_attr = getattr(self, attr, None)
                setattr(action, attr, client_default_attr)
        if action.ssh_key is None:
            raise Exception(MISSING_SSH_KEY_ERROR)
# Sentinel: an action_spec entry with this value MUST be supplied in config.
REQUIRED_ACTION_KWD = object()
# Sentinel: SSH kwds left with this value fall back to client-level defaults.
UNSET_ACTION_KWD = "__UNSET__"
class BaseAction:
    """Common machinery shared by all file staging action types."""

    whole_directory_transfer_supported = False
    action_spec: Dict[str, Any] = {}
    action_type: str

    def __init__(self, source, file_lister=None):
        self.source = source
        self.file_lister = file_lister or DEFAULT_FILE_LISTER

    @property
    def path(self):
        """Path this action applies to (may be absent from source)."""
        return self.source.get("path")

    def unstructured_map(self, path_helper):
        """Map each listed file either to a staged name or a rewritten path."""
        mapping = self.file_lister.unstructured_map(self.path)
        if self.staging_needed:
            # To ensure uniqueness, prepend unique prefix to each name
            prefix = unique_path_prefix(self.path)
            mapping = {key: join(prefix, name) for key, name in mapping.items()}
        else:
            rewrites = {}
            for key in mapping:
                new_path = self.path_rewrite(path_helper, key)
                if new_path:
                    rewrites[key] = new_path
            mapping = rewrites
        return mapping

    @property
    def staging_needed(self):
        return self.staging != STAGING_ACTION_NONE

    @property
    def staging_action_local(self):
        return self.staging == STAGING_ACTION_LOCAL

    def _extend_base_dict(self, **kwds):
        # Base serialization shared by every subclass's to_dict().
        result = {
            "path": self.path,  # For older Pulsar servers (pre-0.13.0?)
            "source": self.source,
            "action_type": self.action_type,
        }
        result.update(**kwds)
        return result

    def to_dict(self):
        return self._extend_base_dict()

    def __str__(self):
        attributes = [
            f"{key}={value}"
            for key, value in self.to_dict().items()
            if key != "source"
        ]
        return "FileAction[%s]" % ",".join(attributes)
class NoneAction(BaseAction):
    """Action for paths needing no staging at all.

    Used for paths available at identical locations on both the Pulsar
    client (i.e. Galaxy server) and the remote Pulsar server.
    """
    action_type = "none"
    staging = STAGING_ACTION_NONE

    @classmethod
    def from_dict(cls, action_dict):
        return NoneAction(source=action_dict["source"])

    def to_dict(self):
        return self._extend_base_dict()

    def path_rewrite(self, path_helper, path=None):
        # Paths are shared verbatim; there is nothing to rewrite.
        return None
class RewriteAction(BaseAction):
    """Action indicating the Pulsar server should simply rewrite the path
    to the specified file - no data is transferred.
    """
    action_spec = dict(
        source_directory=REQUIRED_ACTION_KWD,
        destination_directory=REQUIRED_ACTION_KWD
    )
    action_type = "rewrite"
    staging = STAGING_ACTION_NONE

    def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None):
        super().__init__(source, file_lister=file_lister)
        self.source_directory = source_directory
        self.destination_directory = destination_directory

    @classmethod
    def from_dict(cls, action_dict):
        return RewriteAction(
            source=action_dict["source"],
            source_directory=action_dict["source_directory"],
            destination_directory=action_dict["destination_directory"],
        )

    def to_dict(self):
        return self._extend_base_dict(
            source_directory=self.source_directory,
            destination_directory=self.destination_directory,
        )

    def path_rewrite(self, path_helper, path=None):
        path = path or self.path
        # NOTE(review): the local ``path`` is not used below - the rewrite
        # always operates on self.path; confirm this is intentional.
        new_path = path_helper.from_posix_with_new_base(self.path, self.source_directory, self.destination_directory)
        if new_path == self.path:
            return None
        return new_path
class TransferAction(BaseAction):
    """ This action indicates that the Pulsar client should initiate an HTTP
    transfer of the corresponding path to the remote Pulsar server before
    launching the job. """
    action_type = "transfer"
    # Staging is performed by the local client before job launch.
    staging = STAGING_ACTION_LOCAL
class CopyAction(BaseAction):
    """ This action indicates that the Pulsar client should execute a file system
    copy of the corresponding path to the Pulsar staging directory prior to
    launching the corresponding job. """
    action_type = "copy"
    # Staging is performed by the local client before job launch.
    staging = STAGING_ACTION_LOCAL
class RemoteCopyAction(BaseAction):
    """ This action indicates the Pulsar server should copy the file before
    execution via direct file system copy. This is like a CopyAction, but
    it indicates the action should occur on the Pulsar server instead of on
    the client.
    """
    action_type = "remote_copy"
    staging = STAGING_ACTION_REMOTE

    @classmethod
    def from_dict(cls, action_dict):
        """Rebuild a RemoteCopyAction from its to_dict() representation."""
        return RemoteCopyAction(source=action_dict["source"])

    def write_to_path(self, path):
        """Copy this action's file to ``path``."""
        # Context manager closes the source handle - the original leaked the
        # file object returned by open().
        with open(self.path, "rb") as f:
            copy_to_path(f, path)

    def write_from_path(self, pulsar_path):
        """Copy ``pulsar_path`` onto this action's path, creating parents."""
        destination = self.path
        parent_directory = dirname(destination)
        # exist_ok avoids the race between a separate exists() check and
        # the directory creation.
        makedirs(parent_directory, exist_ok=True)
        with open(pulsar_path, "rb") as f:
            copy_to_path(f, destination)
class RemoteTransferAction(BaseAction):
    """ This action indicates the Pulsar server should transfer the file before
    execution via one of the remote transfer implementations. This is like a TransferAction, but
    it indicates the action requires network access to the staging server, and
    should be executed via ssh/rsync/etc
    """
    # Tells FileActionMapper.__process_action to populate self.url.
    inject_url = True
    action_type = "remote_transfer"
    staging = STAGING_ACTION_REMOTE

    def __init__(self, source, file_lister=None, url=None):
        super().__init__(source, file_lister=file_lister)
        self.url = url

    @classmethod
    def from_dict(cls, action_dict):
        source = action_dict["source"]
        url = action_dict["url"]
        return RemoteTransferAction(source=source, url=url)

    def to_dict(self):
        return self._extend_base_dict(url=self.url)

    def write_to_path(self, path):
        # Download the staged file from the files endpoint.
        get_file(self.url, path)

    def write_from_path(self, pulsar_path):
        # Upload the local file to the files endpoint.
        post_file(self.url, pulsar_path)
class RemoteObjectStoreCopyAction(BaseAction):
    """Copy a dataset out of a Galaxy object store on the Pulsar server side.
    """
    action_type = "remote_object_store_copy"
    staging = STAGING_ACTION_REMOTE
    # The object_store attribute is injected externally before use.
    inject_object_store = True

    @classmethod
    def from_dict(cls, action_dict):
        return RemoteObjectStoreCopyAction(source=action_dict["source"])

    def write_to_path(self, path):
        """Resolve the dataset via the object store and copy it to ``path``."""
        assert self.object_store  # Make sure object_store attribute injected
        assert "object_store_ref" in self.source
        object_store_ref = self.source["object_store_ref"]
        dataset_object = Bunch(
            id=object_store_ref["dataset_id"],
            uuid=object_store_ref["dataset_uuid"],
            object_store_id=object_store_ref["object_store_id"],
        )
        filename = self.object_store.get_filename(dataset_object)
        # Context manager closes the source handle - the original leaked the
        # file object returned by open().
        with open(filename, 'rb') as f:
            copy_to_path(f, path)

    def write_from_path(self, pulsar_path):
        raise NotImplementedError("Writing raw files to object store not supported at this time.")
class PubkeyAuthenticatedTransferAction(BaseAction):
    """Base class for file transfers requiring an SSH public/private key
    """
    inject_ssh_properties = True
    action_spec = dict(
        ssh_key=UNSET_ACTION_KWD,
        ssh_user=UNSET_ACTION_KWD,
        ssh_host=UNSET_ACTION_KWD,
        ssh_port=UNSET_ACTION_KWD,
    )
    staging = STAGING_ACTION_REMOTE

    def __init__(self, source, file_lister=None, ssh_user=UNSET_ACTION_KWD,
                 ssh_host=UNSET_ACTION_KWD, ssh_port=UNSET_ACTION_KWD, ssh_key=UNSET_ACTION_KWD):
        super().__init__(source, file_lister=file_lister)
        self.ssh_user = ssh_user
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port
        self.ssh_key = ssh_key

    def to_dict(self):
        # NOTE(review): ssh_key is deliberately not serialized here, yet
        # subclasses' from_dict() read action_dict["ssh_key"] - confirm the
        # key is re-injected before deserialization.
        return self._extend_base_dict(
            ssh_user=self.ssh_user,
            ssh_host=self.ssh_host,
            ssh_port=self.ssh_port
        )

    @contextmanager
    def _serialized_key(self):
        """Yield a path to a temp file holding the key; remove it afterwards."""
        key_file = self.__serialize_ssh_key()
        try:
            yield key_file
        finally:
            # Remove the private key even if the transfer raises.
            self.__cleanup_ssh_key(key_file)

    def __serialize_ssh_key(self):
        # Validate before creating the temp file so nothing leaks on error.
        if self.ssh_key is None:
            raise Exception("SSH_KEY not available")
        # delete=False because the path outlives this handle (it is handed to
        # scp/rsync). Closing the handle flushes the buffered key bytes to
        # disk - the original never closed it, risking an empty key file and
        # leaking the descriptor.
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(self.ssh_key.encode("utf-8"))
        return f.name

    def __cleanup_ssh_key(self, keyfile):
        if exists(keyfile):
            unlink(keyfile)
class RsyncTransferAction(PubkeyAuthenticatedTransferAction):
    """Stage files between client and Pulsar server using rsync over SSH."""

    action_type = "remote_rsync_transfer"

    @classmethod
    def from_dict(cls, action_dict):
        return RsyncTransferAction(
            source=action_dict["source"],
            ssh_user=action_dict["ssh_user"],
            ssh_host=action_dict["ssh_host"],
            ssh_port=action_dict["ssh_port"],
            ssh_key=action_dict["ssh_key"],
        )

    def write_to_path(self, path):
        # Pull this action's file down to the local ``path``.
        with self._serialized_key() as key_file:
            rsync_get_file(self.path, path, self.ssh_user, self.ssh_host, self.ssh_port, key_file)

    def write_from_path(self, pulsar_path):
        # Push ``pulsar_path`` up to this action's path.
        with self._serialized_key() as key_file:
            rsync_post_file(pulsar_path, self.path, self.ssh_user, self.ssh_host, self.ssh_port, key_file)
class ScpTransferAction(PubkeyAuthenticatedTransferAction):
    """Stage files between client and Pulsar server using scp."""

    action_type = "remote_scp_transfer"

    @classmethod
    def from_dict(cls, action_dict):
        return ScpTransferAction(
            source=action_dict["source"],
            ssh_user=action_dict["ssh_user"],
            ssh_host=action_dict["ssh_host"],
            ssh_port=action_dict["ssh_port"],
            ssh_key=action_dict["ssh_key"],
        )

    def write_to_path(self, path):
        # Pull this action's file down to the local ``path``.
        with self._serialized_key() as key_file:
            scp_get_file(self.path, path, self.ssh_user, self.ssh_host, self.ssh_port, key_file)

    def write_from_path(self, pulsar_path):
        # Push ``pulsar_path`` up to this action's path.
        with self._serialized_key() as key_file:
            scp_post_file(pulsar_path, self.path, self.ssh_user, self.ssh_host, self.ssh_port, key_file)
class MessageAction:
    """ Sort of pseudo action describing "files" store in memory and
    transferred via message (HTTP, Python-call, MQ, etc...)
    """
    action_type = "message"
    staging = STAGING_ACTION_DEFAULT

    def __init__(self, contents, client=None):
        self.contents = contents
        self.client = client

    @property
    def staging_needed(self):
        # In-memory contents always need to be materialized remotely.
        return True

    @property
    def staging_action_local(self):
        # Ekkk, cannot be called if created through from_dict.
        # Shouldn't be a problem the way it is used - but is an
        # object design problem.
        return self.client.prefer_local_staging

    def to_dict(self):
        return dict(contents=self.contents, action_type=MessageAction.action_type)

    @classmethod
    def from_dict(cls, action_dict):
        return MessageAction(contents=action_dict["contents"])

    def write_to_path(self, path):
        """Materialize the message contents at ``path``."""
        # Context manager closes the handle deterministically - the original
        # open(path, "w").write(...) leaked it until garbage collection.
        with open(path, "w") as f:
            f.write(self.contents)
# Action classes that round-trip through to_dict()/from_dict() and can
# therefore cross the client/server message boundary.
DICTIFIABLE_ACTION_CLASSES = [
    RemoteCopyAction,
    RemoteTransferAction,
    MessageAction,
    RsyncTransferAction,
    ScpTransferAction,
    RemoteObjectStoreCopyAction
]
def from_dict(action_dict):
    """Recover an action instance serialized with its ``to_dict`` method.

    Raises an exception when ``action_dict`` does not describe one of the
    dictifiable action types.
    """
    action_type = action_dict.get("action_type", None)
    target_class = None
    for action_class in DICTIFIABLE_ACTION_CLASSES:
        if action_type == action_class.action_type:
            target_class = action_class
            break
    if not target_class:
        message = "Failed to recover action from dictionary - invalid action type specified %s." % action_type
        raise Exception(message)
    if "source" in action_dict:
        # Remove the redundant path stored for backward compatibility; a
        # default prevents KeyError for dicts that never carried the legacy key.
        action_dict.pop("path", None)
    elif "path" in action_dict:
        # legacy message received from older Pulsar client, pop the path from the dict
        # and convert it to a source.
        source = {"path": action_dict.pop("path")}
        action_dict["source"] = source
    return target_class.from_dict(action_dict)
class BasePathMapper:
    """Common behavior for mapping a path + path type to a staging action."""

    match_type: str

    def __init__(self, config):
        action_type = config.get('action', DEFAULT_MAPPED_ACTION)
        action_class = actions.get(action_type, None)
        # Seed keyword arguments from the action's spec, overriding each with
        # any value supplied in this mapper's config.
        action_kwds = action_class.action_spec.copy()
        for key, default in action_kwds.items():
            if key in config:
                action_kwds[key] = config[key]
            elif default is REQUIRED_ACTION_KWD:
                message_template = "action_type %s requires key word argument %s"
                raise Exception(message_template % (action_type, key))
        self.action_type = action_type
        self.action_kwds = action_kwds
        # Expand the "*defaults*"/"*any*" shorthands into concrete path types.
        path_types_str = config.get('path_types', "*defaults*")
        path_types_str = path_types_str.replace("*defaults*", ",".join(ACTION_DEFAULT_PATH_TYPES))
        path_types_str = path_types_str.replace("*any*", ",".join(ALL_PATH_TYPES))
        self.path_types = path_types_str.split(",")
        self.file_lister = FileLister(config)

    def matches(self, path, path_type):
        """True when both the path type and the path itself match this mapper."""
        if path_type not in self.path_types:
            return False
        return self._path_matches(path)

    def _extend_base_dict(self, **kwds):
        # Serialization shared by the subclasses' to_dict() implementations.
        base_dict = dict(
            action=self.action_type,
            path_types=",".join(self.path_types),
            match_type=self.match_type
        )
        base_dict.update(self.file_lister.to_dict())
        base_dict.update(self.action_kwds)
        base_dict.update(**kwds)
        return base_dict

    def to_pattern(self):
        raise NotImplementedError()
class PathTypeOnlyMapper(BasePathMapper):
    """Mapper matching solely on path type - any path qualifies."""

    match_type = 'path_type_only'

    def _path_matches(self, path):
        # Path-type filtering already happened in matches(); accept anything.
        return True

    def to_dict(self):
        return self._extend_base_dict()
class PrefixPathMapper(BasePathMapper):
    """Mapper matching any path underneath a configured directory prefix."""

    match_type = 'prefix'

    def __init__(self, config):
        super().__init__(config)
        self.prefix_path = abspath(config['path'])

    def _path_matches(self, path):
        if path is None:
            return False
        return path.startswith(self.prefix_path)

    def to_pattern(self):
        # Capture any non-whitespace, non-quote run living under the prefix.
        pattern_str = r"(%s%s[^\s,\"\']+)" % (escape(self.prefix_path), escape(sep))
        return compile(pattern_str)

    def to_dict(self):
        return self._extend_base_dict(path=self.prefix_path)
class GlobPathMapper(BasePathMapper):
    """Mapper matching paths against a shell-style glob pattern."""

    match_type = 'glob'

    def __init__(self, config):
        super().__init__(config)
        self.glob_path = config['path']

    def _path_matches(self, path):
        if path is None:
            return False
        return fnmatch.fnmatch(path, self.glob_path)

    def to_pattern(self):
        # Translate the glob into an equivalent regular expression.
        return compile(fnmatch.translate(self.glob_path))

    def to_dict(self):
        return self._extend_base_dict(path=self.glob_path)
class RegexPathMapper(BasePathMapper):
    """Mapper matching paths against a regular expression."""

    match_type = 'regex'

    def __init__(self, config):
        super().__init__(config)
        self.pattern_raw = config['path']
        # Compile once up front; matching happens once per staged path.
        self.pattern = compile(self.pattern_raw)

    def _path_matches(self, path):
        if path is None:
            return False
        return self.pattern.match(path) is not None

    def to_pattern(self):
        return self.pattern

    def to_dict(self):
        # Serialize the raw pattern text, not the compiled object.
        return self._extend_base_dict(path=self.pattern_raw)
# All mapper implementations, indexed by match_type for configuration lookup.
MAPPER_CLASSES = [PathTypeOnlyMapper, PrefixPathMapper, GlobPathMapper, RegexPathMapper]
MAPPER_CLASS_DICT = dict(map(lambda c: (c.match_type, c), MAPPER_CLASSES))
def mappers_from_dicts(mapper_def_list):
    """Instantiate one path mapper per configuration dict supplied."""
    return [_mappper_from_dict(mapper_def) for mapper_def in mapper_def_list]
def _mappper_from_dict(mapper_dict):
    """Build the concrete mapper described by ``mapper_dict``.

    Entries with a "path" default to prefix matching; entries without one
    can only match on path type.
    """
    if "path" not in mapper_dict:
        map_type = 'path_type_only'
    else:
        map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE)
    return MAPPER_CLASS_DICT[map_type](mapper_dict)
class FileLister:
    """Expands an unstructured path into the files it stands for.

    depth 0 maps the path to itself; depth N walks up N parent directories
    and lists every file found there.
    """

    def __init__(self, config):
        self.depth = int(config.get("depth", "0"))

    def to_dict(self):
        return {"depth": self.depth}

    def unstructured_map(self, path):
        if self.depth == 0:
            return {path: basename(path)}
        directory = path
        for _ in range(self.depth):
            directory = dirname(directory)
        return {join(directory, name): name for name in directory_files(directory)}
# Shared lister used when no mapper supplies one (depth 0 - a path maps
# only to itself).
DEFAULT_FILE_LISTER = FileLister(dict(depth=0))
# Every concrete action implementation; keyed by action_type just below.
ACTION_CLASSES: List[Type[BaseAction]] = [
    NoneAction,
    RewriteAction,
    TransferAction,
    CopyAction,
    RemoteCopyAction,
    RemoteTransferAction,
    RemoteObjectStoreCopyAction,
    RsyncTransferAction,
    ScpTransferAction,
]
actions = {clazz.action_type: clazz for clazz in ACTION_CLASSES}
__all__ = (
    'FileActionMapper',
    'path_type',
    'from_dict',
    'MessageAction',
    'RemoteTransferAction',  # For testing
)