import copy
import logging
import socket
import threading
import uuid
from time import (
sleep,
time,
)
from typing import Optional
from packaging.version import parse as parse_version
try:
import kombu
import kombu.exceptions
from kombu import pools
except ImportError:
kombu = None
try:
import amqp
import amqp.exceptions
except ImportError:
amqp = None
log = logging.getLogger(__name__)

# User-facing error messages raised when the optional messaging dependencies
# are not importable.  Fixed typo: "pyampq" -> "pyamqp".
KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency unavailable"
AMQP_UNAVAILABLE = "Attempting to bind to AMQP message queue, but pyamqp dependency unavailable"

DEFAULT_EXCHANGE_NAME = "pulsar"
DEFAULT_EXCHANGE_TYPE = "direct"
# Set timeout to periodically give up looking and check if polling should end.
DEFAULT_TIMEOUT = 0.2
DEFAULT_HEARTBEAT = 580
DEFAULT_RECONNECT_CONSUMER_WAIT = 1
DEFAULT_HEARTBEAT_WAIT = 1
DEFAULT_HEARTBEAT_JOIN_TIMEOUT = 10

# Suffix and payload keys used by the publish/consume acknowledgement protocol.
ACK_QUEUE_SUFFIX = "_ack"
ACK_UUID_KEY = 'acknowledge_uuid'
ACK_QUEUE_KEY = 'acknowledge_queue'
ACK_SUBMIT_QUEUE_KEY = 'acknowledge_submit_queue'
ACK_UUID_RESPONSE_KEY = 'acknowledge_uuid_response'
ACK_FORCE_NOACK_KEY = 'force_noack'
DEFAULT_ACK_MANAGER_SLEEP = 15
DEFAULT_REPUBLISH_TIME = 30
MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT = parse_version("5.2.0")
class PulsarExchange:
    """ Utility for publishing and consuming structured Pulsar queues using kombu.

    This is shared between the server and client - an exchange should be set up
    for each manager (or, in the case of the client, each manager one wishes to
    communicate with).

    Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar
    should target each AMQP endpoint - or care should be taken that unique
    manager names are used across Pulsar servers targeting the same AMQP endpoint -
    and in particular only one such Pulsar should define a default manager with
    name _default_.
"""
def __init__(
    self,
    url,
    manager_name,
    amqp_key_prefix=None,
    connect_ssl=None,
    timeout=DEFAULT_TIMEOUT,
    publish_kwds=None,
    publish_uuid_store=None,
    consume_uuid_store=None,
    republish_time=DEFAULT_REPUBLISH_TIME,
):
    """Set up queue/exchange state for one Pulsar manager.

    :param url: kombu-style AMQP connection string.
    :param manager_name: manager this exchange serves; ``_default_`` selects
        the bare ``pulsar_`` AMQP key prefix.
    :param amqp_key_prefix: explicit AMQP key prefix; overrides the prefix
        derived from ``manager_name`` when not ``None``.
    :param connect_ssl: default ``ssl`` argument for new kombu connections.
    :param timeout: ``drain_events`` timeout; kept short so consumers can
        periodically check whether polling should end.
    :param publish_kwds: extra keyword arguments passed to
        ``producer.publish`` (copied defensively; the caller's dict is never
        mutated).
    :param publish_uuid_store: persistent store of published-but-unacked
        message UUIDs; supplying it enables the acknowledgement protocol.
    :param consume_uuid_store: persistent store of already-seen message
        UUIDs, used to suppress duplicate processing on the consume side.
    :param republish_time: seconds to wait before republishing a message
        whose acknowledgement never arrived.
    :raises Exception: if the kombu or amqp dependency is unavailable.
    """
    if not kombu:
        raise Exception(KOMBU_UNAVAILABLE)
    if not amqp:
        raise Exception(AMQP_UNAVAILABLE)
    # conditional imports and type checking prevent us from doing this at the module level.
    self.recoverable_exceptions = (
        ConnectionResetError,  # https://github.com/galaxyproject/pulsar/issues/328
        socket.timeout,
        amqp.exceptions.ConnectionForced,  # e.g. connection closed on rabbitmq sigterm
        amqp.exceptions.RecoverableConnectionError,  # connection closed
        amqp.exceptions.RecoverableChannelError,  # publish time out
        kombu.exceptions.OperationalError,  # ConnectionRefusedError, e.g. when getting a new connection while rabbitmq is down
    )
    self.__kombu_version = parse_version(kombu.__version__)
    self.__url = url
    self.__manager_name = manager_name
    self.__amqp_key_prefix = amqp_key_prefix
    self.__connect_ssl = connect_ssl
    self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE)
    self.__timeout = timeout
    self.__republish_time = republish_time
    # Fix: the original signature used a mutable default argument ({}) and
    # then wrote into the caller's dict; copy defensively instead.
    publish_kwds = dict(publish_kwds) if publish_kwds else {}
    # Be sure to log message publishing failures.
    if publish_kwds.get("retry", False):
        if "retry_policy" not in publish_kwds:
            publish_kwds["retry_policy"] = {}
    self.__publish_kwds = publish_kwds
    self.publish_uuid_store = publish_uuid_store
    self.consume_uuid_store = consume_uuid_store
    self.publish_ack_lock = threading.Lock()
    # Ack manager should sleep before checking for republishes, but if that
    # changes, need to drain the queue once before the ack manager starts
    # doing its thing.
    self.ack_manager_thread = self.__start_ack_manager()
