Source code for pulsar.client.manager

"""Entry point for client creation.

``build_client_manager`` in particular is the abstraction that should be used
to create a ``ClientManager``, which in turn can create Pulsar clients for
specific actions.
"""

import functools
import os
import threading
from logging import getLogger
from os import getenv
from queue import Queue
from typing import (
    Any,
    Dict,
    Optional,
    Type,
    TYPE_CHECKING,
)

from typing_extensions import Protocol

from .amqp_exchange_factory import get_exchange
from .client import (
    BaseJobClient,
    InputCachingJobClient,
    JobClient,
    GcpMessageCoexecutionJobClient,
    GcpPollingCoexecutionJobClient,
    K8sMessageCoexecutionJobClient,
    K8sPollingCoexecutionJobClient,
    MessageCLIJobClient,
    MessageJobClient,
    RelayJobClient,
    TesMessageCoexecutionJobClient,
    TesPollingCoexecutionJobClient,
)
from .destination import url_to_destination_params
from .object_client import ObjectStoreClient
from .server_interface import (
    HttpPulsarInterface,
    LocalPulsarInterface,
    PulsarInterface,
)
from .transport import get_transport
from .transport.relay import RelayTransport
from .util import TransferEventManager

if TYPE_CHECKING:
    from pulsar.managers import ManagerInterface

log = getLogger(__name__)

DEFAULT_TRANSFER_THREADS = 2


def _per_handler_cursor_path(
    base_path: Optional[str],
    handler_id: Optional[str] = None,
) -> Optional[str]:
    """Derive a handler-specific cursor file path from ``base_path``.

    Every Galaxy job handler is its own process and long-polls the relay
    with its own cursor. If they all wrote to a single file, concurrent
    ``os.replace`` calls could discard another handler's progress, so a
    per-handler tag is spliced in just before the file extension. The tag
    also has to be **stable across restarts**; a tag that changes each run
    leaves the persisted cursor orphaned and persisting it is pointless.

    The tag is chosen from the first available source:

    1. the ``handler_id`` argument (callers typically pass
       ``app.config.server_name``),
    2. the ``GALAXY_SERVER_NAME`` environment variable,
    3. ``pid<os.getpid()>`` as a fallback. A warning is logged in this case
       because the PID differs on every start, so the next run will not
       find this cursor file.

    A falsy ``base_path`` (``None`` or ``""``) is returned unchanged.
    """
    if not base_path:
        # No cursor file configured - nothing to rewrite.
        return base_path

    tag = handler_id
    if not tag:
        tag = os.environ.get("GALAXY_SERVER_NAME")
    if not tag:
        tag = "pid{}".format(os.getpid())
        log.warning(
            "relay_cursor_path is set but no stable handler id was supplied "
            "(neither relay_handler_id kwarg nor GALAXY_SERVER_NAME env). "
            "Falling back to %s; this cursor will not be picked up after a "
            "process restart, so status updates published while the handler "
            "was down may be skipped.",
            tag,
        )

    # Only the final extension moves: "cursor.tar.gz" -> "cursor.tar-<tag>.gz".
    stem, extension = os.path.splitext(base_path)
    return "{}-{}{}".format(stem, tag, extension)


class ClientManagerInterface(Protocol):
    """Structural interface for objects that hand out Pulsar job clients.

    The client manager classes in this module subclass this protocol, and
    ``build_client_manager`` is annotated as returning it.
    """

    def get_client(self, destination_params: Dict[str, Any], job_id: str, **kwargs: Any) -> BaseJobClient:
        """Get client instance for specified job description."""

    def shutdown(self, ensure_cleanup: bool = False) -> None:
        """Mark client manager's work as complete and clean up resources it managed."""
        # Default implementation is a no-op; subclasses holding threads or
        # connections override this.
        return


