Source code for pulsar.client.destination
from re import match
from .util import filter_destination_params
SUBMIT_PREFIX = "submit_"
[docs]
def url_to_destination_params(url):
"""Convert a legacy runner URL to a job destination
>>> params_simple = url_to_destination_params("http://localhost:8913/")
>>> params_simple["url"]
'http://localhost:8913/'
>>> params_simple["private_token"] is None
True
>>> advanced_url = "https://1234x@example.com:8914/managers/longqueue"
>>> params_advanced = url_to_destination_params(advanced_url)
>>> params_advanced["url"]
'https://example.com:8914/managers/longqueue/'
>>> params_advanced["private_token"]
'1234x'
>>> runner_url = "pulsar://http://localhost:8913/"
>>> runner_params = url_to_destination_params(runner_url)
>>> runner_params['url']
'http://localhost:8913/'
"""
if url.startswith("pulsar://"):
url = url[len("pulsar://"):]
if not url.endswith("/"):
url += "/"
# Check for private token embedded in the URL. A URL of the form
# https://moo@cow:8913 will try to contact https://cow:8913
# with a private key of moo
private_token_format = "https?://(.*)@.*/?"
private_token_match = match(private_token_format, url)
private_token = None
if private_token_match:
private_token = private_token_match.group(1)
url = url.replace("%s@" % private_token, '', 1)
destination_args = {"url": url,
"private_token": private_token}
return destination_args
[docs]
def submit_params(destination_params):
"""
>>> destination_params = {"private_token": "12345", "submit_native_specification": "-q batch"}
>>> result = submit_params(destination_params)
>>> result
{'native_specification': '-q batch'}
"""
return filter_destination_params(destination_params, SUBMIT_PREFIX)