"""
Contains core application structure for Micro.Web.
"""
import asyncio
import logging
import re
import socket
import weakref
from aiohttp import web_exceptions, web_urldispatcher, hdrs
from aiolocals import configure_logging as aiolocals_configure_logging,\
context_middleware_factory as context_wrapper_factory
from . import middleware
from .factory import ApiRequestHandlerFactory
from .response import Response
from .utils import Borg, is_debug, maybe_coroutine, get_printable_sockinfo
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def default_404_handler(exc, request):
    """
    Build the response used when no route matches the request.

    :param exc: The exception that triggered this handler (not used here)
    :param request: The request being handled; its ``path`` is echoed in the response text
    :return: A :class:`Response` with status 404
    """
    message = "No matched route for request: {!r}".format(request.path)
    return Response(status=404, text=message)
class Router(web_urldispatcher.UrlDispatcher):
    """
    An aiohttp URL dispatcher which is capable of mounting the routes of another router under a prefix.
    """

    # A valid mount prefix starts with "/" followed by one or more of: "/", ".", ASCII letters, digits, "-".
    PREFIX_RE = re.compile(r"^/[/.a-zA-Z0-9-]+$")

    def _joinpaths(self, prefix, path):
        """
        Concatenate a mount prefix and a route path without doubling the slash between them.

        :param prefix: Mount prefix
        :type prefix: str
        :param path: Path (or URL formatter string) of the child route
        :type path: str
        :return: The combined path
        :rtype: str
        """
        if prefix.endswith('/'):
            # The prefix already supplies the separating slash
            path = path.lstrip('/')
        elif path == '/':
            # The child's root route maps onto the prefix itself
            path = ''
        return prefix + path

    def _joinrepaths(self, prefix, regex):
        """
        Build the pattern of a mounted dynamic route by inserting the prefix after the first "/" of the
        child route's pattern.

        :param prefix: Mount prefix; leading and trailing slashes are ignored
        :type prefix: str
        :param regex: Compiled pattern of the child route
        :return: A newly compiled pattern
        """
        prefix = prefix.strip('/')
        if not prefix:
            return re.compile(regex.pattern)
        # The prefix is literal text, so escape it: PREFIX_RE allows ".", which would otherwise
        # match any character once spliced into the regular expression.
        return re.compile(regex.pattern.replace('/', '/%s/' % re.escape(prefix), 1))

    @property
    def routes(self):
        """
        The underlying mapping of registered routes (``self._routes``).
        """
        return self._routes

    def mount(self, prefix, router):
        """
        Add the routes of child router with prefix.

        Plain and dynamic routes are re-registered on this router under ``prefix``. Routes of any other
        type are skipped with a warning. Each registered mount callback from :class:`GlobalState` is
        invoked with ``(original_route, new_route)`` for every mounted route.

        :param prefix: Prefix
        :param router: micro.web.app.Router
        :raises RuntimeError: if the prefix is neither "/" nor matched by ``PREFIX_RE``
        """
        if prefix != '/' and not self.PREFIX_RE.match(prefix):
            raise RuntimeError("An attempt to mount resource with invalid prefix")

        for route in router.routes.values():
            if isinstance(route, web_urldispatcher.PlainRoute):
                new_route = self.add_route(
                    route.method, self._joinpaths(prefix, route._path), route.handler, name=route.name)
            elif isinstance(route, web_urldispatcher.DynamicRoute):
                new_route = web_urldispatcher.DynamicRoute(
                    route.method, route.handler, route.name, self._joinrepaths(prefix, route._pattern),
                    self._joinpaths(prefix, route._formatter)
                )
                self.register_route(new_route)
            else:
                # No mounted counterpart can be built for this route type; without this branch
                # ``new_route`` would be unbound (or left over from the previous iteration).
                logger.warning("Skipping route %r while mounting at %r: unsupported route type", route, prefix)
                continue

            # Re-point the handler at the route it is now served from
            new_route.handler.route = new_route

            # The next is for invoking additional configuration hooks on the application when the
            # mounting of resource is performed
            for callback in GlobalState().mount_callbacks:
                callback(route, new_route)