class ClientManager(ClientManagerInterface):
    """Factory class to create Pulsar clients.

    This class was introduced for classes of clients that need to potentially
    share state between multiple client connections.
    """

    job_manager_interface_class: Type[PulsarInterface]
    client_class: Type[BaseJobClient]

    def __init__(self, job_manager: Optional["ManagerInterface"] = None, **kwds: Any):
        """Build a HTTP client or a local client that talks directly to a job manager.

        A local interface (``LocalPulsarInterface``) is selected when either
        ``job_manager`` is truthy or a ``pulsar_app`` keyword is supplied;
        otherwise an HTTP interface (``HttpPulsarInterface``) is configured.

        Recognised keywords:

        * ``pulsar_app``, ``file_cache`` - forwarded to the local interface.
        * ``transport`` - transport type handed to ``get_transport``.
        * ``transport_<name>`` - collected (with the ``transport_`` prefix
          stripped) and passed to ``get_transport`` as ``transport_params``.
        * ``cache`` - truthy to use the input-caching client; when absent or
          ``None`` the ``PULSAR_CACHE_TRANSFERS`` environment variable
          decides.

        All keywords are additionally forwarded to ``ClientCacher`` when
        caching is enabled.
        """
        if 'pulsar_app' in kwds or job_manager:
            self.job_manager_interface_class = LocalPulsarInterface
            self.job_manager_interface_args = dict(
                job_manager=job_manager,
                pulsar_app=kwds.get('pulsar_app', None),
                file_cache=kwds.get('file_cache', None),
            )
        else:
            # NOTE: kwds is deliberately not printed or logged here; it is raw
            # configuration and may carry credentials.
            self.job_manager_interface_class = HttpPulsarInterface
            transport_type = kwds.get('transport', None)
            prefix = 'transport_'
            transport_params = {
                key[len(prefix):]: value
                for key, value in kwds.items()
                if key.startswith(prefix)
            }
            transport = get_transport(transport_type, transport_params=transport_params)
            self.job_manager_interface_args = dict(transport=transport)

        cache = kwds.get('cache', None)
        if cache is None:
            # No explicit setting: fall back to the environment toggle.
            cache = _environ_default_int('PULSAR_CACHE_TRANSFERS')
        if cache:
            log.info("Setting Pulsar client class to caching variant.")
            self.client_cacher = ClientCacher(**kwds)
            self.client_class = InputCachingJobClient
            self.extra_client_kwds = {"client_cacher": self.client_cacher}
        else:
            log.info("Setting Pulsar client class to standard, non-caching variant.")
            self.client_class = JobClient
            self.extra_client_kwds = {}

    def get_client(self, destination_params, job_id, **kwargs):
        """Build a client given specific destination parameters and job_id.

        ``destination_params`` may be a mapping or a string; strings are
        expanded via ``_parse_destination_params``. Extra keyword arguments
        are merged into the destination parameters (in place when a mapping
        was passed) before the interface and client are constructed.
        """
        destination_params = _parse_destination_params(destination_params)
        destination_params.update(**kwargs)
        job_manager_interface = self.job_manager_interface_class(
            destination_params=destination_params,
            **self.job_manager_interface_args
        )
        return self.client_class(destination_params, job_id, job_manager_interface, **self.extra_client_kwds)
# Prefer Galaxy's copy of the CLI factory when running inside Galaxy and fall
# back to the one bundled with Pulsar otherwise.
try:
    from galaxy.jobs.runners.util.cli import factory as cli_factory
except ImportError:
    from pulsar.managers.util.cli import factory as cli_factory


class BaseRemoteConfiguredJobClientManager(ClientManagerInterface):
    """Common base for client managers that address a named remote manager."""

    def __init__(self, **kwds: Any):
        """Record the target manager name (``manager`` keyword, default ``_default_``)."""
        self.manager_name = kwds.get("manager", None) or "_default_"


