Source code for pulsar.client.transport.standard

"""
Pulsar HTTP Client layer based on Python Standard Library (urllib)
"""


import mmap
import socket
from os.path import getsize
from urllib.error import (
    HTTPError,
    URLError,
)
from urllib.request import (
    Request,
    urlopen,
)

from ..exceptions import PulsarClientTransportError


[docs] class UrllibTransport: def __init__(self, timeout=None, **kwrgs): self.timeout = timeout def _url_open(self, request, data): # data is intentionally not used here (it is part of the request object), the parameter remains for tests return urlopen(request, timeout=self.timeout)
[docs] def execute(self, url, method=None, data=None, input_path=None, output_path=None): request = self.__request(url, data, method) input = None try: if input_path: size = getsize(input_path) if size: input = open(input_path, 'rb') data = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ) else: data = b"" # setting the data property clears content-length, so the header must be set after (if content-length is # unset, urllib sets transfer-encoding to chunked, which is not supported by webob on the server side). request.data = data request.add_header('Content-Length', str(size)) try: response = self._url_open(request, data) except (socket.timeout, TimeoutError): raise PulsarClientTransportError(code=PulsarClientTransportError.TIMEOUT) except HTTPError as exc: raise PulsarClientTransportError( transport_code=exc.code, transport_message=exc.reason, ) except URLError as exc: raise PulsarClientTransportError(transport_message=exc.reason) finally: if input: input.close() if output_path: with open(output_path, 'wb') as output: while True: buffer = response.read(1024) if not buffer: break output.write(buffer) return response else: return response.read()
def __request(self, url, data, method): request = Request(url=url, data=data) if method: request.get_method = lambda: method return request