Source code for pulsar.client.amqp_exchange_factory

from .amqp_exchange import PulsarExchange
from .util import (
    filter_destination_params,
    MessageQueueUUIDStore,
)


def get_exchange(url, manager_name, params):
    """Build a ``PulsarExchange`` for ``url`` from destination ``params``.

    Keys read directly from ``params``:

    * ``amqp_key_prefix`` - forwarded as-is (``None`` when absent).
    * ``amqp_durable`` - coerced to a bool; strings count as true only when
      they are one of ``true``/``1``/``yes``/``on`` (case-insensitive,
      surrounding whitespace ignored).
    * ``amqp_acknowledge`` - when truthy, the keywords produced by
      ``parse_ack_kwds`` are merged in.
    * ``amqp_consumer_timeout`` - forwarded as ``timeout`` whenever the key is
      present with any value other than ``False``.

    SSL and publish keywords come from ``parse_amqp_connect_ssl_params`` and
    ``parse_amqp_publish_kwds`` respectively.
    """
    ssl_options = parse_amqp_connect_ssl_params(params)
    exchange_kwds = {
        "manager_name": manager_name,
        "amqp_key_prefix": params.get("amqp_key_prefix"),
        "connect_ssl": ssl_options,
        "publish_kwds": parse_amqp_publish_kwds(params),
    }

    # Config files may hand us a string here, and bool("false") would be True,
    # so strings are matched against an explicit list of affirmative values.
    raw_durable = params.get("amqp_durable", False)
    if isinstance(raw_durable, str):
        is_durable = raw_durable.strip().lower() in ("true", "1", "yes", "on")
    else:
        is_durable = bool(raw_durable)
    exchange_kwds["durable"] = is_durable

    if params.get('amqp_acknowledge', False):
        exchange_kwds.update(parse_ack_kwds(params, manager_name))

    # False is the "not configured" sentinel; None or 0 are still passed on.
    consumer_timeout = params.get('amqp_consumer_timeout', False)
    if consumer_timeout is not False:
        exchange_kwds['timeout'] = consumer_timeout

    return PulsarExchange(url, **exchange_kwds)
def parse_amqp_connect_ssl_params(params):
    """Extract the ``amqp_connect_ssl_*`` options from ``params``.

    Returns ``None`` when ``filter_destination_params`` yields nothing for the
    ``amqp_connect_ssl_`` prefix. Otherwise returns that mapping, with a
    ``cert_reqs`` entry (if present) replaced by the attribute of the standard
    ``ssl`` module whose name is the upper-cased value, e.g.
    ``"cert_required"`` -> ``ssl.CERT_REQUIRED``.

    Raises ``AttributeError`` if ``cert_reqs`` does not name an ``ssl``
    attribute.
    """
    # NOTE(review): the bare 'cert_reqs' lookup below suggests the helper
    # strips the prefix from the keys - confirm against its definition.
    ssl_options = filter_destination_params(params, "amqp_connect_ssl_")
    if not ssl_options:
        return None

    # Imported lazily so the ssl module is only loaded when SSL is configured.
    import ssl

    if 'cert_reqs' in ssl_options:
        constant_name = ssl_options['cert_reqs'].upper()
        ssl_options['cert_reqs'] = getattr(ssl, constant_name)
    return ssl_options
# Fallback retry-policy values merged in by parse_amqp_publish_kwds when
# publish retry is enabled and the corresponding option was not supplied.
DEFAULT_PUBLISH_RETRY_POLICY = {
    "max_retries": 5,
    "interval_start": 1,
    "interval_step": 2,
    "interval_max": 30,
}
def parse_amqp_publish_kwds(params):
    """Assemble the publish keyword arguments from ``amqp_publish_*`` params.

    Starting from the mapping ``filter_destination_params`` returns for the
    ``amqp_publish_`` prefix:

    * every ``retry_<name>`` entry is removed and collected as ``<name>`` in a
      separate retry-policy dict;
    * if the remaining ``retry`` entry is truthy, missing policy options are
      filled in from ``DEFAULT_PUBLISH_RETRY_POLICY``;
    * if the policy dict ends up non-empty it is stored under
      ``retry_policy`` and ``retry`` defaults to ``True``.

    Returns the resulting mapping.
    """
    retry_prefix = "retry_"
    publish_kwds = filter_destination_params(params, "amqp_publish_")

    retry_policy = {}
    # Iterate over a snapshot of the keys because entries are removed below.
    for name in list(publish_kwds):
        if not name.startswith(retry_prefix):
            continue
        retry_policy[name[len(retry_prefix):]] = publish_kwds.pop(name)

    if publish_kwds.get("retry"):
        # Defense-in-depth: one broker hiccup at the wrong moment must not
        # drop a status_update. The defaults are bounded so the postprocess
        # thread is not blocked indefinitely; the persistent outbox covers the
        # case where retries are exhausted.
        for option, fallback in DEFAULT_PUBLISH_RETRY_POLICY.items():
            if option not in retry_policy:
                retry_policy[option] = fallback

    if retry_policy:
        publish_kwds["retry_policy"] = retry_policy
        # NOTE(review): this nesting was reconstructed from a flattened
        # listing - confirm that the "retry" default belongs inside this
        # branch rather than at function level.
        publish_kwds.setdefault("retry", True)
    return publish_kwds
def parse_ack_kwds(params, manager_name):
    """Return the acknowledgement-related keyword arguments for an exchange.

    * When ``params['persistence_directory']`` is truthy, two
      ``MessageQueueUUIDStore`` objects are created under the
      ``amqp_ack-<manager_name>`` subdirectory (``publish`` and ``consume``)
      and returned as ``publish_uuid_store`` / ``consume_uuid_store``.
    * When ``params['amqp_ack_republish_time']`` is truthy, it is converted
      with ``int`` and returned as ``republish_time``.

    The result is an empty dict when neither option is set.
    """
    ack_kwds = {}

    store_root = params.get('persistence_directory', None)
    if store_root:
        base_subdirs = ['amqp_ack-%s' % manager_name]
        store_layout = (
            ('publish_uuid_store', 'publish'),
            ('consume_uuid_store', 'consume'),
        )
        for kwd_name, leaf in store_layout:
            ack_kwds[kwd_name] = MessageQueueUUIDStore(store_root, subdirs=base_subdirs + [leaf])

    republish_after = params.get('amqp_ack_republish_time', None)
    if republish_after:
        ack_kwds['republish_time'] = int(republish_after)

    return ack_kwds