@staticmethod
def __publish_errback(exc, interval, publish_log_prefix=""):
    """Log a connection failure during publish and the upcoming retry delay.

    Registered (via partial application) as kombu's retry ``errback``.
    """
    log.error("%sConnection error while publishing: %r", publish_log_prefix, exc, exc_info=1)
    log.info("%sRetrying in %s seconds", publish_log_prefix, interval)
@property
def url(self):
return self.__url
@property
def acks_enabled(self):
return self.publish_uuid_store is not None
def consume(self, queue_name, callback, check=True, connection_kwargs=None):
    """Consume messages from the named queue until ``check`` evaluates falsy.

    Reconnects (with a short wait) on recoverable connection errors; any
    other exception is logged and re-raised, ending consumption.

    :param queue_name: logical queue name (prefixed via ``__queue_name``).
    :param callback: kombu consumer callback ``(body, message)``; run after
        the internal ack-bookkeeping callback. May be ``None``.
    :param check: loop guard; presumably an object whose truthiness flips
        on shutdown — re-evaluated each iteration (TODO confirm with callers).
    :param connection_kwargs: extra keyword arguments for the kombu
        connection. Fix: original used a mutable default argument ({}).
    """
    connection_kwargs = connection_kwargs or {}
    queue = self.__queue(queue_name)
    log.debug("Consuming queue '%s'", queue)
    # Ack bookkeeping runs first so duplicate messages are acked before
    # user callbacks see them.
    callbacks = [self.__ack_callback]
    if callback is not None:
        callbacks.append(callback)
    while check:
        heartbeat_thread = None
        try:
            with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection:
                with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']):
                    heartbeat_thread = self.__start_heartbeat(queue_name, connection)
                    while check and connection.connected:
                        try:
                            # Short timeout so the check flag is re-examined.
                            connection.drain_events(timeout=self.__timeout)
                        except socket.timeout:
                            pass
        except self.recoverable_exceptions as exc:
            # Transient connection problem - wait briefly and reconnect.
            self.__handle_io_error(exc, heartbeat_thread)
        except BaseException:
            log.exception("Problem consuming queue, consumer quitting in problematic fashion!")
            raise
    # Lazy %-style args instead of eager string formatting.
    log.info("Done consuming queue %s", queue_name)
def __ack_callback(self, body, message):
    """Perform acknowledgement bookkeeping for an incoming message.

    Two cases:
    - the body carries an ack *request* (normal queue): reply on the ack
      queue and record the UUID so replays can be detected; a replayed
      message is acked here so downstream callbacks skip it;
    - the body carries an ack *response* (ack queue): drop the UUID from
      the publish store so it is no longer eligible for republish.
    """
    if ACK_UUID_KEY in body:
        # The consumer of a normal queue has received a message requiring
        # acknowledgement.
        ack_uuid = body[ACK_UUID_KEY]
        ack_queue = body[ACK_QUEUE_KEY]
        log.debug('Acknowledging UUID %s on queue %s', ack_uuid, ack_queue)
        self.publish(ack_queue, {ACK_UUID_RESPONSE_KEY: ack_uuid})
        if self.consume_uuid_store is None:
            log.warning('Received an ack request (UUID: %s, response queue: '
                        '%s) but ack UUID persistence is not enabled, check '
                        'your config', ack_uuid, ack_queue)
        elif ack_uuid not in self.consume_uuid_store:
            # First sighting: remember the UUID so the message is not
            # operated on more than once.
            self.consume_uuid_store[ack_uuid] = time()
        else:
            # Replay: ack here so downstream callbacks do not reprocess it,
            # while the ack reply above is still sent.
            log.warning('Message with UUID %s on queue %s has already '
                        'been performed, skipping callback', ack_uuid, ack_queue)
            message.ack()
    elif ACK_UUID_RESPONSE_KEY in body:
        # The consumer of an ack queue has received an ack - remove the
        # UUID from the publish store.
        ack_uuid = body[ACK_UUID_RESPONSE_KEY]
        log.debug('Got acknowledgement for UUID %s, will remove from store', ack_uuid)
        try:
            with self.publish_ack_lock:
                del self.publish_uuid_store[ack_uuid]
        except KeyError:
            log.warning('Cannot remove UUID %s from store, already removed', ack_uuid)
        message.ack()
