import aiohttp
import aiohttp.errors as aiohttp_errrors
import aiohttp.web
import asyncio
from aiohttp.multidict import MultiDict
import logging
from sidecar.call import fault_tolerant
from sidecar.defaultdict import DefaultDict
from sidecar.error import SidecarTimeoutError, UnknownServiceError, ServiceUnavailableError
from aiolocals import wrap_async
import weakref
# Value of the X-Hipchat-ServiceProxyType request header that selects
# fire-and-forget proxying.
FIRE_AND_FORGET_TYPE = "fire-and-forget"

# Module-wide logger, named after this module.
log = logging.getLogger(__name__)
class ProxyHandler:
    """Proxy incoming HTTP requests to backend services.

    A request is proxied either synchronously (the backend response is
    returned to the caller) or in fire-and-forget mode (one background task
    per target service is started and ``202`` is returned immediately).
    """

    # Seconds stop() waits for in-flight fire-and-forget requests.
    _STOP_TIMEOUT_SECONDS = 5

    def __init__(self, statsd, service_discovery):
        """
        :param statsd: metrics client; ``increment(name)`` is called on it and
            it is passed on to ``fault_tolerant``
        :param service_discovery: object providing
            ``service_load_balancer(service_id)``
        """
        self.statsd = statsd
        self.service_discovery = service_discovery
        # Fire-and-forget tasks that have not finished yet. A WeakSet, so this
        # bookkeeping never keeps a task alive on its own.
        self.inflight_async = weakref.WeakSet()
        # Counters for synchronous / fire-and-forget requests.
        self.inflight_sync = 0
        self.finished_sync = 0
        self.finished_async = 0
        # Maps a service id to its processor; DefaultDict calls the factory
        # with the key (presumably on first lookup -- see sidecar.defaultdict).
        self._processors = DefaultDict(self.__proxy_factory)

    def __proxy_factory(self, key):
        """Build the proxy coroutine for one service.

        :param key: service identifier (str). An all-digit key is treated as a
            port on localhost instead of a discoverable service.
        :return: ``_proxy_one_request`` wrapped by ``fault_tolerant``; it is
            invoked as ``processor(path, request)``
        """
        @asyncio.coroutine
        def _proxy_one_request(path, request, call):
            """
            Forward ``request`` to the backend selected by ``call`` and build
            the response to hand back.

            :type request: aiohttp.web.Request
            :type call: sidecar.call.Call
            :rtype: aiohttp.web.Response
            """
            # NOTE(review): the URL is built from the path only, so a query
            # string on the incoming request is not forwarded -- confirm this
            # is intended.
            url = "http://{netloc}{path}".format(netloc=call.netloc, path=path)
            headers = MultiDict(request.headers)
            headers['HOST'] = call.netloc
            data = yield from request.read()
            # The connection comes from the connector carried by ``call``.
            log.info("Proxying to backend %s", url)
            log.debug("Request headers: %s", headers)
            try:
                # noinspection PyUnresolvedReferences
                resp = yield from aiohttp.request(request.method,
                                                  url,
                                                  headers=headers,
                                                  version=request.version,
                                                  data=data,
                                                  allow_redirects=False,
                                                  connector=call.http_connector)
            except aiohttp_errrors.ClientTimeoutError as ex:
                # Must be handled before ClientConnectionError: in
                # aiohttp.errors the timeout error derives from the connection
                # error, so the broader handler would otherwise shadow it and
                # timeouts would be reported as 502.
                message = "Timed out reqeusting {}: {}".format(url, ex)
                log.error(message)
                return aiohttp.web.HTTPGatewayTimeout(reason=message)  # 504
            except aiohttp_errrors.ClientConnectionError as ex:
                message = "Unable to contact {}: {}".format(url, ex)
                log.error(message)
                return aiohttp.web.HTTPBadGateway(reason=message)  # 502
            except aiohttp_errrors.ClientResponseError as ex:
                message = "Bad response from {}: {!r}. Cause: {!r}".format(url, ex, ex.__cause__)
                log.error(message)
                return aiohttp.web.HTTPBadGateway(reason=message)  # 502
            except aiohttp_errrors.ClientError as ex:
                message = "Unable to process call to {}: {!r}. Cause: {!r}".format(url, ex, ex.__cause__)
                log.error(message)
                return aiohttp.web.HTTPBadGateway(reason=message)  # 502

            # resp is an aiohttp.ClientResponse
            log.debug("Response status: %s", resp.status)
            response = aiohttp.web.Response(status=resp.status)
            response.enable_chunked_encoding()
            for hname, hval in resp.headers.items():
                # Chunked encoding is already enabled on our own response, so
                # the backend's Transfer-Encoding header is not copied. Only
                # the header *name* is checked: an unrelated header whose
                # value happens to be "chunked" must still be forwarded.
                if hname.upper() == "TRANSFER-ENCODING":
                    continue
                response.headers.add(hname, hval)
                log.debug("Response header: %s=%s", hname, hval)
            response.body = yield from resp.read()
            log.debug("Response body: %s", response.body)
            return response

        # Treat an all-digit service identifier as a port number.
        # This is needed for our integration tests.
        if key.isdigit():
            def lb():
                return "http", "localhost:{}".format(int(key))
        else:
            lb = self.service_discovery.service_load_balancer(key)

        return fault_tolerant(dependency_id="proxy-{}".format(key),
                              load_balancer=lb,
                              timeout=5,
                              retry_attempts=10,
                              expected_in=2,
                              statsd=self.statsd)(_proxy_one_request)

    @asyncio.coroutine
    def stop(self):
        """Wait a bounded time for in-flight fire-and-forget requests.

        Never raises: failures and requests still running after the timeout
        are logged. Unfinished tasks are not cancelled.
        """
        # Snapshot the WeakSet: it may shrink while we wait, and asyncio.wait
        # needs a real collection of futures (and rejects an empty one).
        pending = list(self.inflight_async)
        if not pending:
            return
        try:
            _, unfinished = yield from asyncio.wait(
                pending, timeout=self._STOP_TIMEOUT_SECONDS)
        except Exception as ex:
            log.error("Unable to wait for all proxied requests: {}".format(ex))
            return
        if unfinished:
            log.error("Unable to wait for all proxied requests: "
                      "%d still in flight after %s seconds",
                      len(unfinished), self._STOP_TIMEOUT_SECONDS)

    @asyncio.coroutine
    def proxy_request_handler(self, request):
        """aiohttp request handler: extract the target(s) and proxy mode.

        Reads ``service_id`` (comma-separated list) and ``path`` from the
        route match, and selects fire-and-forget mode when the
        ``X-Hipchat-ServiceProxyType`` header equals ``FIRE_AND_FORGET_TYPE``.

        :type request: aiohttp.web.Request
        :rtype: aiohttp.web.Response
        """
        service_id = request.match_info.get('service_id')
        # A missing service id becomes an empty list instead of crashing on
        # ``None.split``.
        service_ids = service_id.split(',') if service_id is not None else []
        path = "/{}".format(request.match_info.get('path', ''))
        proxy_type = request.headers.get('X-Hipchat-ServiceProxyType')
        fire_and_forget = proxy_type == FIRE_AND_FORGET_TYPE
        result = yield from self.proxy_request(request, service_ids, path, fire_and_forget)
        return result

    @asyncio.coroutine
    def proxy_request(self, request, service_ids, path, fire_and_forget=False, **legacy_kwargs):
        """Proxy ``request`` to the given service(s).

        :type request: aiohttp.web.Request
        :param service_ids: list of target service identifiers
        :param path: backend path, starting with ``/``
        :param fire_and_forget: when true, start one background task per
            service and answer ``202`` at once; otherwise proxy to exactly one
            service and return its response. This parameter used to be named
            ``async``; that keyword is still accepted.
        :rtype: aiohttp.web.Response
        :raises TypeError: on unknown keyword arguments
        """
        # ``async`` is a reserved word from Python 3.7 on and can no longer be
        # a parameter name; keep honouring it for callers that pass it by
        # keyword.
        if legacy_kwargs:
            fire_and_forget = legacy_kwargs.pop("async", fire_and_forget)
            if legacy_kwargs:
                raise TypeError(
                    "proxy_request() got unexpected keyword argument(s): {}".format(
                        ", ".join(sorted(legacy_kwargs))))
        try:
            if fire_and_forget:
                return self._dispatch_fire_and_forget(request, service_ids, path)
            response = yield from self._proxy_synchronous(request, service_ids, path)
            return response
        except Exception as e:
            # TODO: use @unhandled_exception_500 from web.py
            log.error("Unhandled/unexpected exception {!r} proxying service(s) {!r}".format(e, service_ids),
                      exc_info=True)
            return aiohttp.web.Response(status=500)

    @asyncio.coroutine
    def _proxy_synchronous(self, request, service_ids, path):
        """Proxy to exactly one service and translate sidecar errors to HTTP."""
        if len(service_ids) != 1:
            return aiohttp.web.Response(text="Service Id is not provided or is more than 1", status=400)

        service_id = service_ids[0]
        processor = self._processors[service_id]
        self.inflight_sync += 1
        try:
            response = yield from processor(path, request)
            # Counted only when the processor returned a response.
            self.statsd.increment("proxy.syncronous.count")
            self.statsd.increment("proxy.syncronous.{}.count".format(service_id))
            return response
        except SidecarTimeoutError:
            return aiohttp.web.Response(text="Timeout waiting for operation", status=504)
        except UnknownServiceError:
            log.error("Config for the service {!r} was not found when looking up hosts".format(service_id))
            self.statsd.increment("proxy.hosts.unknownServiceError.count")
            return aiohttp.web.Response(
                text="The requested service {!r} does not exist".format(service_id), status=502)
        except ServiceUnavailableError:
            log.error("No hosts available to proxy service {!r}".format(service_id))
            self.statsd.increment("proxy.hosts.serviceUnavailableError.count")
            self.statsd.increment("proxy.hosts.serviceUnavailableError.{}.count".format(service_id))
            return aiohttp.web.Response(
                text="No hosts avialable to proxy service {!r}".format(service_id), status=503)
        finally:
            self.inflight_sync -= 1
            self.finished_sync += 1

    def _dispatch_fire_and_forget(self, request, service_ids, path):
        """Start one background proxy task per service and answer 202."""
        # NOTE(review): the background tasks read the request body after this
        # handler has already answered -- confirm the body is still readable
        # at that point.
        self.statsd.increment("proxy.fireAndForget.count")
        for service_id in service_ids:
            self.statsd.increment("proxy.fireAndForget.{}.count".format(service_id))
            processor = self._processors[service_id]
            task = wrap_async(processor(path, request))
            task.add_done_callback(self._fire_and_forget_done)
            self.inflight_async.add(task)
        return aiohttp.web.Response(status=202)

    def _fire_and_forget_done(self, future):
        """Done-callback for a fire-and-forget task: update the bookkeeping."""
        # Use the future handed to the callback, not a variable captured from
        # the dispatch loop, so each callback removes its own task.
        self.inflight_async.discard(future)
        self.finished_async += 1
        # Nobody awaits these tasks; surface a failure here so it is not lost.
        # Assumes wrap_async returns an asyncio Task/Future.
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            log.error("Fire-and-forget proxy request failed: %r", error)