Source code for pulsar.client.transport.tus
import os
import re
from typing import Dict
from urllib.parse import urlparse
import requests
# tuspy is an optional dependency: remember whether it imported so that the
# Tus transport can fail with a clear message at call time instead of at
# module import time.
try:
    from tusclient import client
except ImportError:
    tus_client_available = False
else:
    tus_client_available = True

# Error text raised when the Tus transport is used without tuspy installed.
TUS_CLIENT_UNAVAILABLE_MESSAGE = (
    "You are attempting to use the Tus transport with the Pulsar client but tuspy is unavailable."
)

# Upload chunk size in bytes; the PULSAR_TUS_CHUNK_SIZE environment variable
# overrides the built-in default when it is set.
DEFAULT_PULSAR_TUS_CHUNK_SIZE = 10**7
PULSAR_TUS_CHUNK_SIZE = int(
    os.environ.get("PULSAR_TUS_CHUNK_SIZE", DEFAULT_PULSAR_TUS_CHUNK_SIZE)
)
[docs]
def tus_upload_file(url: str, path: str) -> None:
    """Upload the file at ``path`` with the Tus protocol and register it at ``url``.

    The Tus endpoint is derived from ``url`` via ``find_tus_endpoint``. Once the
    chunked upload has finished, the last path segment of the upload session
    URL is POSTed to ``url`` as ``session_id``.

    :param url: job files endpoint; used both to derive the Tus endpoint and as
        the target of the final POST.
    :param path: path of the file handed to the Tus uploader.
    :raises Exception: if tuspy could not be imported.
    :raises AssertionError: if the uploader reports no session URL after the
        upload.
    :raises requests.HTTPError: if the final POST returns an error status.
    """
    if not tus_client_available:
        raise Exception(TUS_CLIENT_UNAVAILABLE_MESSAGE)

    storage = None
    metadata: Dict[str, str] = {}
    headers: Dict[str, str] = {}
    tus_url = find_tus_endpoint(url)
    my_client = client.TusClient(tus_url, headers=headers)
    uploader = my_client.uploader(path, metadata=metadata, url_storage=storage)
    uploader.chunk_size = PULSAR_TUS_CHUNK_SIZE
    uploader.upload()

    upload_session_url = uploader.url
    # Raise explicitly instead of using ``assert``: assert statements are
    # stripped under ``python -O``, which would turn a missing session URL into
    # an opaque AttributeError on the next line. The exception type is kept.
    if not upload_session_url:
        raise AssertionError("Tus upload finished without an upload session URL")
    tus_session_id = upload_session_url.rsplit("/", 1)[1]

    # job_key and such are encoded in the URL but this route expects a POST body
    # and if it has one Galaxy sticks the URL parameters into the "payload" object
    # for the controller. So encoded session_id in the POST body - it probably
    # all belongs there anyway.
    post_response = requests.post(url, data={"session_id": tus_session_id})
    post_response.raise_for_status()
[docs]
def find_tus_endpoint(job_files_endpoint: str) -> str:
    """Return the Tus resumable-upload URL that corresponds to a job files URL.

    The first ``jobs/<id>/files`` segment of the URL path is replaced with
    ``job_files/resumable_upload``. Every other URL component (scheme, host,
    query string, fragment) is carried over from the input. When the path
    contains no such segment, no substitution takes place.

    :param job_files_endpoint: URL of the job files endpoint.
    :returns: the same URL with the rewritten path.
    """
    parsed = urlparse(job_files_endpoint)
    # ``count`` is passed by keyword: passing it positionally to ``re.sub`` is
    # deprecated since Python 3.13 and emits a DeprecationWarning.
    tus_path = re.sub(
        r"jobs/[^/]*/files",
        "job_files/resumable_upload",
        parsed.path,
        count=1,
    )
    return parsed._replace(path=tus_path).geturl()