class GlobalState(metaclass=Borg):
    """
    Holder of the application-wide state.

    NOTE(review): the ``Borg`` metaclass presumably makes all instances share state; confirm in ``utils``.
    """

    # Resource instance -> Micro bound to it (see Micro.__get__); weak keys, so resources are not kept alive
    bound_instances = weakref.WeakKeyDictionary()

    # Callables invoked as callback(original_route, new_route) by Router.mount
    mount_callbacks = []

    # Callables invoked as callback(route) by the Micro.route decorator
    route_callbacks = []

    # Callables run by Micro.finish
    finish_callbacks = []

    # Event loop; filled in by Micro.__init__ when still unset
    loop = None

    # Backing storage for the dictionary interface of Micro
    app_dict = {}

    # Middleware factories exposed through Micro.wrappers
    wrappers = [context_wrapper_factory]

    # Status code -> error handler callable
    error_handlers = {404: default_404_handler}

    # Debug flag; initial value comes from is_debug()
    debug = is_debug()
class Micro:
    """
    Micro is the entity in charge of routing configuration and exception handling for the application.

    It also works as a descriptor: when accessed through a resource instance it yields a ``Micro``
    bound to that instance (see :meth:`__get__`).
    """

    def __init__(self):
        self.global_state = GlobalState()
        self._router = Router()
        self._instance = None
        # Mount prefix -> resource; weak values, so mounted resources are not kept alive by the app
        self._mounts = weakref.WeakValueDictionary()
        if not self.global_state.loop:
            self.global_state.loop = asyncio.get_event_loop()

    @property
    def loop(self):
        """
        Event loop instance.
        """
        return self.global_state.loop

    @property
    def router(self):
        """
        An application router instance.
        """
        return self._router

    @property
    def mounts(self):
        """
        Mapping of mount prefix to the mounted resource.
        """
        return self._mounts

    @property
    def debug(self):
        """
        Debug flag.
        """
        return self.global_state.debug

    @property
    def instance(self):
        """
        An instance of resource which is bound to this app, or None if the app was not initialized
        in resource scope.
        """
        return self._instance

    @property
    def wrappers(self):
        """
        A list of aiohttp's middleware factories.
        """
        return self.global_state.wrappers

    @property
    def error_handlers(self):
        """
        Dictionary of error handlers.
        """
        return self.global_state.error_handlers

    def __get__(self, instance, owner):
        """
        Get the Micro instance which is bound to ``instance``.

        Accessed on the class (``instance is None``) this returns the descriptor itself. Otherwise the
        bound ``Micro`` is looked up in the global state and created (as a clone of self) on first access.
        """
        if instance is None:
            return self
        bound = self.global_state.bound_instances
        micro = bound.get(instance, None)
        if micro is None:
            micro = Micro()
            micro.clone(self)
            bound[instance] = micro
            micro.set_instance(instance)
        return micro

    ####
    #### Dictionary interface start
    ####

    def __getitem__(self, item):
        return self.global_state.app_dict[item]

    def __setitem__(self, key, value):
        self.global_state.app_dict[key] = value

    def __delitem__(self, key):
        del self.global_state.app_dict[key]

    ####
    #### Dictionary interface end
    ####

    def __call__(self, *args, **kwargs):
        """Gunicorn compatibility"""
        return self

    @asyncio.coroutine
    def finish(self):
        """
        Executes the callbacks registered for finish.

        An exception raised by a callback is reported to the loop's exception handler and does not
        prevent the remaining callbacks from running.
        """
        for cb in self.global_state.finish_callbacks:
            try:
                yield from maybe_coroutine(cb)
            except Exception as exc:
                self.loop.call_exception_handler({
                    'message': "Error in finish callback",
                    'exception': exc,
                    'application': self,
                })

    def set_debug(self, debug):
        """
        Sets the debug mode.

        :param debug: Boolean indicating whether to run application in debug mode or not
        :type debug: bool
        :return:
        """
        GlobalState().debug = debug

    def set_instance(self, instance):
        """
        Updates the instance of resource to which the application is bound.
        """
        self._instance = instance

    def clone(self, micro):
        """
        Clones the instance's properties into self.

        The router and the mounts mapping are shared with ``micro``, not copied.
        """
        self._router = micro.router
        self._instance = micro.instance
        self._mounts = micro.mounts
        # Stored on the instance only; the ``debug`` property itself reads the global state.
        self._debug = micro.debug

    def set_finish_callbacks(self, *callbacks):
        """
        Add the callbacks that will be performed when application finishes.

        :param callbacks:
        :type callbacks: iterable
        """
        self.global_state.finish_callbacks.extend(callbacks)

    def serve_endpoint(self, route, request, *args, **kwargs):
        """
        Serve an application handler with care of matching the needed resource for methods as handlers.

        :param route: A route instance
        :type route: aiohttp.web_urldispatcher.Route
        :param request: A request instance
        :type request: micro.core.request.Request
        :return: Whatever the route handler returns
        """
        if isinstance(route, web_urldispatcher.StaticRoute):
            # Static routes are served by the route's own handler, with keep-alive switched off
            request._keep_alive = False
            return route.handler(request)

        instance = self._get_instance_for_request(request)
        if instance is None:
            return route.handler(request, *args, **kwargs)
        # The handler is a method of a resource: pass the resource as its first argument
        return route.handler(instance, request, *args, **kwargs)

    def mount(self, prefix, resource):
        """
        Mount the resource and map all its routes to be accessible within prefix.

        :param prefix: A string prefix, which may start with "/" (auto-prepended if not)
        :type prefix: str
        :param resource: A resource - an object which holds instance of :class:`Micro` in its scope.
        :raises RuntimeError: if no attribute of the resource is a :class:`Micro`
        """
        for attr_name in dir(resource):
            # Only names that neither start nor end with a double underscore are inspected
            if not attr_name.startswith('__') and not attr_name.endswith('__'):
                attribute = getattr(resource, attr_name)
                if isinstance(attribute, Micro):
                    self._add_mount(attribute.router, prefix, resource, attribute)
                    break
        else:
            raise RuntimeError("An attempt to mount resource without bound Micro instance")

    def route(self, url, method=hdrs.METH_ANY, endpoint=None):
        """
        Wrap the function to make it accessible via URL.

        :param url: aiohttp routing expression
        :type url: str or regular expression
        :param method: HTTP method which should be passed onto micro.web.app.Router.add_route
        :type method: string
        :param endpoint: Route identifier which is used internally to maintain the state of routes.
                         Endpoints should be unique across the application. Defaults to wrapped function name.
        :type endpoint: str
        :return: A decorator which registers the function and returns it unchanged
        """
        def wrapper(f):
            route = self._router.add_route(method, url, f, name=endpoint or f.__name__)
            f.route = route
            for callback in self.global_state.route_callbacks:
                callback(route)
            return f
        return wrapper

    def set_error_handler(self, status_code, callback):
        """
        Set error handler callback for specific response status code.

        :param status_code: status code
        :param callback: error handler function
        """
        self.error_handlers[status_code] = callback

    def make_handler(self, *args, **kwargs):
        """
        Make a protocol factory which is compatible with asyncio.loop.create_server.

        Positional and keyword arguments are accepted but not used.

        :return: An :class:`ApiRequestHandlerFactory` for this application
        """
        logger.info("Initializing HipchatMicro framework")
        return ApiRequestHandlerFactory(self, debug=self.debug)

    def run(self, host='0.0.0.0', port=7000, name=None, configure_logging=True, socket_family=socket.AF_UNSPEC):
        """
        A "run" handler, primarily for debugging purposes. This is only called when the server
        is started without gunicorn. Blocks in ``loop.run_forever()``.

        :param host: A host where to bind. Probably either '0.0.0.0' or 'localhost'.
        :param port: A port where to bind
        :param name: Service name to use in logging. If none, tries to discover it from gunicorn app name.
        :param configure_logging: Whether to apply default aiolocals logging configuration or not
        :param socket_family: The socket family which will be passed to server socket constructor
        """
        if configure_logging:
            aiolocals_configure_logging(name, self.debug)
        loop = asyncio.get_event_loop()
        create_server_task = loop.create_server(self.make_handler(), host, port, family=socket_family)
        server_instance = loop.run_until_complete(create_server_task)
        # Report the address the server is actually bound to
        host, port = get_printable_sockinfo(server_instance)
        logger.info("{app_name} running on {host}:{port}".format(app_name=name, host=host, port=port))
        loop.run_forever()

    def default_error_handler(self, exception_instance, request):
        """
        Default error handler.

        In debug mode the error is delegated to ``request.protocol.handle_error``. Otherwise aiohttp HTTP
        exceptions are returned as they are and anything else becomes a generic 500 response.

        :param exception_instance: Instance of the exception to handle
        :type exception_instance: Exception
        :param request:
        :type request: micro.core.request.Request
        :return: The response for the failed request
        """
        # Lazy %-style arguments: the original combined a "%r" placeholder with str.format(),
        # so the exception never appeared in the message.
        logger.error('Unhandled exception: %r', exception_instance)

        if isinstance(exception_instance, web_exceptions.HTTPError):
            status = exception_instance.status
        else:
            status = 500

        if self.debug:
            return request.protocol.handle_error(status=status, exc=exception_instance)
        if isinstance(exception_instance, web_exceptions.HTTPException):
            return exception_instance
        return web_exceptions.HTTPInternalServerError(text="Internal server error")

    @asyncio.coroutine
    def handle_error(self, e, request, endpoint_name):
        """
        Error handler.

        Middlewares get the first chance through their 'exception' hook. If none of them responds, the
        handler registered for the exception's ``status_code`` is used, falling back to
        :meth:`default_error_handler`.

        :param e: Exception happened while processing request
        :param request: micro.web.request.Request instance
        :param endpoint_name: Name of endpoint which produced the request
        :return: The middleware response or the result of the selected error handler
        """
        available_middlewares = middleware.get_available_middlewares(endpoint_name)
        middleware_response = yield from middleware.middleware_hook(available_middlewares, 'exception', request, e)
        if middleware_response is not None:
            middleware_class, response = middleware_response
            logger.debug(
                "Got middleware %s response within exception hook %r", middleware_class.__name__, request)
            return response

        status_code = getattr(e, 'status_code', None)
        error_handler = self.error_handlers.get(status_code, self.default_error_handler)
        # Use the module logger (not the root ``logging`` module) like the rest of this file
        logger.debug("Handling exception %r with error handler %r", e, error_handler)
        return error_handler(e, request)

    def _add_mount(self, child_router, prefix, resource, micro):
        """
        Add mount to router, handle instance configuration.

        :param child_router: Router whose routes get mounted
        :param prefix: Mount prefix; "/" is prepended when missing
        :param resource: The resource being mounted
        :param micro: The :class:`Micro` found on the resource
        """
        if not prefix.startswith('/'):
            prefix = '/' + prefix
        self._router.mount(prefix, child_router)
        # Mounts are keyed by the prefix without trailing slashes
        self._mounts[prefix.rstrip('/')] = resource
        # Resources mounted on the child stay reachable under the combined prefix
        for mount_prefix, mount in micro.mounts.items():
            self._mounts[prefix + mount_prefix] = mount
        # From now on the child shares this application's router and mounts
        micro.clone(self)

    def _get_instance_for_request(self, request):
        """
        Gets instance of resource for request with awareness for mounts.

        :param request: A request instance; only its ``path`` is used
        :return: The resource mounted at the longest prefix the path starts with, or the instance
                 bound to this app when no mount prefix matches
        """
        path = request.path
        matches = [prefix for prefix in self._mounts.keys() if path.startswith(prefix)]
        if not matches:
            return self._instance
        # The longest matching prefix is the most specific mount
        return self._mounts.get(max(matches, key=len))
# Module-level application instance, created when this module is imported.
global_micro = Micro()
# Names exported by ``from <module> import *``.
__all__ = ["Micro", "GlobalState"]