def __handle_io_error(self, exc: BaseException, heartbeat_thread: Optional[threading.Thread] = None):
    """Log a recoverable I/O failure, reap the heartbeat thread, and pause before reconnecting."""
    # In testing, errno is None
    log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc)
    try:
        if heartbeat_thread:
            # Bounded join - the heartbeat loop exits once the connection drops.
            heartbeat_thread.join(DEFAULT_HEARTBEAT_JOIN_TIMEOUT)
    except Exception:
        log.exception("Failed to join heartbeat thread, this is bad?")
    try:
        sleep(DEFAULT_RECONNECT_CONSUMER_WAIT)
    except Exception:
        log.exception("Interrupted sleep while waiting to reconnect to message queue, may restart unless problems encountered.")
def heartbeat(self, connection):
    """Run periodic AMQP heartbeat checks while ``connection`` stays connected.

    Intended to run on its own thread (see ``__start_heartbeat``); returns
    once the connection drops, re-raising anything heartbeat_check throws.
    """
    log.debug('AMQP heartbeat thread alive')
    try:
        while connection.connected:
            connection.heartbeat_check()
            sleep(DEFAULT_HEARTBEAT_WAIT)
    except BaseException:
        log.exception("Problem with heartbeat, leaving heartbeat method in problematic state!")
        raise
    log.debug('AMQP heartbeat thread exiting')
def publish(self, name, payload):
    """Publish ``payload`` as JSON to the named queue.

    When acks are enabled and this is a regular message (not destined for
    an ack queue and not explicitly marked force-noack), ack bookkeeping
    keys are injected into the payload and its UUID is stored so the ack
    manager can republish it if no acknowledgement arrives.
    """
    # Consider optionally disabling if throughput becomes main concern.
    transaction_uuid = uuid.uuid1()
    key = self.__queue_name(name)
    publish_log_prefix = self.__publish_log_prefex(transaction_uuid)
    log.debug("%sBegin publishing to key %s", publish_log_prefix, key)
    wants_ack = (
        self.acks_enabled
        and not name.endswith(ACK_QUEUE_SUFFIX)
        and ACK_FORCE_NOACK_KEY not in payload
    )
    if wants_ack:
        # Publishing a message on a normal queue and it's not a republish
        # (or explicitly forced do-not-ack), so add ack keys.
        ack_uuid = str(transaction_uuid)
        ack_queue = name + ACK_QUEUE_SUFFIX
        payload[ACK_UUID_KEY] = ack_uuid
        payload[ACK_QUEUE_KEY] = ack_queue
        payload[ACK_SUBMIT_QUEUE_KEY] = name
        self.publish_uuid_store[ack_uuid] = payload
        log.debug('Requesting acknowledgement of UUID %s on queue %s', ack_uuid, ack_queue)
    with self.connection(self.__url) as connection:
        with pools.producers[connection].acquire() as producer:
            log.debug("%sHave producer for publishing to key %s", publish_log_prefix, key)
            publish_kwds = self.__prepare_publish_kwds(publish_log_prefix)
            producer.publish(
                payload,
                serializer='json',
                exchange=self.__exchange,
                declare=[self.__exchange],
                routing_key=key,
                **publish_kwds
            )
            log.debug("%sPublished to key %s", publish_log_prefix, key)
def ack_manager(self):
    """Background loop republishing messages whose acks never arrived.

    Every ``DEFAULT_ACK_MANAGER_SLEEP`` seconds, scan the publish store
    (under ``publish_ack_lock``) for UUIDs older than the republish window
    and resend their payloads with ``force_noack`` set so the republish
    itself does not create new ack bookkeeping. Runs on a daemon thread
    (see ``__start_ack_manager``) and only exits on unexpected error.
    """
    log.debug('Acknowledgement manager thread alive')
    failed = set()
    try:
        while True:
            sleep(DEFAULT_ACK_MANAGER_SLEEP)
            with self.publish_ack_lock:
                # Fix: snapshot the keys. __get_payload may delete entries
                # (via __discard_publish_uuid) while we iterate, and deleting
                # during iteration of a live key view raises RuntimeError
                # for dict-backed stores.
                for unack_uuid in list(self.publish_uuid_store.keys()):
                    if self.publish_uuid_store.get_time(unack_uuid) < time() - self.__republish_time:
                        payload = self.__get_payload(unack_uuid, failed)
                        if payload is None:
                            continue
                        # Mark so the republished copy skips ack bookkeeping.
                        payload[ACK_FORCE_NOACK_KEY] = True
                        resubmit_queue = payload[ACK_SUBMIT_QUEUE_KEY]
                        log.debug('UUID %s has not been acknowledged, '
                                  'republishing original message on queue %s',
                                  unack_uuid, resubmit_queue)
                        try:
                            self.publish(resubmit_queue, payload)
                            # Reset the clock so we wait a full window again.
                            self.publish_uuid_store.set_time(unack_uuid)
                        except self.recoverable_exceptions as e:
                            self.__handle_io_error(e)
                            continue
    except Exception:
        log.exception("Problem with acknowledgement manager, leaving ack_manager method in problematic state!")
        raise
    log.debug('Acknowledgement manager thread exiting')