class MessageQueueClientManager(BaseRemoteConfiguredJobClientManager):
    """Client manager that exchanges messages with Pulsar over AMQP.

    Status updates and acknowledgements are consumed on background threads;
    the manager's truthiness (``active``) is passed to the exchange as the
    ``check`` argument of ``consume``.
    """

    status_cache: Dict[str, Any]
    ack_consumer_threads: Dict[str, threading.Thread]

    def __init__(self, amqp_url: str, **kwds: Any):
        """Create the exchange for ``amqp_url`` and initialise bookkeeping state."""
        super().__init__(**kwds)
        self.url = amqp_url
        self.amqp_key_prefix = kwds.get("amqp_key_prefix", None)
        self.exchange = get_exchange(self.url, self.manager_name, kwds)
        self.status_cache = {}
        self.callback_lock = threading.Lock()
        self.callback_thread = None
        self.ack_consumer_threads = {}
        self.active = True

    def callback_wrapper(self, callback, body, message):
        """Handle one status update message and acknowledge it.

        Already-acknowledged messages are skipped. When the manager is
        inactive a requeue is attempted instead of processing. Otherwise the
        body is cached by ``job_id`` (when present), ``callback(body)`` is
        invoked, any failure is logged rather than propagated, and the
        message is acknowledged in all cases.
        """
        if message.acknowledged:
            log.info("Message is already acknowledged (by an upstream "
                     "callback?), Pulsar client will not handle this message")
            return
        if not self.active:
            log.debug("Obtained update message for inactive client manager, attempting requeue.")
            try:
                message.requeue()
                log.debug("Requeue succeeded, will likely be handled next time consumer is enabled.")
            except Exception:
                log.debug("Requeue failed, message may be lost?")
            return
        try:
            if "job_id" in body:
                job_id = body["job_id"]
                self.status_cache[job_id] = body
            log.debug("Handling asynchronous status update from remote Pulsar.")
            callback(body)
        except Exception:
            log.exception("Failure processing job status update message.")
        except BaseException as e:
            # Deliberately broad: nothing escapes into the consumer loop and
            # the message is still acknowledged below.
            log.exception("Failure processing job status update message - BaseException type %s", type(e))
        finally:
            message.ack()

    def callback_consumer(self, callback_wrapper):
        """Thread target: consume the ``status_update`` queue until the exchange stops."""
        try:
            self.exchange.consume("status_update", callback_wrapper, check=self)
        except Exception:
            log.exception("Exception while handling status update messages, "
                          "this shouldn't really happen. Handler should be "
                          "restarted.")
        finally:
            log.debug("Leaving Pulsar client status update thread, no "
                      "additional Pulsar updates will be processed.")

    def ensure_has_status_update_callback(self, callback):
        """Start the status update consumer thread once; later calls are no-ops."""
        with self.callback_lock:
            if self.callback_thread is not None:
                return

            callback_wrapper = functools.partial(self.callback_wrapper, callback)
            run = functools.partial(self.callback_consumer, callback_wrapper)
            thread = threading.Thread(
                name="pulsar_client_%s_status_update_callback" % self.manager_name,
                target=run
            )
            thread.daemon = False  # Lets not interrupt processing of this.
            thread.start()
            self.callback_thread = thread

    def ack_consumer(self, queue_name: str):
        """Thread target: consume the ``<queue_name>_ack`` queue until the exchange stops."""
        try:
            self.exchange.consume(queue_name + '_ack', None, check=self)
        except Exception:
            log.exception("Exception while handling %s acknowledgement "
                          "messages, this shouldn't really happen. Handler "
                          "should be restarted.", queue_name)
        finally:
            log.debug("Leaving Pulsar client %s acknowledgement thread, no "
                      "additional acknowledgements will be processed.", queue_name)

    def ensure_has_ack_consumers(self):
        """Start the ``setup`` and ``kill`` acknowledgement consumer threads if missing."""
        with self.callback_lock:
            for name in ('setup', 'kill'):
                if name in self.ack_consumer_threads:
                    # Skip only this consumer; a previous call may have
                    # started one thread and failed before the other.
                    continue
                run = functools.partial(self.ack_consumer, name)
                thread = threading.Thread(
                    name="pulsar_client_{}_{}_ack".format(self.manager_name, name),
                    target=run
                )
                thread.daemon = False  # Lets not interrupt processing of this.
                thread.start()
                self.ack_consumer_threads[name] = thread

    def shutdown(self, ensure_cleanup: bool = False):
        """Mark the manager inactive; with ``ensure_cleanup`` also join consumer threads."""
        self.active = False
        if ensure_cleanup:
            if self.callback_thread is not None:
                self.callback_thread.join()
            for v in self.ack_consumer_threads.values():
                v.join()

    def __nonzero__(self):
        return self.active

    __bool__ = __nonzero__  # Both needed Py2 v 3

    def get_client(self, destination_params, job_id, **kwargs):
        """Create the message-queue client variant matching ``destination_params``.

        Selection order: ``shell_plugin`` (CLI client), ``k8s_enabled``,
        ``tes_url``, ``project_id`` (GCP), otherwise the plain message client.

        Raises:
            Exception: if ``job_id`` is ``None``.
        """
        if job_id is None:
            raise Exception("Cannot generate Pulsar client for empty job_id.")
        destination_params = _parse_destination_params(destination_params)
        destination_params.update(**kwargs)
        if self.amqp_key_prefix:
            destination_params["amqp_key_prefix"] = self.amqp_key_prefix
        if 'shell_plugin' in destination_params:
            shell = cli_factory.get_shell(destination_params)
            return MessageCLIJobClient(destination_params, job_id, self, shell)
        elif destination_params.get('k8s_enabled', False):
            return K8sMessageCoexecutionJobClient(destination_params, job_id, self)
        elif destination_params.get("tes_url", False):
            return TesMessageCoexecutionJobClient(destination_params, job_id, self)
        elif destination_params.get("project_id", False):
            return GcpMessageCoexecutionJobClient(destination_params, job_id, self)
        else:
            return MessageJobClient(destination_params, job_id, self)


