Source code for pulsar.messaging.relay_state

"""State management for pulsar-relay message consumers.

Similar to QueueState for AMQP, this manages the lifecycle of relay
consumer threads on the Pulsar server side.
"""
import logging
import threading
from typing import (
    List,
    Optional,
)

# Module-level logger; RelayState uses it to report outboxes that fail to
# stop and consumer threads that fail to join.
log = logging.getLogger(__name__)


class RelayState:
    """Manages state for pulsar-relay message consumers.

    This object is passed to consumer loops and used to signal when
    they should stop processing messages.

    Attributes:
        active: ``True`` from construction until :meth:`deactivate` is
            called, after which it is ``False``.
        threads: Consumer threads waited on by :meth:`join`.
        outboxes: Objects exposing ``stop(timeout=...)``; each one is
            stopped by :meth:`deactivate`.
    """

    def __init__(self) -> None:
        """Initialize relay state as active with no threads or outboxes."""
        self.active: bool = True
        self.threads: List[threading.Thread] = []
        self.outboxes: list = []

    def deactivate(self) -> None:
        """Mark the relay state as inactive, signaling consumers to stop.

        Every registered outbox is then asked to stop with a 2 second
        timeout. Stopping is best-effort: a failure is logged and the
        remaining outboxes are still stopped.
        """
        self.active = False
        for outbox in self.outboxes:
            try:
                outbox.stop(timeout=2.0)
            except Exception:
                # Catch broadly on purpose: this is a shutdown path, and an
                # unexpected (non-OSError) failure in one outbox must not
                # abort the loop and leave the remaining outboxes running.
                log.exception("Failed to stop relay status update outbox")

    def join(self, timeout: Optional[float] = None) -> None:
        """Join all consumer threads.

        Args:
            timeout: Maximum number of seconds to wait for *each* thread
                (not for all threads combined). ``None`` waits without a
                limit.

        A thread that is still alive after its join attempt is reported
        with a warning; no exception is raised for it.
        """
        for t in self.threads:
            t.join(timeout)
            if t.is_alive():
                log.warning("Failed to join relay consumer thread [%s].", t)