Source code for pulsar.client.transport.requests
import logging
try:
import requests
except ImportError:
requests = None # type: ignore
try:
import requests_toolbelt
requests_multipart_post_available = True
except ImportError:
requests_multipart_post_available = False
requests_toolbelt = None # type: ignore
REQUESTS_UNAVAILABLE_MESSAGE = "Pulsar configured to use requests module - but it is unavailable. Please install requests."
REQUESTS_TOOLBELT_UNAVAILABLE_MESSAGE = "Pulsar configured to use requests_toolbelt module - but it is unavailable. Please install requests_toolbelt."
log = logging.getLogger(__name__)
[docs]
def post_file(url, path):
if requests_toolbelt is None:
raise ImportError(REQUESTS_TOOLBELT_UNAVAILABLE_MESSAGE)
__ensure_requests()
m = requests_toolbelt.MultipartEncoder(
fields={'file': ('filename', open(path, 'rb'))}
)
requests.post(url, data=m, headers={'Content-Type': m.content_type})
[docs]
def get_file(url, path):
__ensure_requests()
r = requests.get(url, stream=True)
with open(path, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
f.flush()
def __ensure_requests():
if requests is None:
raise ImportError(REQUESTS_UNAVAILABLE_MESSAGE)