""" CLI related utilities for submitting Pulsar jobs.
"""
import fnmatch
import sys
import uuid
from pulsar.client import (
ClientJobDescription,
ClientOutputs,
finish_job,
PulsarOutputs,
submit_job,
)
from pulsar.client.test.check import (
client_info,
extract_client_options,
HELP_DISABLE_CLEANUP,
HELP_JOB_ID,
HELP_PRIVATE_TOKEN,
HELP_SUPPRESS_OUTPUT,
HELP_TRANSPORT,
HELP_URL,
Waiter,
)
from pulsar.client.util import json_dumps
from pulsar.main import ArgumentParser
from pulsar.scripts.submit_util import (
add_common_submit_args,
run_server_for_job,
)
HELP_AMQP_URL = "Communicate with Pulsar listining on a message queue at this URL."
HELP_SERVER = "Run a Pulsar server locally instead of contacting a remote one."
HELP_COMMAND = "Shell command to execute on Pulsar server."
HELP_WORKING_DIRECTORY = "Local working directory (will be translated to a new directory)."
HELP_OUTPUT = "Output glob to collect from job (relative to remote working directory)."
HELP_OUTPUT_PATTERN = "Output pattern to collect from job (relative to remote working directory)."
DEFAULT_CLIENT_URL = 'http://localhost:8913/'
[docs]
def main(argv=None):
mod_docstring = sys.modules[__name__].__doc__
arg_parser = ArgumentParser(description=mod_docstring)
add_common_submit_args(arg_parser)
arg_parser.add_argument('--url', default=DEFAULT_CLIENT_URL, help=HELP_URL)
arg_parser.add_argument('--amqp_url', default=DEFAULT_CLIENT_URL, help=HELP_AMQP_URL)
arg_parser.add_argument('--private_token', default=None, help=HELP_PRIVATE_TOKEN)
# TODO: choices...
arg_parser.add_argument('--default_file_action', default="none")
arg_parser.add_argument('--file_action_config', default=None)
arg_parser.add_argument('--transport', default=None, choices=["urllib", "curl"], help=HELP_TRANSPORT) # set to curl to use pycurl
arg_parser.add_argument('--suppress_output', default=False, action="store_true", help=HELP_SUPPRESS_OUTPUT)
arg_parser.add_argument('--disable_cleanup', dest="cleanup", default=True, action="store_false", help=HELP_DISABLE_CLEANUP)
arg_parser.add_argument('--server', default=False, action="store_true", help=HELP_SERVER)
arg_parser.add_argument('--job_id', default=None, help=HELP_JOB_ID)
arg_parser.add_argument('--command', help=HELP_COMMAND)
arg_parser.add_argument('--working_directory', default=".", help=HELP_WORKING_DIRECTORY)
arg_parser.add_argument('--result_json', default=None)
arg_parser.add_argument('--output', default=[], action="append", help=HELP_OUTPUT)
arg_parser.add_argument('--output_pattern', default=[], action="append", help=HELP_OUTPUT_PATTERN)
args = arg_parser.parse_args(argv)
if args.server:
return run_server_for_job(args)
else:
failed = _run_client_for_job(args)
if failed:
return 1
else:
return 0
def _run_client_for_job(args):
if args.job_id is None:
args.job_id = str(uuid.uuid4())
output_patterns = []
output_patterns.extend(args.output_pattern)
for output in args.output:
output_patterns.append(fnmatch.translate(output))
client_options = extract_client_options(args)
client, client_manager = client_info(args, client_options)
try:
working_directory = args.working_directory
client_outputs = ClientOutputs(
working_directory=working_directory,
dynamic_outputs=output_patterns,
)
job_description = ClientJobDescription(
command_line=args.command,
working_directory=working_directory,
client_outputs=client_outputs,
)
submit_job(client, job_description)
waiter = Waiter(client, client_manager)
result_status = waiter.wait()
pulsar_outputs = PulsarOutputs.from_status_response(result_status)
if args.result_json:
open(args.result_json, "w").write(json_dumps(result_status))
finish_args = dict(
client=client,
job_completed_normally=True,
cleanup_job=args.cleanup,
client_outputs=client_outputs,
pulsar_outputs=pulsar_outputs,
)
failed = finish_job(**finish_args)
return failed
finally:
client_manager.shutdown()
if __name__ == "__main__":
main()