# Source code for pulsar.client.transport.curl

import io
import logging
import os.path

import requests
try:
    import pycurl
    from pycurl import (
        Curl,
        error,
        HTTP_CODE,
    )
    curl_available = True
except ImportError:
    curl_available = False

from ..exceptions import PulsarClientTransportError

PYCURL_UNAVAILABLE_MESSAGE = \
    "You are attempting to use the Pycurl version of the Pulsar client but pycurl is unavailable."

NO_SUCH_FILE_MESSAGE = "Attempt to post file %s to URL %s, but file does not exist."
POST_FAILED_MESSAGE = "Failed to post_file properly for url %s, remote server returned status code of %s."
GET_FAILED_MESSAGE = "Failed to get_file properly for url %s, remote server returned status code of %s."

log = logging.getLogger(__name__)


class PycurlTransport:
    """HTTP transport that performs requests through pycurl."""

    def __init__(self, timeout=None, **kwrgs):
        """Remember the timeout to apply to each request.

        :param timeout: value handed to curl's ``TIMEOUT`` option by
            :meth:`execute`; a falsy value leaves the option unset.
        :param kwrgs: accepted for call compatibility and ignored.
        """
        self.timeout = timeout

    def execute(self, url, method=None, data=None, input_path=None, output_path=None):
        """Perform a single request against ``url``.

        :param url: target URL.
        :param method: optional HTTP verb, set through ``CUSTOMREQUEST``.
        :param data: optional POST body; ``str`` is encoded as UTF-8 first.
        :param input_path: optional path of a local file to upload.
        :param output_path: optional path the response body is written to.
        :returns: the response body as ``bytes`` when ``output_path`` is
            falsy, otherwise ``None`` (the body went to ``output_path``).
        :raises PulsarClientTransportError: when pycurl reports an error
            while performing the transfer.
        """
        buf = _open_output(output_path)
        # Kept in a local so the upload handle can be closed deterministically
        # once the transfer finishes, instead of lingering until garbage
        # collection.
        upload = None
        try:
            c = _new_curl_object_for_url(url)
            c.setopt(c.WRITEFUNCTION, buf.write)
            if method:
                c.setopt(c.CUSTOMREQUEST, method)
            if input_path:
                c.setopt(c.UPLOAD, 1)
                upload = open(input_path, 'rb')
                c.setopt(c.READFUNCTION, upload.read)
                filesize = os.path.getsize(input_path)
                c.setopt(c.INFILESIZE, filesize)
            if data:
                c.setopt(c.POST, 1)
                if isinstance(data, str):
                    data = data.encode('UTF-8')
                c.setopt(c.POSTFIELDS, data)
            if self.timeout:
                c.setopt(c.TIMEOUT, self.timeout)
            try:
                c.perform()
            except error as exc:
                # exc.args is (curl error code, curl error message).
                raise PulsarClientTransportError(
                    _error_curl_to_pulsar(exc.args[0]),
                    transport_code=exc.args[0],
                    transport_message=exc.args[1],
                ) from exc
            if not output_path:
                return buf.getvalue()
        finally:
            if upload is not None:
                upload.close()
            buf.close()
def post_file(url, path):
    """Upload the file at ``path`` to ``url`` as a form field named ``file``.

    :param url: URL to post to.
    :param path: local path of the file to send.
    :raises Exception: when ``path`` does not exist.
    :raises PulsarClientTransportError: when the server's response status
        is anything other than 200.
    """
    file_missing = not os.path.exists(path)
    if file_missing:
        # pycurl doesn't always produce a great exception for this,
        # wrap it in a better one.
        raise Exception(NO_SUCH_FILE_MESSAGE % (path, url))

    curl = _new_curl_object_for_url(url)
    upload_field = ("file", (curl.FORM_FILE, path.encode('ascii')))
    curl.setopt(curl.HTTPPOST, [upload_field])
    curl.perform()

    http_status = int(curl.getinfo(HTTP_CODE))
    if http_status == 200:
        return
    raise PulsarClientTransportError(
        transport_code=http_status,
        transport_message=POST_FAILED_MESSAGE % (url, http_status),
    )
