Source code for sidecar.decorator

import asyncio
from functools import wraps, WRAPPER_ASSIGNMENTS
import functools
import logging
from time import time
from inspect import signature
from sidecar.breaker import CircuitBreakerSet
from sidecar.error import SidecarTimeoutError, SidecarRetryRequestedError, SidecarCircuitOpenError, \
    SidecarRateLimitedError, SidecarRetryExhaustedError
from aiolocals import wrap_async


log = logging.getLogger(__name__)


def optional_call_arg(call_arg, func):
    """
    Let a run method optionally declare a ``call`` parameter to receive its context.

    Args:
        call_arg: Object injected as the ``call`` keyword argument when the
            wrapped function declares a parameter with that name.
        func: Function to wrap, or ``None``.

    Returns:
        ``None`` when ``func`` is ``None``; otherwise a wrapper that forwards all
        arguments to ``func``, adding ``call=call_arg`` (overriding any
        caller-supplied ``call`` keyword) if ``func`` has a ``call`` parameter.
    """
    if func is None:
        return None

    # The signature is inspected once, at decoration time.
    accepts_call = 'call' in signature(func).parameters

    @wraps_run(func)
    def call(*args, **kwargs):
        if accepts_call:
            kwargs['call'] = call_arg
        return func(*args, **kwargs)

    return call
def timeout(statsd, timeout_secs, func):
    """
    Ensure the run method executes in a certain amount of time.

    Args:
        statsd: Metrics client; ``increment("timeout")`` is called on expiry.
        timeout_secs: Timeout passed to ``asyncio.wait_for``.
        func: Coroutine function to wrap.

    Returns:
        A coroutine function that yields the result of ``func``.

    Raises:
        SidecarTimeoutError: If ``asyncio.wait_for`` raises
            ``asyncio.TimeoutError``.
    """
    @wraps_run(func)
    @asyncio.coroutine
    def call(*args, **kwargs):
        # ``netloc`` is read from the wrapper's attributes before the call starts.
        netloc = call.netloc
        pending = wrap_async(func(*args, **kwargs))
        try:
            return (yield from asyncio.wait_for(pending, timeout_secs))
        except asyncio.TimeoutError:
            statsd.increment("timeout")
            # ``Logger.warn`` is a deprecated alias; use ``warning`` with lazy args.
            log.warning("Timeout calling %s", netloc)
            raise SidecarTimeoutError()

    return call
def retry(statsd, tries, retry_on_timeout, func):
    """
    Retry the run method when it fails with one of the retryable sidecar errors.

    Retryable errors are ``SidecarTimeoutError``, ``SidecarRetryRequestedError``,
    ``SidecarCircuitOpenError`` and ``SidecarRateLimitedError``. Any other
    exception propagates immediately.

    Args:
        statsd: Metrics client; ``attempt``, ``retry_<n>`` and
            ``retry_exhausted`` counters are incremented.
        tries: Maximum number of attempts.
        retry_on_timeout: When falsy, a ``SidecarTimeoutError`` is re-raised
            instead of being retried.
        func: Coroutine function to wrap.

    Returns:
        A coroutine function that yields the first successful result of ``func``.

    Raises:
        SidecarRetryExhaustedError: When the last of several attempts
            (``tries > 1``) fails with a retryable error; chained to that error.
        The original retryable error: When ``tries`` is 1, or on a timeout with
            ``retry_on_timeout`` falsy.
    """
    retryable = (
        SidecarTimeoutError,
        SidecarRetryRequestedError,
        SidecarCircuitOpenError,
        SidecarRateLimitedError,
    )

    @wraps_run(func)
    @asyncio.coroutine
    def call(*args, **kwargs):
        for attempt in range(tries):
            try:
                statsd.increment("attempt")
                return (yield from func(*args, **kwargs))
            except retryable as e:
                if not retry_on_timeout and isinstance(e, SidecarTimeoutError):
                    raise
                if attempt < tries - 1:
                    # Linear back-off (0s, 1.5s, 3s, ...); an open circuit is
                    # retried without any delay.
                    delay = 0 if isinstance(e, SidecarCircuitOpenError) else attempt * 1.5
                    yield from asyncio.sleep(delay)
                    # ``Logger.warn`` is a deprecated alias; use ``warning``.
                    log.warning("Retrying %s", func)
                    statsd.increment("retry_%s" % attempt)
                elif tries > 1:
                    statsd.increment("retry_exhausted")
                    # Chain explicitly so the triggering error is kept as the cause.
                    raise SidecarRetryExhaustedError(e.message, e.code) from e
                else:
                    raise

    return call
def load_balance(load_balancer, func):
    """
    Resolve a target through the load balancer and expose it on the wrapper.

    Before each call, the load balancer (looked up via ``find_property``) is
    invoked and its result is stored on the wrapper as ``scheme``, ``netloc``
    and ``path``.

    Args:
        load_balancer: Load balancer callable, or ``None`` to let
            ``find_property`` resolve one from the call arguments.
        func: Coroutine function to wrap, or ``None``.

    Returns:
        ``None`` when ``func`` is ``None``; otherwise a coroutine function that
        yields the result of ``func``.
    """
    if func is None:
        return None

    @wraps_run(func)
    @asyncio.coroutine
    def call(*args, **kwargs):
        # Fallback balancer yields an empty (scheme, netloc) pair.
        choose = find_property('load_balancer', load_balancer, args, lambda: (None, None))
        target = choose()
        call.scheme = target[0]
        call.netloc = target[1]
        # A two-element result carries no path component.
        if len(target) == 2:
            call.path = ''
        else:
            call.path = target[2]
        return (yield from func(*args, **kwargs))

    return call
