Source code for pulsar.client.relay_auth

"""
JWT authentication manager for pulsar-relay.

Handles token acquisition, caching, and automatic refresh.
"""
import logging
import threading
from typing import cast, Optional
from datetime import datetime, timedelta

import requests

log = logging.getLogger(__name__)


[docs] class RelayAuthManager: """Manages JWT authentication tokens for pulsar-relay communication. Features: - Thread-safe token caching - Automatic token refresh before expiry - Lazy authentication (only authenticates when needed) """ def __init__(self, relay_url: str, username: str, password: str): """Initialize the authentication manager. Args: relay_url: Base URL of the pulsar-relay server username: Username for authentication password: Password for authentication """ self.relay_url = relay_url.rstrip('/') self.username = username self.password = password self._token: Optional[str] = None self._token_expiry: Optional[datetime] = None self._lock = threading.Lock() # Refresh token 5 minutes before expiry self._refresh_buffer_seconds = 300
[docs] def get_token(self) -> str: """Get a valid JWT token, refreshing if necessary. Returns: Valid JWT access token Raises: Exception: If authentication fails """ with self._lock: if self._is_token_valid(): return cast(str, self._token) # Need to authenticate or refresh log.debug("Authenticating with pulsar-relay at %s", self.relay_url) self._authenticate() return cast(str, self._token)
def _is_token_valid(self) -> bool: """Check if current token is valid and not expiring soon. Returns: True if token exists and won't expire soon, False otherwise """ if self._token is None or self._token_expiry is None: return False # Check if token will expire within refresh buffer time_until_expiry = (self._token_expiry - datetime.now()).total_seconds() return time_until_expiry > self._refresh_buffer_seconds def _authenticate(self) -> None: """Perform authentication and cache the token. Raises: Exception: If authentication fails """ auth_url = f"{self.relay_url}/auth/login" try: response = requests.post( auth_url, data={ 'username': self.username, 'password': self.password, 'grant_type': 'password' }, headers={'Content-Type': 'application/x-www-form-urlencoded'}, timeout=10 ) response.raise_for_status() data = response.json() self._token = data['access_token'] expires_in = data['expires_in'] # Calculate expiry time self._token_expiry = datetime.now() + timedelta(seconds=expires_in) log.info("Successfully authenticated with pulsar-relay, token expires in %d seconds", expires_in) except requests.RequestException as e: log.error("Failed to authenticate with pulsar-relay: %s", e) raise Exception(f"pulsar-relay authentication failed: {e}")
[docs] def invalidate(self) -> None: """Invalidate the current token, forcing re-authentication on next request.""" with self._lock: self._token = None self._token_expiry = None log.debug("Invalidated pulsar-relay authentication token")