# Source code for micro.web.app

"""
Contains core application structure for Micro.Web.
"""
import asyncio
import logging
import re
import socket
import weakref

from aiohttp import web_exceptions, web_urldispatcher, hdrs
from aiolocals import configure_logging as aiolocals_configure_logging,\
    context_middleware_factory as context_wrapper_factory

from . import middleware
from .factory import ApiRequestHandlerFactory
from .response import Response
from .utils import Borg, is_debug, maybe_coroutine, get_printable_sockinfo


logger = logging.getLogger(__name__)


def default_404_handler(exc, request):
    """
    Build the fallback response for a request that matched no route.

    :param exc: The exception that led here; it is accepted but not used.
    :param request: The request being served; only its ``path`` is read.
    :return: A :class:`Response` with status 404 whose text names the path.
    """
    message = "No matched route for request: {!r}".format(request.path)
    return Response(text=message, status=404)


class Router(web_urldispatcher.UrlDispatcher):
    """
    An aiohttp URL dispatcher which is additionally capable of mounting the
    routes of another router under a path prefix.
    """

    # A mount prefix must start with "/" and may contain only slashes, dots,
    # ASCII letters, digits and dashes. The bare "/" is allowed separately in mount().
    PREFIX_RE = re.compile(r"^/[/.a-zA-Z0-9-]+$")

    def _joinpaths(self, prefix, path):
        """
        Concatenate ``prefix`` and ``path`` without doubling the slash between them.

        :param prefix: Mount prefix, e.g. ``"/api"`` or ``"/api/"``
        :param path: Route path (or route formatter) of the child router
        :return: The joined path
        """
        # Disrespect the ending slashes
        if prefix.endswith('/'):
            path = path.lstrip('/')
        elif path == '/':
            # Mounting the child's root: "/api" + "/" becomes "/api"
            path = ''
        return prefix + path

    def _joinrepaths(self, prefix, regex):
        """
        Build a new compiled pattern from ``regex`` with ``prefix`` inserted
        after the first "/" of the original pattern.

        :param prefix: Mount prefix; leading and trailing slashes are ignored
        :param regex: Compiled regular expression of the child route
        :return: A newly compiled regular expression
        """
        prefix = prefix.strip('/')
        if not prefix:
            return re.compile(regex.pattern)

        # The prefix is literal text: escape it so that characters such as "."
        # (allowed by PREFIX_RE) do not act as regex metacharacters.
        pattern = regex.pattern.replace('/', '/%s/' % re.escape(prefix), 1)
        return re.compile(pattern)

    @property
    def routes(self):
        """
        The underlying mapping of registered routes (``self._routes``).
        """
        return self._routes

    def mount(self, prefix, router):
        """
        Add the routes of child router with prefix.

        Plain and dynamic routes are re-registered on this router under the
        prefix. Routes of any other kind are skipped with a warning.

        :param prefix: Prefix
        :param router: micro.web.app.Router
        :raises RuntimeError: if the prefix is neither "/" nor matches ``PREFIX_RE``
        """
        if not self.PREFIX_RE.match(prefix) and prefix != '/':
            raise RuntimeError("An attempt to mount resource with invalid prefix")

        for route in router.routes.values():
            if isinstance(route, web_urldispatcher.PlainRoute):
                new_route = self.add_route(
                    route.method,
                    self._joinpaths(prefix, route._path),
                    route.handler,
                    name=route.name)
            elif isinstance(route, web_urldispatcher.DynamicRoute):
                new_route = web_urldispatcher.DynamicRoute(
                    route.method,
                    route.handler,
                    route.name,
                    self._joinrepaths(prefix, route._pattern),
                    self._joinpaths(prefix, route._formatter)
                )
                self.register_route(new_route)
            else:
                # No remounting rule exists for this kind of route. Skip it
                # explicitly so that neither an unbound nor a stale
                # ``new_route`` from a previous iteration is used below.
                logger.warning("Skipping route %r: it can not be mounted with prefix %r", route, prefix)
                continue

            # Make the handler point at the route it is actually served by
            new_route.handler.route = new_route

            # The next is for invoking additional configuration hooks on the application when the mounting of resource
            # is performed
            for callback in GlobalState().mount_callbacks:
                callback(route, new_route)