class RelayClientManager(BaseRemoteConfiguredJobClientManager):
    """Client manager that communicates with Pulsar via pulsar-relay.

    This manager uses HTTP-based long-polling to receive status updates from
    Pulsar through the relay, while posting control messages (setup, status
    requests, kill) to the relay for Pulsar to consume.
    """

    status_cache: Dict[str, Any]

    def __init__(
        self,
        relay_url: str,
        relay_username: str,
        relay_password: str,
        relay_topic_prefix: str = '',
        relay_cursor_path: Optional[str] = None,
        relay_handler_id: Optional[str] = None,
        **kwds: Any,
    ):
        """Create the relay transport and initialise bookkeeping state.

        Raises:
            Exception: if ``relay_url`` is empty.
        """
        super().__init__(**kwds)

        if not relay_url:
            raise Exception("relay_url is required for RelayClientManager")

        # Initialize relay transport. ``relay_cursor_path`` persists the long-poll
        # cursor across Galaxy restarts so we don't silently skip messages
        # published by Pulsar while Galaxy was down. Galaxy job handlers run as
        # separate processes — each one polls the relay independently and so
        # tracks its own cursor — so we expand the operator-supplied path with
        # a stable per-handler suffix (``relay_handler_id`` or
        # ``GALAXY_SERVER_NAME``) to give every handler its own file. A shared
        # cursor would suffer last-writer-wins corruption when handlers persist
        # concurrently and could silently rewind another handler's progress.
        self.relay_transport = RelayTransport(
            relay_url,
            relay_username,
            relay_password,
            cursor_path=_per_handler_cursor_path(relay_cursor_path, relay_handler_id),
        )
        self.relay_topic_prefix = relay_topic_prefix
        self.status_cache = {}
        self.callback_lock = threading.Lock()
        self.callback_thread = None
        self.active = True
        self.shutdown_event = threading.Event()

    def callback_wrapper(self, callback, message_data):
        """Process status update messages from the relay."""
        if not self.active:
            log.debug("Obtained update message for inactive client manager, ignoring.")
            return

        try:
            payload = message_data.get('payload', {})
            if "job_id" in payload:
                job_id = payload["job_id"]
                self.status_cache[job_id] = payload
            log.debug("Handling asynchronous status update from Pulsar via relay.")
            callback(payload)
        except Exception:
            log.exception("Failure processing job status update message.")
        except BaseException as e:
            # Deliberately broad so a failing callback never kills the poll loop.
            log.exception("Failure processing job status update message - BaseException type %s", type(e))

    def status_consumer(self, callback_wrapper):
        """Long-poll the relay for status update messages."""
        manager_name = self.manager_name
        topic = self._make_topic_name("job_status_update", manager_name)

        log.info("Starting relay status consumer for topic '%s'", topic)

        while self.active:
            try:
                # Long poll for status updates (30 second timeout)
                messages = self.relay_transport.long_poll([topic], timeout=30)

                for message in messages:
                    callback_wrapper(message)
            except Exception:
                if self.active:
                    log.exception("Exception while polling for status updates from relay, will retry.")
                    # Brief sleep before retrying to avoid tight loop on persistent errors
                    # Use wait() instead of sleep() to allow immediate interruption on shutdown
                    if self.shutdown_event.wait(timeout=5):
                        break
                else:
                    log.debug("Exception during shutdown, ignoring.")
                    break

        log.info("Done consuming relay status updates for topic %s", topic)

    def ensure_has_status_update_callback(self, callback):
        """Start a thread to poll for status updates if not already running."""
        with self.callback_lock:
            if self.callback_thread is not None:
                return

            callback_wrapper = functools.partial(self.callback_wrapper, callback)
            run = functools.partial(self.status_consumer, callback_wrapper)
            thread = threading.Thread(
                name="pulsar_client_%s_relay_status_consumer" % self.manager_name,
                target=run
            )
            # Make daemon so Python can exit even if thread is blocked in HTTP request.
            # Unlike MessageQueueClientManager which uses AMQP connections that can be
            # interrupted cleanly, HTTP long-poll requests block until timeout.
            thread.daemon = True
            thread.start()
            self.callback_thread = thread

    def ensure_has_ack_consumers(self):
        """No-op for relay client manager, as acknowledgements are handled via HTTP."""
        pass

    def _make_topic_name(self, base_topic: str, manager_name: str) -> str:
        """Create a topic name with optional prefix and manager suffix.

        Args:
            base_topic: Base topic name (e.g., 'job_setup', 'job_status_update')
            manager_name: Manager name (e.g., '_default_', 'cluster_a')

        Returns:
            Fully qualified topic name
        """
        parts = []

        # Add prefix if provided
        if self.relay_topic_prefix:
            parts.append(self.relay_topic_prefix)

        # Add base topic
        parts.append(base_topic)

        # Add manager name if not default
        if manager_name != "_default_":
            parts.append(manager_name)

        return "_".join(parts)

    def shutdown(self, ensure_cleanup: bool = False):
        """Shutdown the client manager and cleanup resources."""
        self.active = False

        # Signal the shutdown event to interrupt any waiting threads
        self.shutdown_event.set()

        if ensure_cleanup:
            if self.callback_thread is not None:
                self.callback_thread.join()

        # Close relay transport
        if hasattr(self, 'relay_transport'):
            self.relay_transport.close()

    def __nonzero__(self):
        return self.active

    __bool__ = __nonzero__  # Both needed Py2 v 3

    def get_client(self, destination_params, job_id, **kwargs):
        """Create a RelayJobClient for the given job."""
        if job_id is None:
            raise Exception("Cannot generate Pulsar client for empty job_id.")
        destination_params = _parse_destination_params(destination_params)
        destination_params.update(**kwargs)
        return RelayJobClient(destination_params, job_id, self)


