Source code for pulsar.client.transport.standard
"""
Pulsar HTTP Client layer based on Python Standard Library (urllib)
"""
import mmap
import socket
from os.path import getsize
from urllib.error import URLError
from urllib.request import (
Request,
urlopen,
)
from ..exceptions import PulsarClientTransportError
[docs]
class UrllibTransport:
def __init__(self, timeout=None, **kwrgs):
self.timeout = timeout
def _url_open(self, request, data):
# data is intentionally not used here (it is part of the request object), the parameter remains for tests
return urlopen(request, timeout=self.timeout)
[docs]
def execute(self, url, method=None, data=None, input_path=None, output_path=None):
request = self.__request(url, data, method)
input = None
try:
if input_path:
size = getsize(input_path)
if size:
input = open(input_path, 'rb')
data = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ)
else:
data = b""
# setting the data property clears content-length, so the header must be set after (if content-length is
# unset, urllib sets transfer-encoding to chunked, which is not supported by webob on the server side).
request.data = data
request.add_header('Content-Length', str(size))
try:
response = self._url_open(request, data)
except socket.timeout:
raise PulsarClientTransportError(code=PulsarClientTransportError.TIMEOUT)
except URLError as exc:
raise PulsarClientTransportError(transport_message=exc.reason)
finally:
if input:
input.close()
if output_path:
with open(output_path, 'wb') as output:
while True:
buffer = response.read(1024)
if not buffer:
break
output.write(buffer)
return response
else:
return response.read()
def __request(self, url, data, method):
request = Request(url=url, data=data)
if method:
request.get_method = lambda: method
return request