Source code for pulsar.web.wsgi

import atexit
import inspect
import logging

import pulsar.web.routes
from pulsar.core import PulsarApp
from pulsar.main import load_app_configuration
from pulsar.web.framework import RoutingApp

log = logging.getLogger(__name__)


[docs] def app_factory(global_conf, **local_conf): """ Returns the Pulsar WSGI application. """ configuration_file = global_conf.get("__file__", None) webapp = init_webapp(ini_path=configuration_file, local_conf=local_conf) return webapp
[docs] def init_webapp(**config_kwds): app_conf = load_app_configuration(**config_kwds) pulsar_app = PulsarApp(**app_conf) webapp = PulsarWebApp(pulsar_app=pulsar_app) atexit.register(webapp.shutdown) return webapp
[docs] class PulsarWebApp(RoutingApp): """ Web application for Pulsar web server. """ def __init__(self, pulsar_app): super().__init__() self.pulsar_app = pulsar_app self.__setup_routes() def __setup_routes(self): for func_name, func in inspect.getmembers(pulsar.web.routes, lambda x: getattr(x, '__controller__', False)): self.__add_route_for_function(func) def __add_route_for_function(self, function): path = function.__path__ method = function.__method__ # Default or old-style route without explicit manager specified, # will be routed to manager '_default_'. default_manager_route = path self.add_route(default_manager_route, method, function) # Add route for named manager as well. named_manager_route = '/managers/{manager_name}%s' % path self.add_route(named_manager_route, method, function) def __getattr__(self, name): return getattr(self.pulsar_app, name)