def get_size(url) -> int:
    """Return the size in bytes that the server reports for ``url``.

    Issues a HEAD request and reads the ``content-length`` response header.

    :param url: URL to probe.
    :returns: the reported length, or ``-1`` when it cannot be determined
        (status code of 299 or above, header missing, or header value not
        an integer).
    """
    # NOTE(review): "identity" presumably keeps the reported length equal to
    # the number of bytes that will be written to disk - confirm.
    response = requests.head(url, headers={"accept-encoding": "identity"})
    # NOTE(review): 299 itself is treated as a failure here; confirm whether
    # the intended threshold is 300.
    if response.status_code >= 299:
        log.warning("Response to HEAD request for '%s' with status code %s, cannot resume download",
                    url, response.status_code)
        return -1
    try:
        return int(response.headers["content-length"])
    except KeyError:
        log.error("'content-length' header not sent for '%s', cannot resume download", url)
        return -1
    except ValueError:
        # A malformed header is as unusable as a missing one; report the
        # size as unknown rather than letting the error escape to callers.
        log.error("'content-length' header for '%s' is not an integer, cannot resume download", url)
        return -1
def get_file(url, path: str):
    """Download ``url`` into ``path``, resuming a partial download if possible.

    When ``path`` already exists its size is compared with the size reported
    by :func:`get_size`:

    * equal and non-zero: the file is considered complete, nothing is done;
    * remote size unknown (``-1``): the download starts over, truncating;
    * local file larger than the remote: it cannot be a partial copy of the
      remote resource, so the download also starts over, truncating;
    * otherwise: the file is opened for append and the transfer resumes at
      the current local size.

    :param url: URL to fetch.
    :param path: local destination path.
    :raises PulsarClientTransportError: when the response status is not one
        of the accepted codes (200, plus 206 when appending).
    """
    success_codes = [200]
    size = 0
    mode = 'wb'
    if os.path.exists(path):
        size = os.path.getsize(path)
        remote_size = get_size(url)
        if size and remote_size == size:
            # Already got the whole file, fixes https://github.com/galaxyproject/pulsar/issues/340
            return
        if remote_size == -1 or size > remote_size:
            # Either we don't know how large the remote file is, or the local
            # file is larger than the remote one; in both cases resuming
            # makes no sense, so start over with a truncated file.
            size = 0
        else:
            # We got some data left to download
            mode = 'ab'
            success_codes = [200, 206]
    buf = _open_output(path, mode)
    try:
        c = _new_curl_object_for_url(url)
        c.setopt(c.WRITEFUNCTION, buf.write)
        if size > 0:
            log.info('transfer of %s will resume at %s bytes', url, size)
            c.setopt(c.RESUME_FROM, size)
        c.perform()
        status_code = int(c.getinfo(HTTP_CODE))
        if status_code not in success_codes:
            raise PulsarClientTransportError(
                transport_code=status_code,
                transport_message=GET_FAILED_MESSAGE % (url, status_code),
            )
    finally:
        buf.close()
def _open_output(output_path, mode='wb'):
    """Return a writable binary sink for a response body.

    A truthy ``output_path`` is opened as a file using ``mode``; otherwise an
    in-memory ``io.BytesIO`` buffer is returned.
    """
    if output_path:
        return open(output_path, mode)
    return io.BytesIO()


def _new_curl_object_for_url(url):
    """Create a curl handle whose ``URL`` option is ``url`` encoded as ASCII."""
    curl = _new_curl_object()
    encoded_url = url.encode('ascii')
    curl.setopt(curl.URL, encoded_url)
    return curl


def _new_curl_object():
    """Instantiate ``Curl``, raising ``ImportError`` if the name is undefined."""
    try:
        handle = Curl()
    except NameError:
        # ``Curl`` is only bound when the pycurl import at the top of the
        # module succeeded.
        raise ImportError(PYCURL_UNAVAILABLE_MESSAGE)
    return handle


def _error_curl_to_pulsar(code):
    """Map a pycurl error code to a ``PulsarClientTransportError`` code.

    Returns ``None`` for codes other than timeout and connection failure.
    """
    if code == pycurl.E_OPERATION_TIMEDOUT:
        return PulsarClientTransportError.TIMEOUT
    if code == pycurl.E_COULDNT_CONNECT:
        return PulsarClientTransportError.CONNECTION_REFUSED
    return None


__all__ = [
    'PycurlTransport',
    'post_file',
    'get_file'
]