# Source code for sidecar.proxy_handler

import aiohttp
import aiohttp.errors as aiohttp_errrors
import aiohttp.web
import asyncio
from aiohttp.multidict import MultiDict
import logging
from sidecar.call import fault_tolerant
from sidecar.defaultdict import DefaultDict
from sidecar.error import SidecarTimeoutError, UnknownServiceError, ServiceUnavailableError
from aiolocals import wrap_async
import weakref

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Value of the ``X-Hipchat-ServiceProxyType`` request header that makes
# ProxyHandler.proxy_request_handler dispatch the request in fire-and-forget
# mode instead of relaying the backend's response.
FIRE_AND_FORGET_TYPE = "fire-and-forget"


class ProxyHandler:
    """Proxy incoming aiohttp requests to backend services.

    Two modes are supported:

    * synchronous -- exactly one service id; the backend's response is relayed
      to the caller;
    * fire-and-forget -- any number of service ids; one background task is
      started per service and ``202`` is returned immediately.
    """

    def __init__(self, statsd, service_discovery):
        """
        :param statsd: metrics client; only its ``increment(name)`` method is
            used by this class.
        :param service_discovery: object providing
            ``service_load_balancer(service_id)``.
        """
        self.statsd = statsd
        # Background (fire-and-forget) tasks that have not finished yet.
        self.inflight_async = weakref.WeakSet()
        # Number of synchronous requests currently being proxied.
        self.inflight_sync = 0
        # Running totals of completed requests, per mode.
        self.finished_sync = 0
        self.finished_async = 0
        self.service_discovery = service_discovery
        # Per-service proxy callables, built on first lookup of a service id.
        self._processors = DefaultDict(lambda key: self.__proxy_factory(key))

    def __proxy_factory(self, key):
        """Build the fault-tolerant proxy callable for one service.

        :param key: service id; a string made only of digits is treated as a
            port on localhost.
        :type key: str
        :return: ``_proxy_one_request`` wrapped by
            ``sidecar.call.fault_tolerant``. Callers invoke it as
            ``processor(path, request)``; the ``call`` argument is supplied by
            the wrapper.
        """

        @asyncio.coroutine
        def _proxy_one_request(path, request, call):
            """Forward one request to the backend selected for ``call``.

            :type path: str
            :type request: aiohttp.web.Request
            :type call: sidecar.call.Call
            :rtype: aiohttp.web.Response
            """
            url = "http://{netloc}{path}".format(netloc=call.netloc, path=path)
            # Copy the headers so the HOST override does not touch the
            # incoming request.
            headers = MultiDict(request.headers)
            headers['HOST'] = call.netloc
            version = request.version
            data = yield from request.read()

            # Here is connection need to be obtained from connection pool
            log.info("Proxying to backend %s", url)
            log.debug("Request headers: %s", headers)
            try:
                # noinspection PyUnresolvedReferences
                resp = yield from aiohttp.request(
                    request.method, url,
                    headers=headers,
                    version=version,
                    data=data,
                    allow_redirects=False,
                    connector=call.http_connector)
            except aiohttp_errrors.ClientTimeoutError as ex:
                # Must be tested before ClientConnectionError: in
                # aiohttp.errors the timeout error derives from the connection
                # error, so the broader handler would otherwise shadow this
                # branch and timeouts would be reported as 502 instead of 504.
                message = "Timed out reqeusting {}: {}".format(url, ex)
                log.error(message)
                return aiohttp.web.HTTPGatewayTimeout(reason=message)  # 504
            except aiohttp_errrors.ClientConnectionError as ex:
                message = "Unable to contact {}: {}".format(url, ex)
                log.error(message)
                return aiohttp.web.HTTPBadGateway(reason=message)  # 502
            except aiohttp_errrors.ClientResponseError as ex:
                message = "Bad response from {}: {!r}. Cause: {!r}".format(
                    url, ex, ex.__cause__)
                log.error(message)
                return aiohttp.web.HTTPBadGateway(reason=message)  # 502
            except aiohttp_errrors.ClientError as ex:
                message = "Unable to process call to {}: {!r}. Cause: {!r}".format(
                    url, ex, ex.__cause__)
                log.error(message)
                return aiohttp.web.HTTPBadGateway(reason=message)  # 502

            # resp is an aiohttp.ClientResponse
            log.debug("Response status: %s", resp.status)

            response = aiohttp.web.Response(status=resp.status)
            response.enable_chunked_encoding()
            for hname, hval in resp.headers.items():
                # Chunked encoding is enabled on our own response above, so
                # the backend's Transfer-Encoding header is not forwarded.
                # Filter on the header name only: other headers are relayed
                # whatever their value.
                if hname.upper() != "TRANSFER-ENCODING":
                    response.headers.add(hname, hval)
                    log.debug("Response header: %s=%s", hname, hval)
            response.body = yield from resp.read()
            log.debug("Response body: %s", response.body)
            return response

        # Treat an int as a service identifier as a port number.
        # This is needed for our integration tests.
        if key.isdigit():
            lb = lambda: ("http", "localhost:{}".format(int(key)))
        else:
            lb = self.service_discovery.service_load_balancer(key)

        # The meaning of timeout / retry_attempts / expected_in is defined by
        # sidecar.call.fault_tolerant.
        return fault_tolerant(
            dependency_id="proxy-{}".format(key),
            load_balancer=lb,
            timeout=5,
            retry_attempts=10,
            expected_in=2,
            statsd=self.statsd)(_proxy_one_request)

    @asyncio.coroutine
    def stop(self):
        """Wait up to five seconds for in-flight fire-and-forget tasks.

        Never raises: a failure to wait, or tasks still running when the
        timeout expires, is logged as an error.
        """
        timeout = 5
        # Snapshot into a list: asyncio.wait needs a non-empty collection of
        # futures (asyncio.wait_for only accepts a single awaitable), and the
        # WeakSet can shrink while we are waiting.
        pending = list(self.inflight_async)
        if not pending:
            return
        try:
            _, unfinished = yield from asyncio.wait(pending, timeout=timeout)
        except Exception as ex:
            log.error("Unable to wait for all proxied requests: {}".format(ex))
            return
        if unfinished:
            log.error("Unable to wait for all proxied requests: {}".format(
                "{} still in flight after {} seconds".format(len(unfinished), timeout)))

    @asyncio.coroutine
    def proxy_request_handler(self, request):
        """aiohttp handler: read the routing information and proxy the request.

        ``service_id`` (comma-separated list of service ids) and ``path`` are
        taken from ``request.match_info``; the ``X-Hipchat-ServiceProxyType``
        header selects fire-and-forget mode.

        :type request: aiohttp.web.Request
        :rtype: aiohttp.web.Response
        """
        service_id = request.match_info.get('service_id')
        service_ids = service_id.split(',')
        path = "/{}".format(request.match_info.get('path', ''))

        proxy_type = request.headers.get('X-Hipchat-ServiceProxyType')
        fire_and_forget = proxy_type == FIRE_AND_FORGET_TYPE
        result = yield from self.proxy_request(request, service_ids, path, fire_and_forget)
        return result

    @asyncio.coroutine
    def proxy_request(self, request, service_ids, path, fire_and_forget=False, **legacy_kwargs):
        """Proxy ``request`` to the given service(s).

        :type request: aiohttp.web.Request
        :param service_ids: list of service ids; synchronous mode requires
            exactly one.
        :param path: path to request on the backend, starting with ``/``.
        :param fire_and_forget: when true, start one background task per
            service and answer ``202`` immediately. This parameter was
            formerly named ``async``, which is a reserved word from Python 3.7
            on; ``async=...`` is still accepted as a keyword argument on
            interpreters where that spelling is legal.
        :rtype: aiohttp.web.Response
        :raises TypeError: on unexpected keyword arguments.
        """
        if "async" in legacy_kwargs:
            fire_and_forget = legacy_kwargs.pop("async")
        if legacy_kwargs:
            raise TypeError(
                "proxy_request() got unexpected keyword argument(s): {}".format(
                    ", ".join(sorted(legacy_kwargs))))

        try:
            if fire_and_forget:
                return self._dispatch_fire_and_forget(request, service_ids, path)
            response = yield from self._proxy_synchronously(request, service_ids, path)
            return response
        except Exception as e:
            # TODO: use @unhandled_exception_500 from web.py
            log.error("Unhandled/unexpected exception {!r} proxying service(s) {!r}".format(
                e, service_ids), exc_info=1)
            return aiohttp.web.Response(status=500)

    @asyncio.coroutine
    def _proxy_synchronously(self, request, service_ids, path):
        """Proxy to exactly one service and relay its response (400 otherwise)."""
        if len(service_ids) != 1:
            return aiohttp.web.Response(
                text="Service Id is not provided or is more than 1", status=400)

        service_id = service_ids[0]
        processor = self._processors[service_id]
        self.inflight_sync += 1
        try:
            response = yield from processor(path, request)
            self.statsd.increment("proxy.syncronous.count")
            self.statsd.increment("proxy.syncronous.{}.count".format(service_id))
        except SidecarTimeoutError:
            response = aiohttp.web.Response(text="Timeout waiting for operation", status=504)
        except UnknownServiceError:
            log.error("Config for the service {!r} was not found when looking up hosts".format(
                service_id))
            self.statsd.increment("proxy.hosts.unknownServiceError.count")
            response = aiohttp.web.Response(
                text="The requested service {!r} does not exist".format(service_id),
                status=502)
        except ServiceUnavailableError:
            log.error("No hosts available to proxy service {!r}".format(service_id))
            self.statsd.increment("proxy.hosts.serviceUnavailableError.count")
            self.statsd.increment(
                "proxy.hosts.serviceUnavailableError.{}.count".format(service_id))
            response = aiohttp.web.Response(
                text="No hosts avialable to proxy service {!r}".format(service_id),
                status=503)
        finally:
            # Counters are updated on every exit path, including unexpected
            # exceptions that propagate to proxy_request.
            self.inflight_sync -= 1
            self.finished_sync += 1
        return response

    def _dispatch_fire_and_forget(self, request, service_ids, path):
        """Start one background proxy task per service and return a 202 response."""

        def _cleanup_inflight(finished):
            # Use the future handed to the callback rather than a loop
            # variable captured by closure: by the time callbacks run, the
            # loop variable refers to the last task created.
            self.inflight_async.discard(finished)
            self.finished_async += 1

        self.statsd.increment("proxy.fireAndForget.count")
        # NOTE(review): the tasks read the request body after this handler
        # has already produced its 202 response -- confirm aiohttp keeps the
        # payload readable at that point.
        for service_id in service_ids:
            self.statsd.increment("proxy.fireAndForget.{}.count".format(service_id))
            processor = self._processors[service_id]
            task = wrap_async(processor(path, request))
            task.add_done_callback(_cleanup_inflight)
            self.inflight_async.add(task)
        return aiohttp.web.Response(status=202)