Source code for pulsar.user_auth.manager

import importlib
import inspect
from abc import ABC


class UserAuthManager(ABC):
    """Authorization/Authentication manager.

    Instantiates the authentication and authorization method objects
    described by the ``user_auth`` section of the configuration and consults
    them, in configuration order, to decide whether a job may run.
    """

    def __init__(self, config):
        """Build the configured authentication and authorization methods.

        The ``user_auth`` section is read with ``config.get("user_auth")``.
        When it is missing or empty no methods are registered.  Otherwise its
        ``authentication`` and ``authorization`` entries are iterated as
        per-method settings; every other key of the section is merged into
        each entry (overriding same-named keys) before the entry is handed to
        the method class selected by its ``type`` value.

        The configuration passed in is never modified: the section and each
        per-method entry are copied before keys are removed or merged.

        Raises:
            Exception: ``"cannot read auth configuration"``, chained to the
                underlying error, if anything fails while reading the
                configuration or constructing a method.
        """
        self._authorization_methods = []
        self._authentication_methods = []
        try:
            user_auth = config.get("user_auth", None)
            if not user_auth:
                return
            # Copy before popping so the caller's configuration keeps its
            # "authentication"/"authorization" lists and can be reused.
            shared = dict(user_auth)
            authentications = shared.pop("authentication", [])
            authorizations = shared.pop("authorization", [])
            # Authorization methods are built first, then authentication ones.
            self._authorization_methods.extend(self._build_methods(authorizations, shared))
            self._authentication_methods.extend(self._build_methods(authentications, shared))
        except Exception as e:
            raise Exception("cannot read auth configuration") from e

    @staticmethod
    def _build_methods(entries, shared):
        """Return one method instance per entry, with ``shared`` merged in."""
        methods = []
        for entry in entries:
            # Fresh dict per method: shared keys win over the entry's own,
            # and the entry from the configuration is left untouched.
            settings = dict(entry)
            settings.update(shared)
            method_type = settings["type"]
            method_class = get_object("pulsar.user_auth.methods." + method_type, "auth_type", method_type)
            methods.append(method_class(settings))
        return methods

    def authorize(self, job_id, job_directory):
        """Authenticate the job, then check it against the authorization methods.

        Authentication always runs first, even when no authorization method
        is configured, so an authentication failure is raised in either case.

        Returns:
            ``True`` when no authorization method is configured, or as soon
            as one method's ``authorize`` returns a truthy value.

        Raises:
            Exception: if authentication fails, or if every configured
                authorization method returns a falsy value.
        """
        authentication_info = self.__authenticate(job_id, job_directory)
        if not self._authorization_methods:
            return True
        # any() stops at the first method that grants access.
        if any(method.authorize(authentication_info) for method in self._authorization_methods):
            return True
        raise Exception("Could not authorize job execution on remote resource")

    def __authenticate(self, job_id, job_directory):
        """Return the first truthy result of the authentication methods.

        Returns an empty dict when no authentication method is configured.

        Raises:
            Exception: if every configured method returns a falsy value.
        """
        if not self._authentication_methods:
            return {}
        for method in self._authentication_methods:
            res = method.authenticate(job_directory)
            if res:
                return res
        raise Exception("Could not authenticate job %s" % job_id)
def get_object(module_name, attribute_name, attribute_value):
    """Find a class in a module by the value of one of its attributes.

    Imports ``module_name`` (a dotted module path) and scans its members in
    the name-sorted order produced by ``inspect.getmembers``.

    Args:
        module_name: Dotted path of the module to import and search.
        attribute_name: Name of the attribute a matching class must have.
        attribute_value: Value the attribute must compare equal to.

    Returns:
        The first class whose ``attribute_name`` equals ``attribute_value``.

    Raises:
        Exception: if no class in the module matches.  Import errors from
            the module itself propagate unchanged.
    """
    # import_module returns the leaf module directly, so there is no need to
    # walk from the top-level package with getattr as __import__ requires.
    module = importlib.import_module(module_name)
    missing = object()  # distinguishes "attribute absent" from any real value
    for _, candidate in inspect.getmembers(module, inspect.isclass):
        value = getattr(candidate, attribute_name, missing)
        if value is not missing and value == attribute_value:
            return candidate
    raise Exception("Cannot find object %s with attribute %s=%s " % (module_name, attribute_name, attribute_value))