Source code for pulsar.messaging
""" This module contains the server-side only code for interfacing with
message queues. Code shared between client and server can be found in
submodules of ``pulsar.client``.
"""
import logging
from ..messaging import bind_amqp
log = logging.getLogger(__name__)
[docs]
def bind_app(app, queue_id, conf=None):
connection_string = __id_to_connection_string(app, queue_id)
queue_state = QueueState()
for manager in app.managers.values():
bind_amqp.bind_manager_to_queue(manager, queue_state, connection_string, conf)
return queue_state
[docs]
class QueueState:
""" Passed through to event loops, should be "non-zero" while queues should
be active.
"""
def __init__(self):
self.active = True
self.threads = []
[docs]
def deactivate(self):
self.active = False
def __nonzero__(self):
return self.active
__bool__ = __nonzero__ # Both needed Py2 v 3
[docs]
def join(self, timeout=None):
for t in self.threads:
t.join(timeout)
if t.is_alive():
log.warn("Failed to join thread [%s]." % t)
def __id_to_connection_string(app, queue_id):
return queue_id