Source code for pulsar.client.transport.curl

import io
import logging
import os.path

try:
    import pycurl
    from pycurl import (
        Curl,
        error,
        HTTP_CODE,
    )
    curl_available = True
except ImportError:
    curl_available = False

from ..exceptions import PulsarClientTransportError

PYCURL_UNAVAILABLE_MESSAGE = \
    "You are attempting to use the Pycurl version of the Pulsar client but pycurl is unavailable."

NO_SUCH_FILE_MESSAGE = "Attempt to post file %s to URL %s, but file does not exist."
POST_FAILED_MESSAGE = "Failed to post_file properly for url %s, remote server returned status code of %s."
GET_FAILED_MESSAGE = "Failed to get_file properly for url %s, remote server returned status code of %s."

log = logging.getLogger(__name__)


[docs] class PycurlTransport: def __init__(self, timeout=None, **kwrgs): self.timeout = timeout
[docs] def execute(self, url, method=None, data=None, input_path=None, output_path=None): buf = _open_output(output_path) try: c = _new_curl_object_for_url(url) c.setopt(c.WRITEFUNCTION, buf.write) if method: c.setopt(c.CUSTOMREQUEST, method) if input_path: c.setopt(c.UPLOAD, 1) c.setopt(c.READFUNCTION, open(input_path, 'rb').read) filesize = os.path.getsize(input_path) c.setopt(c.INFILESIZE, filesize) if data: c.setopt(c.POST, 1) if isinstance(data, str): data = data.encode('UTF-8') c.setopt(c.POSTFIELDS, data) if self.timeout: c.setopt(c.TIMEOUT, self.timeout) try: c.perform() except error as exc: raise PulsarClientTransportError( _error_curl_to_pulsar(exc.args[0]), transport_code=exc.args[0], transport_message=exc.args[1]) if not output_path: return buf.getvalue() finally: buf.close()
[docs] def post_file(url, path): if not os.path.exists(path): # pycurl doesn't always produce a great exception for this, # wrap it in a better one. message = NO_SUCH_FILE_MESSAGE % (path, url) raise Exception(message) c = _new_curl_object_for_url(url) c.setopt(c.HTTPPOST, [("file", (c.FORM_FILE, path.encode('ascii')))]) c.perform() status_code = c.getinfo(HTTP_CODE) if int(status_code) != 200: message = POST_FAILED_MESSAGE % (url, status_code) raise Exception(message)
[docs] def get_file(url, path): if path and os.path.exists(path): buf = _open_output(path, 'ab') size = os.path.getsize(path) success_codes = (200, 206) else: buf = _open_output(path) size = 0 success_codes = (200,) try: c = _new_curl_object_for_url(url) c.setopt(c.WRITEFUNCTION, buf.write) if size > 0: log.info('transfer of %s will resume at %s bytes', url, size) c.setopt(c.RESUME_FROM, size) c.perform() status_code = int(c.getinfo(HTTP_CODE)) if status_code not in success_codes: message = GET_FAILED_MESSAGE % (url, status_code) raise Exception(message) finally: buf.close()
def _open_output(output_path, mode='wb'): return open(output_path, mode) if output_path else io.BytesIO() def _new_curl_object_for_url(url): c = _new_curl_object() c.setopt(c.URL, url.encode('ascii')) return c def _new_curl_object(): try: return Curl() except NameError: raise ImportError(PYCURL_UNAVAILABLE_MESSAGE) def _error_curl_to_pulsar(code): if code == pycurl.E_OPERATION_TIMEDOUT: return PulsarClientTransportError.TIMEOUT elif code == pycurl.E_COULDNT_CONNECT: return PulsarClientTransportError.CONNECTION_REFUSED return None __all__ = [ 'PycurlTransport', 'post_file', 'get_file' ]