Source code for pulsar.client.amqp_exchange_factory
from .amqp_exchange import PulsarExchange
from .util import (
filter_destination_params,
MessageQueueUUIDStore,
)
[docs]def get_exchange(url, manager_name, params):
connect_ssl = parse_amqp_connect_ssl_params(params)
exchange_kwds = dict(
manager_name=manager_name,
amqp_key_prefix=params.get("amqp_key_prefix"),
connect_ssl=connect_ssl,
publish_kwds=parse_amqp_publish_kwds(params),
)
if params.get('amqp_acknowledge', False):
exchange_kwds.update(parse_ack_kwds(params, manager_name))
timeout = params.get('amqp_consumer_timeout', False)
if timeout is not False:
exchange_kwds['timeout'] = timeout
exchange = PulsarExchange(url, **exchange_kwds)
return exchange
[docs]def parse_amqp_connect_ssl_params(params):
ssl_params = filter_destination_params(params, "amqp_connect_ssl_")
if not ssl_params:
return
ssl = __import__('ssl')
if 'cert_reqs' in ssl_params:
value = ssl_params['cert_reqs']
ssl_params['cert_reqs'] = getattr(ssl, value.upper())
return ssl_params
[docs]def parse_amqp_publish_kwds(params):
all_publish_params = filter_destination_params(params, "amqp_publish_")
retry_policy_params = {}
for key in all_publish_params.copy().keys():
if key.startswith("retry_"):
value = all_publish_params[key]
retry_policy_params[key[len("retry_"):]] = value
del all_publish_params[key]
if retry_policy_params:
all_publish_params["retry_policy"] = retry_policy_params
return all_publish_params
[docs]def parse_ack_kwds(params, manager_name):
ack_params = {}
persistence_directory = params.get('persistence_directory', None)
if persistence_directory:
subdirs = ['amqp_ack-%s' % manager_name]
ack_params['publish_uuid_store'] = MessageQueueUUIDStore(persistence_directory, subdirs=subdirs + ['publish'])
ack_params['consume_uuid_store'] = MessageQueueUUIDStore(persistence_directory, subdirs=subdirs + ['consume'])
republish_time = params.get('amqp_ack_republish_time', None)
if republish_time:
ack_params['republish_time'] = int(republish_time)
return ack_params