import asyncio
from functools import wraps, WRAPPER_ASSIGNMENTS
import functools
import logging
from time import time
from inspect import signature
from sidecar.breaker import CircuitBreakerSet
from sidecar.error import SidecarTimeoutError, SidecarRetryRequestedError, SidecarCircuitOpenError, \
SidecarRateLimitedError, SidecarRetryExhaustedError
from aiolocals import wrap_async
log = logging.getLogger(__name__)
[docs]def optional_call_arg(call_arg, func):
"""
Allow run method to contain a 'call' argument to access its context
"""
if func is None:
return None
sig = signature(func)
@wraps_run(func)
def call(*args, **kwargs):
if 'call' in sig.parameters:
kwargs['call'] = call_arg
return func(*args, **kwargs)
return call
[docs]def timeout(statsd, timeout_secs, func):
"""
Ensure the run method executes in a certain amount of time
"""
@wraps_run(func)
@asyncio.coroutine
def call(*args, **kwargs):
netloc = call.netloc
d = wrap_async(func(*args, **kwargs))
try:
return (yield from asyncio.wait_for(d, timeout_secs))
except asyncio.TimeoutError:
statsd.increment("timeout")
log.warn("Timeout calling %s" % netloc)
raise SidecarTimeoutError()
return call
[docs]def retry(statsd, tries, retry_on_timeout, func):
"""
Will retry certain results from the run method execution
"""
@wraps_run(func)
@asyncio.coroutine
def call(*args, **kwargs):
for attempt in range(tries):
try:
statsd.increment("attempt")
d = yield from func(*args, **kwargs)
return d
except (SidecarTimeoutError, SidecarRetryRequestedError, SidecarCircuitOpenError, SidecarRateLimitedError) \
as e:
if not retry_on_timeout and isinstance(e, SidecarTimeoutError):
raise
if attempt < tries - 1:
timeout_secs = 0 if isinstance(e, SidecarCircuitOpenError) else attempt * 1.5
yield from asyncio.sleep(timeout_secs)
log.warn("Retrying %s" % func)
statsd.increment("retry_%s" % attempt)
elif tries > 1:
statsd.increment("retry_exhausted")
raise SidecarRetryExhaustedError(e.message, e.code)
else:
raise
return call
[docs]def load_balance(load_balancer, func):
"""
Retrieve load balancer results and set on function as "scheme" and "netloc"
"""
if func is None:
return None
@wraps_run(func)
@asyncio.coroutine
def call(*args, **kwargs):
lb = find_property('load_balancer', load_balancer, args, lambda: (None, None))
lb_data = lb()
call.scheme = lb_data[0]
call.netloc = lb_data[1]
call.path = '' if len(lb_data) == 2 else lb_data[2]
result = yield from func(*args, **kwargs)
return result
return call
[docs]def logger(func):
"""
Log exceptions as they get swallowed in order to return fallback results
"""
@wraps_run(func)
@asyncio.coroutine
def call(*args, **kwargs):
try:
d = yield from func(*args, **kwargs)
return d
except Exception as e:
log.info("Exception calling {}: {}".format(func, e))
raise
return call
[docs]def circuit(statsd, circuit_reset_timeout, expected_in, func):
"""
Apply a circuit breaker, keyed to the netloc from the load balancer
"""
if func is None:
return None
loop = asyncio.get_event_loop()
breaker = CircuitBreakerSet(
loop.time,
log,
reset_timeout=circuit_reset_timeout,
maxfail=3,
time_unit=60)
breaker.handle_error(Exception)
@wraps_run(func)
@asyncio.coroutine
def call(*args, **kwargs):
with breaker.context(func.netloc) as cb:
start = time()
try:
d = yield from func(*args, **kwargs)
actual = time() - start
if expected_in and actual > expected_in:
cb.error("Expected in %s, was %s" % (expected_in, actual))
statsd.increment("expected_exceeded")
statsd.timing("success", actual)
return d
except SidecarCircuitOpenError:
statsd.increment("circuit_open")
raise
except Exception:
statsd.increment("failure")
raise
return call
[docs]def rate_limit(statsd, max_per_netloc, func):
"""
Limits the number of open requests
"""
open_calls = {}
@wraps_run(func)
@asyncio.coroutine
def call(*args, **kwargs):
netloc = call.netloc
open_calls.setdefault(netloc, 0)
open_for_location = open_calls[netloc]
if open_for_location == max_per_netloc:
statsd.increment("rate_limited")
log.warn("Rate limited to %s" % netloc)
raise SidecarRateLimitedError(message="Requests to {} have exceeded the ratelimit".format(netloc))
open_calls[netloc] += 1
log.debug("start call to %s: %s", netloc, open_calls[netloc])
try:
d = yield from func(*args, **kwargs)
return d
finally:
open_calls[netloc] -= 1
log.debug("after completion of call to %s: %s", netloc, open_calls[netloc])
return call
[docs]def wraps_run(func):
"""
Ensure all run function wrappers get the same dict, as it is used for context information
"""
return wraps(func, assigned=WRAPPER_ASSIGNMENTS + ('__dict__',))
[docs]def find_property(name, value, args, default=None):
"""
Helper for the `fault_tolerant` decorator to use values from the decorated
method's bound object to fulfill the `fault_tolerant` logic.
If the decorated function is indeed an instance method of a class, this helper does one of two things:
1. If `value` is `None`, look for an attribute on the object with the given `name` to populate the value.
2. If `value` is a callable, add the `self` arg to the callable.
Args:
name (str): The attribute name to look up on the object for the value if value is `None`.
value (Any): The value to fill if `None` or to add `self` if it is a callable.
args (tuple): Positional args provided to the decorated method.
default (Any): Default value to use if the object has no attribute with the given name, or if the
decorated function is not a bound instance method.
"""
is_method = len(args) == 1
if value is None:
if is_method:
if hasattr(args[0], name):
return getattr(args[0], name)
return default
elif is_method and callable(value):
return functools.partial(value, args[0])
return value