Source code for pulsar.scripts.mesos_executor
import logging
import sys
import threading
from pulsar.client.util import from_base64_json
from pulsar.main import (
ArgumentParser,
PulsarManagerConfigBuilder,
)
from pulsar.manager_endpoint_util import submit_job
from pulsar.mesos import (
ensure_mesos_libs,
Executor,
mesos_pb2,
MesosExecutorDriver,
)
from pulsar.scripts.submit_util import (
manager_from_args,
wait_for_job,
)
log = logging.getLogger(__name__)
DESCRIPTION = "Mesos executor for Pulsar"
[docs]
class PulsarExecutor(Executor):
def __task_update(self, driver, task, state, data=None):
try:
log.debug("Sending status update...")
update = mesos_pb2.TaskStatus()
update.task_id.value = task.task_id.value
update.state = state
if data:
update.data = data
driver.sendStatusUpdate(update)
except Exception:
log.exception("Failed to update status of task.")
[docs]
def launchTask(self, driver, task):
# Create a thread to run the task. Tasks should always be run in new
# threads or processes, rather than inside launchTask itself.
def run_task():
try:
log.info("Running task %s" % task.task_id.value)
task_data = from_base64_json(task.data)
manager_options = task_data["manager"]
config_builder = PulsarManagerConfigBuilder(**manager_options)
manager, pulsar_app = manager_from_args(config_builder)
job_config = task_data["job"]
submit_job(manager, job_config)
self.__task_update(driver, task, mesos_pb2.TASK_RUNNING)
wait_for_job(manager, job_config)
self.__task_update(driver, task, mesos_pb2.TASK_FINISHED)
pulsar_app.shutdown()
except Exception:
log.exception("Failed to run, update, or monitor task %s" % task)
raise
thread = threading.Thread(target=run_task)
thread.start()
[docs]
def frameworkMessage(self, driver, message):
# Send it back to the scheduler.
driver.sendFrameworkMessage(message)
[docs]
def run_executor(argv=None):
arg_parser = ArgumentParser(description=DESCRIPTION)
arg_parser.parse_args(argv)
ensure_mesos_libs()
log.info("Starting Pulsar executor")
driver = MesosExecutorDriver(PulsarExecutor())
exit_code = 0
if not driver.run() == mesos_pb2.DRIVER_STOPPED:
exit_code = 1
return exit_code
[docs]
def main(argv=None):
sys.exit(run_executor(argv))
if __name__ == "__main__":
main()