Source code for pulsar.messaging.bind_relay

"""Pulsar server-side integration with pulsar-relay.

This module provides functionality to bind Pulsar job managers to the
pulsar-relay, allowing them to receive control messages (setup, status
requests, kill) and publish status updates.
"""
import functools
import logging
import os
import threading
import time
from typing import Optional

import requests

from pulsar import manager_endpoint_util
from pulsar.client.transport.relay import (
    RelayTransport,
    RelayTransportError,
)
from .outbox import build_status_outbox
from .relay_state import RelayState

log = logging.getLogger(__name__)


def _server_cursor_path(manager) -> Optional[str]:
    """Return the path of the relay cursor file for ``manager``.

    The file is named ``<manager.name>-relay-cursor.json`` and is placed in
    the manager's persistence directory.

    Args:
        manager: Job manager exposing ``persistence_directory`` and ``name``.

    Returns:
        The cursor file path, or ``None`` when the manager has no (or an
        empty) persistence directory.
    """
    base_dir = manager.persistence_directory
    if base_dir:
        cursor_filename = "%s-relay-cursor.json" % manager.name
        return os.path.join(base_dir, cursor_filename)
    return None


def bind_manager_to_relay(manager, relay_state: RelayState, relay_url, conf):
    """Connect a single job manager to a pulsar-relay server.

    Depending on ``conf`` this starts one consumer thread that dispatches
    setup, status-request and kill messages to the manager
    (``message_queue_consume``, default on) and/or installs a state-change
    callback on the manager that publishes status updates
    (``message_queue_publish``, default on).

    Args:
        manager: Pulsar job manager instance
        relay_state: RelayState that collects the consumer threads and outboxes
        relay_url: URL of the pulsar-relay server
        conf: Configuration dictionary with relay credentials and options

    Raises:
        Exception: If ``message_queue_password`` is missing or empty.
    """
    manager_name = manager.name
    log.info("bind_manager_to_relay called for relay [%s] and manager [%s]", relay_url, manager_name)

    # Credentials: the username falls back to 'admin'; the password is mandatory.
    username = conf.get('message_queue_username', 'admin')
    password = conf.get('message_queue_password')
    if not password:
        raise Exception("message_queue_password is required for relay communication")

    # Optional prefix shared by every topic of this deployment.
    topic_prefix = conf.get('relay_topic_prefix', '')

    # The transport gets a per-manager persistent cursor so a Pulsar restart
    # resumes the long-poll exactly where it left off, rather than silently
    # skipping any setup/kill messages published while it was down.
    cursor_path = _server_cursor_path(manager)
    relay_transport = RelayTransport(relay_url, username, password, cursor_path=cursor_path)

    def topic_for(base_topic):
        # Topic names depend on the optional prefix and on the manager name.
        return __make_topic_name(topic_prefix, base_topic, manager_name)

    setup_topic = topic_for("job_setup")
    status_request_topic = topic_for("job_status_request")
    kill_topic = topic_for("job_kill")
    status_update_topic = topic_for("job_status_update")

    # Consuming side: one thread handles every control topic.
    if conf.get("message_queue_consume", True):
        log.info("Starting relay consumer threads for manager '%s'", manager_name)
        handlers_by_topic = {
            setup_topic: functools.partial(__process_setup_message, manager),
            status_request_topic: functools.partial(__process_status_message, manager),
            kill_topic: functools.partial(__process_kill_message, manager),
        }
        control_topics = [setup_topic, status_request_topic, kill_topic]
        relay_state.threads.append(
            start_consumer(relay_transport, relay_state, control_topics, handlers_by_topic)
        )

    # Publishing side: forward manager state changes to the relay.
    if conf.get("message_queue_publish", True):
        log.info("Binding status change callback for manager '%s'", manager_name)

        def publish_status(payload):
            return relay_transport.post_message(status_update_topic, payload)

        outbox = build_status_outbox(
            manager,
            conf,
            publish_fn=publish_status,
            suffix="relay-status-outbox",
        )
        if outbox is not None:
            relay_state.outboxes.append(outbox)

        def bind_on_status_change(new_status, job_id):
            job_id = job_id or 'unknown'
            log.debug(
                "Publishing Pulsar state change with status %s for job_id %s via relay",
                new_status,
                job_id,
            )
            payload = manager_endpoint_util.full_status(manager, new_status, job_id)
            if outbox is None:
                # No outbox: publish directly and only log on failure.
                try:
                    relay_transport.post_message(status_update_topic, payload)
                except (RelayTransportError, requests.RequestException):
                    log.exception(
                        "Failure to publish Pulsar state change for job_id %s via "
                        "relay (no outbox configured; status update may be lost).",
                        job_id,
                    )
            else:
                # With an outbox, delivery is delegated to it.
                outbox.enqueue(payload)

        manager.set_state_change_callback(bind_on_status_change)