def logger(func):
    """
    Log any exception raised by the run method, then re-raise it.

    The original docstring notes this exists because exceptions are later
    swallowed in order to return fallback results.

    Args:
        func: Coroutine function to wrap.

    Returns:
        A coroutine function that yields the result of ``func``.
    """
    @wraps_run(func)
    @asyncio.coroutine
    def call(*args, **kwargs):
        try:
            return (yield from func(*args, **kwargs))
        except Exception as exc:
            log.info("Exception calling {}: {}".format(func, exc))
            raise

    return call
def circuit(statsd, circuit_reset_timeout, expected_in, func):
    """
    Apply a circuit breaker, keyed to the netloc from the load balancer.

    Args:
        statsd: Metrics client; ``success`` timing and ``expected_exceeded``,
            ``circuit_open`` and ``failure`` counters are emitted.
        circuit_reset_timeout: Passed to ``CircuitBreakerSet`` as
            ``reset_timeout``.
        expected_in: Expected duration in seconds; when truthy and exceeded, an
            error is reported to the breaker even though the call succeeded.
        func: Coroutine function to wrap, or ``None``.

    Returns:
        ``None`` when ``func`` is ``None``; otherwise a coroutine function that
        yields the result of ``func``.
    """
    if func is None:
        return None

    event_loop = asyncio.get_event_loop()
    breakers = CircuitBreakerSet(
        event_loop.time,
        log,
        reset_timeout=circuit_reset_timeout,
        maxfail=3,
        time_unit=60,
    )
    # NOTE(review): presumably makes every Exception count as a breaker failure;
    # confirm against sidecar.breaker.
    breakers.handle_error(Exception)

    @wraps_run(func)
    @asyncio.coroutine
    def call(*args, **kwargs):
        with breakers.context(func.netloc) as cb:
            started = time()
            try:
                result = yield from func(*args, **kwargs)
                elapsed = time() - started
                if expected_in and elapsed > expected_in:
                    cb.error("Expected in %s, was %s" % (expected_in, elapsed))
                    statsd.increment("expected_exceeded")
                statsd.timing("success", elapsed)
                return result
            except SidecarCircuitOpenError:
                statsd.increment("circuit_open")
                raise
            except Exception:
                statsd.increment("failure")
                raise

    return call
def rate_limit(statsd, max_per_netloc, func):
    """
    Limit the number of concurrently open requests per netloc.

    Args:
        statsd: Metrics client; ``increment("rate_limited")`` is called when a
            call is rejected.
        max_per_netloc: Number of open calls to one netloc at which further
            calls are rejected.
        func: Coroutine function to wrap.

    Returns:
        A coroutine function that yields the result of ``func``.

    Raises:
        SidecarRateLimitedError: When the number of open calls to the netloc
            equals ``max_per_netloc``.
    """
    # Open-call counters keyed by netloc, shared by all calls through this wrapper.
    open_calls = {}

    @wraps_run(func)
    @asyncio.coroutine
    def call(*args, **kwargs):
        netloc = call.netloc
        open_calls.setdefault(netloc, 0)
        # Equality (not ``>=``) is kept on purpose so a non-numeric limit such
        # as ``None`` never matches instead of raising ``TypeError``.
        if open_calls[netloc] == max_per_netloc:
            statsd.increment("rate_limited")
            # ``Logger.warn`` is a deprecated alias; use ``warning`` with lazy args.
            log.warning("Rate limited to %s", netloc)
            raise SidecarRateLimitedError(
                message="Requests to {} have exceeded the ratelimit".format(netloc))

        open_calls[netloc] += 1
        log.debug("start call to %s: %s", netloc, open_calls[netloc])
        try:
            return (yield from func(*args, **kwargs))
        finally:
            # Always release the slot, whether the call succeeded or raised.
            open_calls[netloc] -= 1
            log.debug("after completion of call to %s: %s", netloc, open_calls[netloc])

    return call
def wraps_run(func):
    """
    Build a ``functools.wraps`` decorator that also assigns ``__dict__``.

    Assigning ``__dict__`` (rather than merely updating it) makes every wrapper
    share the very same attribute dict as ``func``, which the run wrappers use
    to pass context information between each other.

    Args:
        func: The function being wrapped.

    Returns:
        A decorator to apply to the wrapper function.
    """
    shared_attributes = WRAPPER_ASSIGNMENTS + ('__dict__',)
    return wraps(func, assigned=shared_attributes)
def find_property(name, value, args, default=None):
    """
    Resolve a `fault_tolerant` setting against the decorated method's object.

    When the decorated function is treated as an instance method, one of two
    things happens:

    1. If `value` is `None`, the attribute called `name` is read from the
       object and returned (or `default` if the object has no such attribute).
    2. If `value` is callable, it is returned with the object pre-bound as its
       first argument.

    In every other case `value` is returned unchanged, or `default` when
    `value` is `None`.

    Args:
        name (str): Attribute name to look up on the object when `value` is
            `None`.
        value (Any): Explicit value; `None` triggers the attribute lookup, a
            callable gets `self` bound.
        args (tuple): Positional args provided to the decorated method.
        default (Any): Fallback when the object lacks the attribute or the
            decorated function is not treated as an instance method.
    """
    # NOTE(review): the call is treated as a bound-method call only when exactly
    # one positional argument was passed; confirm callers never pass more.
    bound = len(args) == 1

    if value is not None:
        if bound and callable(value):
            return functools.partial(value, args[0])
        return value

    if not bound:
        return default

    owner = args[0]
    if hasattr(owner, name):
        return getattr(owner, name)
    return default