class PollingJobClientManager(BaseRemoteConfiguredJobClientManager):
    """Client manager for polling co-execution clients (K8s, TES, GCP)."""

    def get_client(self, destination_params, job_id, **kwargs):
        """Create the polling client variant matching ``destination_params``.

        Selection order: ``k8s_enabled``, ``tes_url``, ``project_id`` (GCP).

        Raises:
            Exception: if ``job_id`` is ``None`` or no known configuration
                key is set.
        """
        if job_id is None:
            raise Exception("Cannot generate Pulsar client for empty job_id.")
        destination_params = _parse_destination_params(destination_params)
        destination_params.update(**kwargs)
        # TODO: cli version of this...
        if destination_params.get('k8s_enabled', False):
            return K8sPollingCoexecutionJobClient(destination_params, job_id, self)
        elif destination_params.get("tes_url", False):
            return TesPollingCoexecutionJobClient(destination_params, job_id, self)
        elif destination_params.get("project_id", False):
            return GcpPollingCoexecutionJobClient(destination_params, job_id, self)
        else:
            raise Exception("Unknown client configuration")

    def shutdown(self, ensure_cleanup=False):
        """Nothing to release: this manager starts no threads or connections."""
        pass
def build_client_manager(
    job_manager: Optional["ManagerInterface"] = None,
    relay_url: Optional[str] = None,
    relay_username: Optional[str] = None,
    relay_password: Optional[str] = None,
    relay_topic_prefix: Optional[str] = None,
    amqp_url: Optional[str] = None,
    k8s_enabled: Optional[bool] = None,
    tes_enabled: Optional[bool] = None,
    gcp_batch_enabled: Optional[bool] = None,
    **kwargs
) -> ClientManagerInterface:
    """Pick and construct the client manager matching the supplied configuration.

    The first matching rule wins:

    1. ``job_manager`` truthy - ``ClientManager`` wrapping that manager.
    2. ``relay_url`` truthy - ``RelayClientManager``; ``relay_username`` and
       ``relay_password`` must then also be truthy.
    3. ``amqp_url`` truthy - ``MessageQueueClientManager``.
    4. any of ``k8s_enabled`` / ``tes_enabled`` / ``gcp_batch_enabled`` -
       ``PollingJobClientManager``.
    5. otherwise - a plain ``ClientManager``.

    Remaining keyword arguments are forwarded to the selected manager.

    Raises:
        AssertionError: if ``relay_url`` is set without both relay credentials.
    """
    if job_manager:
        return ClientManager(job_manager=job_manager, **kwargs)  # TODO: Consider more separation here.

    if relay_url:
        # Raised explicitly rather than via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        if not (relay_password and relay_username):
            raise AssertionError("relay_url set, but relay_username and relay_password must also be set")
        return RelayClientManager(
            relay_url=relay_url,
            relay_username=relay_username,
            relay_password=relay_password,
            relay_topic_prefix=relay_topic_prefix or '',
            **kwargs
        )

    if amqp_url:
        return MessageQueueClientManager(amqp_url=amqp_url, **kwargs)

    if k8s_enabled or tes_enabled or gcp_batch_enabled:
        return PollingJobClientManager(**kwargs)

    return ClientManager(**kwargs)