def start_consumer(relay_transport, relay_state: RelayState, topics, handlers):
    """Start a daemon thread that long-polls the relay and dispatches messages.

    The thread keeps polling while ``relay_state.active`` is true. Each
    received message is expected to be a dict with a ``topic`` and a
    ``payload`` entry; the payload is passed to the handler registered for
    that topic. A failing handler is logged and does not stop the loop or
    the processing of the remaining messages in the same batch.

    Args:
        relay_transport: RelayTransport instance
        relay_state: RelayState for checking if consumer should continue
        topics: List of topics to subscribe to
        handlers: Dict mapping topics to handler functions

    Returns:
        The already started daemon ``threading.Thread``.
    """

    def dispatch(message):
        # Route one message to its handler and contain handler failures.
        topic = message.get('topic')
        payload = message.get('payload', {})
        handler = handlers.get(topic)
        if not handler:
            log.warning("No handler found for topic '%s'", topic)
            return
        try:
            handler(payload)
        except Exception:
            # The payload may not be a dict (e.g. an explicit null); looking
            # the job id up unguarded would raise again from inside this
            # error path and abort the rest of the batch.
            job_id = payload.get('job_id', 'unknown') if isinstance(payload, dict) else 'unknown'
            log.exception("Failed to process message for job_id %s from topic %s", job_id, topic)

    def consume():
        log.info("Starting relay consumer for topics: %s", topics)
        while relay_state.active:
            try:
                # Long poll for messages (30 second timeout)
                for message in relay_transport.long_poll(topics, timeout=30):
                    dispatch(message)
            except Exception:
                if not relay_state.active:
                    log.debug("Exception during shutdown, stopping consumer.")
                    break
                log.exception("Exception while polling relay, will retry after delay.")
                # Brief sleep before retrying
                time.sleep(5)
        log.info("Finished consuming relay messages - no more messages will be processed.")

    thread = threading.Thread(
        name="relay-consumer-%s" % "-".join(topics),
        target=consume,
    )
    thread.daemon = True
    thread.start()
    return thread
def __process_setup_message(manager, body):
    """Process a job setup message.

    Failures are logged, never raised.

    Args:
        manager: Job manager instance
        body: Message payload containing job setup parameters
    """
    job_id = __client_job_id_from_body(body)
    if not job_id:
        log.error('Could not parse job id from body: %s', body)
        return
    try:
        log.info("Processing setup message for job_id %s", job_id)
        manager_endpoint_util.submit_job(manager, body)
    except Exception:
        log.exception("Failed to process setup message for job_id %s", job_id)


def __process_status_message(manager, body):
    """Process a status request message.

    Triggers the manager's state-change callback for the job. Failures are
    logged, never raised.

    Args:
        manager: Job manager instance
        body: Message payload containing job_id
    """
    job_id = __client_job_id_from_body(body)
    if not job_id:
        log.error('Could not parse job id from body: %s', body)
        return
    try:
        log.debug("Processing status request for job_id %s", job_id)
        manager.trigger_state_change_callback(job_id)
    except Exception:
        log.exception("Failed to process status message for job_id %s", job_id)


def __process_kill_message(manager, body):
    """Process a job kill message.

    Failures are logged, never raised.

    Args:
        manager: Job manager instance
        body: Message payload containing job_id
    """
    job_id = __client_job_id_from_body(body)
    if not job_id:
        log.error('Could not parse job id from body: %s', body)
        return
    try:
        log.info("Processing kill request for job_id %s", job_id)
        manager.kill(job_id)
    except Exception:
        log.exception("Failed to process kill message for job_id %s", job_id)


def __client_job_id_from_body(body):
    """Extract job_id from message body.

    Args:
        body: Message payload dictionary

    Returns:
        job_id string or None if not found. None is also returned when the
        body is not mapping-like (e.g. ``None`` or a string), so callers
        report it through their "could not parse" path instead of failing
        with an AttributeError.
    """
    try:
        return body.get("job_id")
    except AttributeError:
        return None


def __make_topic_name(prefix, base_topic, manager_name):
    """Create a topic name with optional prefix and manager suffix.

    Args:
        prefix: Optional prefix string (e.g., 'galaxy1', 'prod')
        base_topic: Base topic name (e.g., 'job_setup', 'job_status_update')
        manager_name: Manager name (e.g., '_default_', 'cluster_a')

    Returns:
        Fully qualified topic name

    Examples:
        __make_topic_name('', 'job_setup', '_default_') -> 'job_setup'
        __make_topic_name('', 'job_setup', 'cluster_a') -> 'job_setup_cluster_a'
        __make_topic_name('prod', 'job_setup', '_default_') -> 'prod_job_setup'
        __make_topic_name('prod', 'job_setup', 'cluster_a') -> 'prod_job_setup_cluster_a'
    """
    # An empty prefix contributes nothing to the name.
    parts = [prefix] if prefix else []
    parts.append(base_topic)
    # The default manager uses the bare topic, without a manager suffix.
    if manager_name != "_default_":
        parts.append(manager_name)
    return "_".join(parts)