# Source code for pulsar.scripts.submit_util

"""CLI-related utilities for submitting Pulsar jobs."""
import json
import logging
import time

from pulsar.client.util import from_base64_json
from pulsar.main import (
    load_pulsar_app,
    PulsarManagerConfigBuilder,
)
from pulsar.manager_endpoint_util import submit_job
from pulsar.managers.status import is_job_done

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default value of wait_for_job's ``poll_time`` argument, which is handed to
# time.sleep() between successive job status checks (i.e. seconds).
DEFAULT_POLL_TIME = 2


def add_common_submit_args(arg_parser):
    """Register the options shared by the job submission scripts.

    Adds ``--file`` and ``--base64`` (each defaulting to ``None``) to
    ``arg_parser`` and then lets ``PulsarManagerConfigBuilder`` add its own
    options to the same parser.

    :param arg_parser: parser object exposing ``add_argument``.
    """
    # The two job-description options take identical settings, so register
    # them in one pass (same order as before: --file, then --base64).
    for option in ("--file", "--base64"):
        arg_parser.add_argument(option, default=None)
    PulsarManagerConfigBuilder.populate_options(arg_parser)
def run_server_for_job(args):
    """Start a Pulsar app, submit the job described by ``args``, then shut down.

    When ``args.wait`` is truthy the call additionally blocks until the job
    is reported done before the app is shut down.

    Any exception raised while loading the job config, submitting, or waiting
    (``BaseException`` included) is logged with its traceback and *not*
    re-raised; ``app.shutdown()`` always runs.

    :param args: parsed command-line arguments; ``wait`` is read here, the
        remaining attributes are consumed by ``PulsarManagerConfigBuilder``
        and ``_load_job_config``.
    """
    should_wait = args.wait
    manager, app = manager_from_args(PulsarManagerConfigBuilder(args))
    try:
        job_config = _load_job_config(args)
        submit_job(manager, job_config)
        if should_wait:
            log.info("Co-execution job setup, now waiting for job completion and postprocessing.")
            wait_for_job(manager, job_config)
            log.info("Leaving finish_execution and shutting down app")
    except BaseException:
        # Pick the message matching how far this invocation was meant to go.
        log.exception(
            "Failure submitting or waiting on job."
            if should_wait
            else "Failure submitting job."
        )
    finally:
        app.shutdown()
def run_server_for_job_finish(args):
    """Start a Pulsar app, mark the target job's execution finished, and wait.

    Loads the job config, calls ``finish_execution`` on the manager's
    ``_proxied_manager`` for the config's ``job_id``, then polls until the
    job is reported done.

    Any exception raised along the way (``BaseException`` included) is logged
    with its traceback and *not* re-raised; ``app.shutdown()`` always runs.

    :param args: parsed command-line arguments consumed by
        ``PulsarManagerConfigBuilder`` and ``_load_job_config``.
    """
    manager, app = manager_from_args(PulsarManagerConfigBuilder(args))
    try:
        # We only need the job config so there should be an option to just
        # send that I think.
        job_config = _load_job_config(args)
        target_job_id = job_config.get('job_id')

        log.info("Informing Pulsar app the target job has completed")
        manager._proxied_manager.finish_execution(target_job_id)

        log.info("Waiting for job to complete")
        wait_for_job(manager, job_config)

        log.info("Leaving finish_execution and shutting down app")
    except BaseException:
        log.exception("Failure finishing job.")
    finally:
        app.shutdown()
def wait_for_job(manager, job_config, poll_time=DEFAULT_POLL_TIME):
    """Block until ``manager`` reports the job as done.

    The status is checked immediately and then again after every
    ``poll_time``-second sleep; there is no timeout.

    :param manager: object exposing ``get_status(job_id)``.
    :param job_config: mapping whose ``'job_id'`` entry identifies the job
        (looked up with ``.get``, so a missing key polls with ``None``).
    :param poll_time: delay passed to ``time.sleep`` between checks.
    """
    job_id = job_config.get('job_id')
    # Check first, sleep after: a job that is already done returns at once.
    while not is_job_done(manager.get_status(job_id)):
        time.sleep(poll_time)
def _load_job_config(args):
    """Return the job configuration described by the parsed arguments.

    If ``args.base64`` is truthy it is decoded with ``from_base64_json``;
    otherwise ``args.file`` is opened and parsed as JSON.

    :param args: object with ``base64`` and ``file`` attributes.
    :returns: the decoded job configuration.
    """
    if args.base64:
        return from_base64_json(args.base64)

    # Use a context manager so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(args.file) as job_config_file:
        return json.load(job_config_file)
def manager_from_args(config_builder):
    """Load a Pulsar app from ``config_builder`` and pick out its manager.

    :param config_builder: configuration builder; its ``manager`` attribute
        names the manager to return.
    :returns: ``(manager, pulsar_app)`` tuple, where ``manager`` is the entry
        of ``pulsar_app.managers`` named by ``config_builder.manager``.
    """
    selected_name = config_builder.manager
    # Set message_queue_consume so this Pulsar app doesn't try to consume
    # setup/kill messages and only publishes status updates to configured
    # queue.
    app = load_pulsar_app(config_builder, message_queue_consume=False)
    return app.managers[selected_name], app