class ObjectStoreClientManager:
    """Factory for ``ObjectStoreClient`` instances.

    When an ``object_store`` keyword is supplied the clients use
    ``LocalPulsarInterface`` with that object store; otherwise they use
    ``HttpPulsarInterface`` over the transport selected by the optional
    ``transport`` keyword.
    """

    def __init__(self, **kwds):
        """Choose the interface class and its constructor arguments from ``kwds``."""
        if 'object_store' not in kwds:
            self.interface_class = HttpPulsarInterface
            chosen_transport = get_transport(kwds.get('transport', None))
            self.interface_args = {'transport': chosen_transport}
        else:
            self.interface_class = LocalPulsarInterface
            self.interface_args = {'object_store': kwds['object_store']}
        self.extra_client_kwds = {}

    def get_client(self, client_params):
        """Return an ``ObjectStoreClient`` bound to a new interface for ``client_params``."""
        interface = self.interface_class(
            destination_params=client_params,
            **self.interface_args
        )
        return ObjectStoreClient(interface)
class ClientCacher:
    """Run cache-insert transfers for clients on a pool of daemon worker threads."""

    def __init__(self, **kwds):
        """Start the transfer workers.

        The worker count comes from the ``transfer_threads`` keyword when it
        is given and not ``None``; otherwise from the ``PULSAR_CACHE_THREADS``
        environment variable, defaulting to ``DEFAULT_TRANSFER_THREADS``.
        """
        self.event_manager = TransferEventManager()
        default_transfer_threads = _environ_default_int('PULSAR_CACHE_THREADS', DEFAULT_TRANSFER_THREADS)
        requested_threads = kwds.get('transfer_threads', None)
        if requested_threads is None:
            # Covers both a missing key and an explicit ``None`` from config.
            requested_threads = default_transfer_threads
        self.__init_transfer_threads(int(requested_threads))

    def queue_transfer(self, client, path):
        """Enqueue ``path`` to be cached through ``client`` by a worker thread."""
        self.transfer_queue.put((client, path))

    def acquire_event(self, input_path):
        """Return the event manager's event holder for ``input_path``."""
        return self.event_manager.acquire_event(input_path)

    def _transfer_worker(self):
        """Worker loop: process queued transfers forever, logging any failure."""
        while True:
            transfer_info = self.transfer_queue.get()
            try:
                self.__perform_transfer(transfer_info)
            except BaseException:
                # Deliberately broad: a failed transfer must not kill the
                # worker thread. The traceback is recorded by log.exception.
                log.exception("Transfer failed.")
            finally:
                # Always balance the get() above so Queue.join() cannot hang.
                self.transfer_queue.task_done()

    def __perform_transfer(self, transfer_info):
        """Run one cache insert and record success or failure on its event holder."""
        (client, path) = transfer_info
        event_holder = self.event_manager.acquire_event(path, force_clear=True)
        failed = True
        try:
            client.cache_insert(path)
            failed = False
        finally:
            event_holder.failed = failed
            event_holder.release()

    def __init_transfer_threads(self, num_transfer_threads):
        """Create the transfer queue and start ``num_transfer_threads`` daemon workers."""
        self.num_transfer_threads = num_transfer_threads
        self.transfer_queue = Queue()
        for _ in range(num_transfer_threads):
            t = threading.Thread(target=self._transfer_worker)
            t.daemon = True
            t.start()


def _parse_destination_params(destination_params):
    """Return destination parameters, expanding a URL string when one is given.

    Strings are converted with ``url_to_destination_params``; any other value
    is returned unchanged.
    """
    if isinstance(destination_params, str):
        destination_params = url_to_destination_params(destination_params)
    return destination_params


def _environ_default_int(variable, default="0"):
    """Read environment variable ``variable`` as a non-negative integer.

    Returns ``int(default)`` when the variable is unset or its value is not
    made up purely of decimal digits (so signs, whitespace and other text all
    fall back to the default).
    """
    val = getenv(variable, default)
    int_val = int(default)
    # isdecimal(), not isdigit(): isdigit() also accepts characters such as
    # superscript digits that int() rejects with ValueError.
    if str(val).isdecimal():
        int_val = int(val)
    return int_val


__all__ = (
    'ClientManager',
    'ObjectStoreClientManager',
    'HttpPulsarInterface',
)