Source code for pulsar.user_auth.methods.oidc
import requests
import base64
import json
import jwt
import re
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_der_x509_certificate
from pulsar.user_auth.methods.interface import AuthMethod
import logging
log = logging.getLogger(__name__)
[docs]def get_token(job_directory, provider):
log.debug("Getting OIDC token for provider " + provider + " from Galaxy")
endpoint = job_directory.load_metadata("launch_config")["token_endpoint"]
endpoint = endpoint + "&provider=" + provider
r = requests.get(url=endpoint)
return r.text
[docs]class OIDCAuth(AuthMethod):
"""
Authorization based on OIDC tokens
"""
auth_type = "oidc"
def __init__(self, config):
try:
self._provider = config["oidc_provider"]
self._jwks_url = config["oidc_jwks_url"]
self._username_in_token = config["oidc_username_in_token"]
self._username_template = config["oidc_username_template"]
except Exception as e:
raise Exception("cannot read OIDCAuth configuration") from e
def _verify_token(self, token):
try:
# Obtain appropriate cert from JWK URI
key_set = requests.get(self._jwks_url, timeout=5)
encoded_header, rest = token.split('.', 1)
headerobj = json.loads(base64.b64decode(encoded_header + '==').decode('utf8'))
key_id = headerobj['kid']
for key in key_set.json()['keys']:
if key['kid'] == key_id:
x5c = key['x5c'][0]
break
else:
raise jwt.DecodeError('Cannot find kid ' + key_id)
cert = load_der_x509_certificate(base64.b64decode(x5c), default_backend())
# Decode token (exp date is checked automatically)
decoded_token = jwt.decode(
token,
key=cert.public_key(),
algorithms=['RS256'],
options={'exp': True, 'verify_aud': False}
)
return decoded_token
except Exception as error:
raise Exception("Error verifying jwt token") from error
[docs] def authorize(self, authentication_info):
raise NotImplementedError("authorization not implemented for this class")
[docs] def authenticate(self, job_directory):
token = get_token(job_directory, self._provider)
decoded_token = self._verify_token(token)
user = decoded_token[self._username_in_token]
user = re.match(self._username_template, user).group(0)
return {"username": user}