Source code for pulsar.tools.authorization

from os.path import join


class AllowAnyAuthorization:

    def authorize_setup(self):
        pass

    def authorize_tool_file(self, name, contents):
        pass

    def authorize_execution(self, job_directory, command_line):
        pass

    def authorize_config_file(self, job_directory, name, path):
        pass


class AllowAnyAuthorizer:
    """
    Allow any, by default Pulsar is assumed to be secured
    using a firewall or private_token.
    """
    ALLOW_ANY_AUTHORIZATION = AllowAnyAuthorization()

    def get_authorization(self, tool_id):
        return self.ALLOW_ANY_AUTHORIZATION


class ToolBasedAuthorization(AllowAnyAuthorization):

    def __init__(self, tool):
        self.tool = tool

    def __unauthorized(self, msg):
        raise Exception("Unauthorized action attempted: %s" % msg)

    def authorize_setup(self):
        if self.tool is None:
            self.__unauthorized("Attempt to setup a tool with id not registered with Pulsar toolbox.")

    def authorize_tool_file(self, name, contents):
        tool = self.tool
        tool_dir = tool.get_tool_dir()
        tool_dir_file = join(tool_dir, name)
        allowed_contents = open(tool_dir_file).read()
        if contents != allowed_contents:
            self.__unauthorized("Attempt to write tool file with contents differing from Pulsar copy of tool file.")

    def authorize_config_file(self, job_directory, name, path):
        if not self.__inputs_validator.validate_configfile(job_directory, name, path):
            self.__unauthorized("Attempt to utilize unauthorized configfile.")

    def authorize_execution(self, job_directory, command_line):
        if not self.__inputs_validator.validate_command(job_directory, command_line):
            self.__unauthorized("Attempt to execute unauthorized command.")

    @property
    def __inputs_validator(self):
        return self.tool.inputs_validator


class ToolBasedAuthorizer:
    """
    Work In Progress: Implement tool based white-listing
    of what jobs can run and what those jobs can do.
    """

    def __init__(self, toolbox):
        self.toolbox = toolbox

    def get_authorization(self, tool_id):
        tool = None
        try:
            tool = self.toolbox.get_tool(tool_id)
        except Exception:
            pass
        return ToolBasedAuthorization(tool)


[docs] def get_authorizer(toolbox): if toolbox: # Use toolbox as a white list. authorizer = ToolBasedAuthorizer(toolbox) else: # No toolbox specified, allow any tools to run. authorizer = AllowAnyAuthorizer() return authorizer
__all__ = ('get_authorizer',)