Source code for pulsar.client.transport.poster

import logging

try:
    from urllib2 import urlopen
except ImportError:
    from urllib.request import urlopen
try:
    from urllib2 import Request
except ImportError:
    from urllib.request import Request

try:
    import poster
except ImportError:
    poster = None

POSTER_UNAVAILABLE_MESSAGE = "Pulsar configured to use poster module - but it is unavailable. Please install poster."

log = logging.getLogger(__name__)


if poster is not None:
    poster.streaminghttp.register_openers()


[docs] def post_file(url, path): __ensure_poster() try: datagen, headers = poster.encode.multipart_encode({"file": open(path, "rb")}) request = Request(url, datagen, headers) return urlopen(request).read() except Exception: log.exception("Problem with poster post of [%s]" % path) raise
[docs] def get_file(url, path): __ensure_poster() request = Request(url=url) response = urlopen(request) with open(path, 'wb') as output: while True: buffer = response.read(1024) if not buffer: break output.write(buffer)
def __ensure_poster(): if poster is None: raise ImportError(POSTER_UNAVAILABLE_MESSAGE)