Source code for pulsar.messaging.relay_state
"""State management for pulsar-relay message consumers.
Similar to QueueState for AMQP, this manages the lifecycle of relay
consumer threads on the Pulsar server side.
"""
[docs]
class RelayState:
"""Manages state for pulsar-relay message consumers.
This object is passed to consumer loops and used to signal when
they should stop processing messages.
"""
def __init__(self):
"""Initialize relay state."""
self.active = True
self.threads = []
[docs]
def deactivate(self):
"""Mark the relay state as inactive, signaling consumers to stop."""
self.active = False
[docs]
def join(self, timeout=None):
"""Join all consumer threads.
Args:
timeout: Optional timeout in seconds for joining threads
"""
import logging
log = logging.getLogger(__name__)
for t in self.threads:
t.join(timeout)
if t.is_alive():
log.warning("Failed to join relay consumer thread [%s].", t)