def __get_payload(self, uuid, failed):
"""Retry reading a message from the publish_uuid_store once, delete on the second failure."""
# Caller should have the publish_uuid_store lock
try:
return self.publish_uuid_store[uuid]
except Exception as exc:
msg = "Failed to load payload from publish store for UUID %s, %s: %s"
if uuid in failed:
log.error(msg, uuid, "discarding", str(exc))
self.__discard_publish_uuid(uuid, failed)
else:
log.error(msg, uuid, "will try agan", str(exc))
failed.add(uuid)
return None
def __discard_publish_uuid(self, uuid, failed):
try:
del self.publish_uuid_store[uuid]
failed.discard(uuid)
except Exception as exc:
log.error("Failed to discard UUID %s from publish store: %s", uuid, str(exc))
def __prepare_publish_kwds(self, publish_log_prefix):
    """Build the keyword arguments for ``producer.publish``.

    When a retry policy is configured, deep-copy the stored kwargs and bind
    a per-transaction errback (so log lines carry the transaction prefix).
    On kombu versions older than 5.2.0, strip the unsupported ``timeout``
    argument, warning about it.
    """
    if "retry_policy" in self.__publish_kwds:
        publish_kwds = copy.deepcopy(self.__publish_kwds)

        def errback(exc, interval):
            return PulsarExchange.__publish_errback(exc, interval, publish_log_prefix)

        publish_kwds["retry_policy"]["errback"] = errback
    else:
        publish_kwds = self.__publish_kwds
    if self.__kombu_version < MINIMUM_KOMBU_VERSION_PUBLISH_TIMEOUT:
        log.warning(f"kombu version {kombu.__version__} does not support timeout argument to publish. Consider updating to 5.2.0 or newer")
        if "timeout" in publish_kwds:
            if publish_kwds is self.__publish_kwds:
                # Fix: original popped "timeout" straight out of the shared
                # self.__publish_kwds dict, silently mutating the exchange's
                # configuration; copy before stripping instead.
                publish_kwds = dict(publish_kwds)
            publish_kwds.pop("timeout")
    return publish_kwds
def __publish_log_prefex(self, transaction_uuid=None):
prefix = ""
if transaction_uuid:
prefix = "[publish:%s] " % str(transaction_uuid)
return prefix
def connection(self, connection_string, **kwargs):
    """Create a kombu Connection, defaulting ``ssl`` to this exchange's configured value."""
    kwargs.setdefault("ssl", self.__connect_ssl)
    return kombu.Connection(connection_string, **kwargs)
def __queue(self, name):
    """Build the kombu Queue for ``name``, bound to our exchange by the prefixed routing key."""
    routing_key = self.__queue_name(name)
    return kombu.Queue(routing_key, self.__exchange, routing_key=routing_key)
def __queue_name(self, name):
key_prefix = self.__key_prefix()
queue_name = '{}_{}'.format(key_prefix, name)
return queue_name
def __key_prefix(self):
if self.__amqp_key_prefix is not None:
key_prefix = self.__amqp_key_prefix
else:
if self.__manager_name == "_default_":
key_prefix = "pulsar_"
else:
key_prefix = "pulsar_%s_" % self.__manager_name
return key_prefix
def __start_heartbeat(self, queue_name, connection):
    """Start and return a named thread running ``heartbeat`` for ``connection``."""
    name = "consume-heartbeat-%s" % (self.__queue_name(queue_name))
    thread = threading.Thread(name=name, target=self.heartbeat, args=(connection,))
    thread.start()
    return thread
def __start_ack_manager(self):
if self.acks_enabled:
thread_name = "acknowledgement-manager"
thread = threading.Thread(name=thread_name, target=self.ack_manager)
thread.daemon = True
thread.start()
return thread