class GlobalState(metaclass=Borg):
    """
    This maintains global application state.

    NOTE(review): the ``Borg`` metaclass is defined in ``.utils``; presumably it
    makes all instances share one state -- confirm there.
    """

    # Resource instance -> Micro bound to it (see Micro.__get__); weak keys
    bound_instances = weakref.WeakKeyDictionary()

    # Called as callback(route, new_route) from Router.mount
    mount_callbacks = []
    # Called as callback(route) from the Micro.route decorator
    route_callbacks = []
    # Executed by Micro.finish
    finish_callbacks = []

    # Event loop; filled in by Micro.__init__ when still unset
    loop = None

    # Backing storage of the dictionary interface of Micro
    app_dict = {}

    # Middleware factories exposed through Micro.wrappers
    wrappers = [context_wrapper_factory]

    # Status code -> error handler, consulted by Micro.handle_error
    error_handlers = {404: default_404_handler}

    # Debug flag, initialised from is_debug() when the class is created
    debug = is_debug()
class Micro:
    """
    Micro is an entity which is in charge of routing configuration for our application and exception handling.

    It also acts as a descriptor: when accessed through an instance of a
    resource class it returns a per-instance clone bound to that resource.
    """

    def __init__(self):
        self.global_state = GlobalState()
        self._router = Router()
        self._instance = None
        # prefix -> mounted resource; values are held weakly
        self._mounts = weakref.WeakValueDictionary()

        if not self.global_state.loop:
            self.global_state.loop = asyncio.get_event_loop()

    @property
    def loop(self):
        """
        Event loop instance.
        """
        return self.global_state.loop

    @property
    def router(self):
        """
        An application router instance.
        """
        return self._router

    @property
    def mounts(self):
        """
        Mapping of mount prefixes to the mounted resources.
        """
        return self._mounts

    @property
    def debug(self):
        """
        Debug flag.
        """
        return self.global_state.debug

    @property
    def instance(self):
        """
        An instance of resource which is bound to this app, or None if app initialized not in resource scope.
        """
        return self._instance

    @property
    def wrappers(self):
        """
        A list of aiohttp`s middleware factories.
        """
        return self.global_state.wrappers

    @property
    def error_handlers(self):
        """
        Dictionary of error handlers.
        """
        return self.global_state.error_handlers

    def __get__(self, instance, owner):
        """
        Get Micro instance which is bound to self.

        Access through the class returns this object itself; access through
        an instance returns (creating it on first use) the clone registered
        for that instance in ``GlobalState.bound_instances``.
        """
        if instance is None:
            return self

        micro = self.global_state.bound_instances.get(instance, None)
        if micro is None:
            micro = Micro()
            micro.clone(self)
            self.global_state.bound_instances[instance] = micro
        # NOTE(review): indentation was lost in the reviewed source; re-binding
        # on every access is assumed, since clone() may overwrite the instance.
        micro.set_instance(instance)
        return micro

    ####
    #### Dictionary interface start
    ####
    def __getitem__(self, item):
        return self.global_state.app_dict[item]

    def __setitem__(self, key, value):
        self.global_state.app_dict[key] = value

    def __delitem__(self, key):
        del self.global_state.app_dict[key]
    ####
    #### Dictionary interface end
    ####

    def __call__(self, *args, **kwargs):
        """Gunicorn compatibility"""
        return self

    @asyncio.coroutine
    def finish(self):
        """
        Executes the callbacks registered for finish.

        An exception raised by a callback is reported to the loop's exception
        handler and does not prevent the remaining callbacks from running.
        """
        for cb in self.global_state.finish_callbacks:
            try:
                yield from maybe_coroutine(cb)
            except Exception as exc:
                self.loop.call_exception_handler({
                    'message': "Error in finish callback",
                    'exception': exc,
                    'application': self,
                })

    def set_debug(self, debug):
        """
        Sets the debug mode.

        :param debug: Boolean indicating whether to run application in debug mode or not
        :type debug: bool
        :return:
        """
        GlobalState().debug = debug

    def set_instance(self, instance):
        """
        Updates the instance of resource to which the application is bound.
        """
        self._instance = instance

    def clone(self, micro):
        """
        Clones the instance`s properties into self.

        The router and the mounts mapping are shared with ``micro``, not copied.
        """
        self._router = micro.router
        self._instance = micro.instance
        self._mounts = micro.mounts
        # Not read by the ``debug`` property, which uses the global state
        self._debug = micro.debug

    def set_finish_callbacks(self, *callbacks):
        """
        Add the callbacks that will be performed when application finishes.

        :param callbacks:
        :type callbacks: iterable
        """
        self.global_state.finish_callbacks.extend(callbacks)

    def serve_endpoint(self, route, request, *args, **kwargs):
        """
        Serve an application handler with care of matching the needed resource for methods as handlers.

        :param route: A route instance
        :type route: aiohttp.web_urldispatcher.Route
        :param request: A request instance
        :type request: micro.core.request.Request
        :return: Whatever the route handler returns
        """
        if isinstance(route, web_urldispatcher.StaticRoute):
            request._keep_alive = False
            return route.handler(request)

        instance = self._get_instance_for_request(request)
        if instance is not None:
            # The handler is a method of a resource: pass the resource as ``self``
            return route.handler(instance, request, *args, **kwargs)
        return route.handler(request, *args, **kwargs)

    def mount(self, prefix, resource):
        """
        Mount the resource and maps all its routes to be accessible within prefix.

        :param prefix: A string prefix, which may start with "/" (auto-prepended if not)
        :type prefix: str
        :param resource: A resource - an object which holds instance of :class:`Micro` in its scope.
        :raises RuntimeError: if no attribute of the resource is a :class:`Micro`
        """
        for attr_name in dir(resource):
            if not attr_name.startswith('__') and not attr_name.endswith('__'):
                attribute = getattr(resource, attr_name)
                if isinstance(attribute, Micro):
                    self._add_mount(attribute.router, prefix, resource, attribute)
                    break
        else:
            # The loop finished without finding a Micro attribute
            raise RuntimeError("An attempt to mount resource without bound Micro instance")

    def route(self, url, method=hdrs.METH_ANY, endpoint=None):
        """
        Wrap the function to make it accessible via URL.

        :param url: aiohttp routing expression
        :type url: str or regular expression
        :param method: HTTP method which should be passed onto micro.web.app.Router.add_route
        :type method: string
        :param endpoint: Route identifier which is used internally to maintain the state of routes. Endpoints should
                         be unique across the application. Defaults to wrapped function name.
        :type endpoint: str
        :return: A decorator which registers the function and returns it unchanged
        """
        def wrapper(f):
            route = self._router.add_route(method, url, f, name=endpoint or f.__name__)
            f.route = route
            for callback in self.global_state.route_callbacks:
                callback(route)
            return f
        return wrapper

    def set_error_handler(self, status_code, callback):
        """
        Set error handler callback for specific response status code.

        :param status_code: status code
        :param callback: error handler function
        """
        self.error_handlers[status_code] = callback

    def make_handler(self, *args, **kwargs):
        """
        Make a protocol factory which compatible with asyncio.loop.create_server.

        The positional and keyword arguments are accepted but not used.

        :return:
        """
        logger.info("Initializing HipchatMicro framework")
        return ApiRequestHandlerFactory(self, debug=self.debug)

    def run(self, host='0.0.0.0', port=7000, name=None, configure_logging=True, socket_family=socket.AF_UNSPEC):
        """
        A "run" handler, primarily for debugging purposes.
        This is only called when the server is started without gunicorn.

        :param host: A host where to bind. Probably either '0.0.0.0' or 'localhost'.
        :param port: A port where to bind
        :param name: Service name to use in logging. If none, tries to discover it from gunicorn app name.
        :param configure_logging: Whether to apply default aiolocals logging configuration or not
        :param socket_family: The socket family which will be passed to server socket constructor
        """
        if configure_logging:
            aiolocals_configure_logging(name, self.debug)

        loop = asyncio.get_event_loop()
        create_server_task = loop.create_server(self.make_handler(), host, port, family=socket_family)
        server_instance = loop.run_until_complete(create_server_task)
        host, port = get_printable_sockinfo(server_instance)
        logger.info("{app_name} running on {host}:{port}".format(app_name=name, host=host, port=port))
        loop.run_forever()

    def default_error_handler(self, exception_instance, request):
        """
        Default error handler.

        In debug mode the error is delegated to ``request.protocol.handle_error``.
        Otherwise an HTTP exception is returned as is and anything else is
        replaced by a generic internal server error.

        :param exception_instance: Instance of the exception to handle
        :type exception_instance: Exception
        :param request:
        :type request: micro.core.request.Request
        :return:
        """
        # Pass the exception as a logging argument so that %r is interpolated
        logger.error('Unhandled exception: %r', exception_instance)

        if isinstance(exception_instance, web_exceptions.HTTPError):
            status = exception_instance.status
        else:
            status = 500

        if self.debug:
            return request.protocol.handle_error(status=status, exc=exception_instance)
        if isinstance(exception_instance, web_exceptions.HTTPException):
            return exception_instance
        return web_exceptions.HTTPInternalServerError(text="Internal server error")

    @asyncio.coroutine
    def handle_error(self, e, request, endpoint_name):
        """
        Error handler.

        The ``exception`` middleware hook is given the first chance to produce a
        response; otherwise the handler registered for the exception's
        ``status_code`` (or the default one) is used.

        :param e: Exception happened while processing request
        :param request: micro.web.request.Request instance
        :param endpoint_name: Name of endpoint which produced the request
        :return:
        """
        available_middlewares = middleware.get_available_middlewares(endpoint_name)
        middleware_response = yield from middleware.middleware_hook(available_middlewares, 'exception', request, e)
        if middleware_response is not None:
            middleware_class, response = middleware_response
            logger.debug("Got middleware %s response within exception hook %r", middleware_class.__name__, request)
            return response

        status_code = getattr(e, 'status_code', None)
        error_handler = self.error_handlers.get(status_code, self.default_error_handler)
        # Use the module logger (not the root ``logging`` module) like the rest of the file
        logger.debug("Handling exception %r with error handler %r", e, error_handler)
        return error_handler(e, request)

    def _add_mount(self, child_router, prefix, resource, micro):
        """
        Add mount to router, handle instance configuration.

        :param child_router: Router whose routes are mounted
        :param prefix: Mount prefix; "/" is prepended when missing
        :param resource: The resource object being mounted
        :param micro: The :class:`Micro` found on the resource
        """
        if not prefix.startswith('/'):
            prefix = '/' + prefix
        self._router.mount(prefix, child_router)

        # Keys never end with a slash. The same stripped base is used for the
        # nested mounts, so that "/api/" + "/sub" is stored as "/api/sub"
        # (matching real request paths) and not as "/api//sub".
        base = prefix.rstrip('/')
        self._mounts[base] = resource
        if micro.mounts:
            for mount_prefix, mount in micro.mounts.items():
                self._mounts[base + mount_prefix] = mount

        micro.clone(self)

    def _get_instance_for_request(self, request):
        """
        Gets instance of resource for request with awareness for mounts.

        :return: The resource mounted under the longest prefix the request
                 path starts with, or the bound instance when nothing matches
        """
        path = request.path
        matches = [prefix for prefix in self._mounts.keys() if path.startswith(prefix)]
        if matches:
            # The most specific (longest) prefix wins
            return self._mounts.get(max(matches, key=len))
        return self._instance
# Module-level application object, instantiated when the module is imported.
global_micro = Micro()

# Names exported by ``from micro.web.app import *``.
__all__ = [
    "Micro",
    "GlobalState",
]