Source code for dace.call

from functools import wraps
import functools
from ._statsd import CallStatsD
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

from dace.http import Http
from twisted.internet import defer
from dace import _decorator, log
from ._agent import DependencyAgent
from twisted.web.client import HTTPConnectionPool


# Registry of Dependency instances keyed by dependency id. fault_tolerant() looks an id up here first and only
# creates (and stores) a new Dependency when none is registered yet.
_dependencies = {}

# Names exported by ``from dace.call import *``.
__all__ = ['fault_tolerant']


def fault_tolerant(dependency_id=None, reactor=None, circuit_reset_timeout=10, load_balancer=None,
                   http_pool_size=10, fallback=None, timeout=5, retry_attempts=3, max_open_per_netloc=10,
                   retry_on_timeout=False, expected_in=None, statsd=None, use_proxy=False, context_factory=None,
                   connect_timeout=1):
    """
    Wraps a function with a fault tolerant version that supports timeouts, retries, load balancing, and failovers.

    The wrapped function can optionally take a :class:`dace.call.Call` argument, which will contain services for the
    specific call.

    The following services will wrap executions of the wrapped function:

    1. Logging of the operation, specifically of any errors
    2. Timeout tracking to ensure the operation doesn't hang
    3. Rate limiting to ensure the operation doesn't hammer a particular server
    4. Circuit breaker for making sure multiple errors cause the target service to not be contacted again until the
       reset timeout
    5. Load balancing function execution to have each operation spread load across multiple targets
    6. Retry handling
    7. Fallback function execution in case of failure

    Dependencies are shared by id: the first decoration that names a ``dependency_id`` creates the
    :class:`Dependency` (using that call's ``reactor`` and ``http_pool_size``) and later decorations with the same id
    reuse it as-is.  When no ``dependency_id`` is given, a fresh, unregistered ``"__default__"`` dependency is created
    for each decoration.

    :param dependency_id: The id to use in stats and logs
    :param reactor: The reactor to use, otherwise the default
    :type reactor: :class:`twisted.internet.reactor.IReactorCore`
    :param circuit_reset_timeout: How long in seconds to wait until the circuit is reset and tested again
    :type circuit_reset_timeout: int
    :param load_balancer: A function that returns a (protocol, netloc) pair for load balancing.  Defaults to a
                          function returning ``(None, None)``.
    :param http_pool_size: The number of concurrent HTTP connections to allow
    :type http_pool_size: int
    :param fallback: The function to call when the operation fails to generate the return value
    :param timeout: The timeout for the entire operation execution
    :param retry_attempts: The number of retry attempts to make if the operation fails
    :type retry_attempts: int
    :param max_open_per_netloc: The maximum number of concurrent operations per netloc
    :type max_open_per_netloc: int
    :param retry_on_timeout: Whether to retry the request on HTTP timeout or not
    :type retry_on_timeout: bool
    :param expected_in: The time the operation is expected to return in.  Used to determine a circuit failure.
    :type expected_in: int
    :param statsd: The statsd service instance to use for stats tracking
    :param use_proxy: Whether to support calls through a proxy or not
    :type use_proxy: bool
    :param context_factory: A custom SSL context factory
    :type context_factory: :class:`twisted.internet.ssl.ClientContextFactory`
    :param connect_timeout: The connection timeout for the Agent when making connections
    :return: A decorator that replaces the decorated function with a :class:`Call` instance carrying the
             function's metadata (via :func:`functools.wraps`)
    """
    if reactor is None:
        # Import the reactor submodule explicitly: ``import twisted.internet`` alone does not load it, so reading
        # ``twisted.internet.reactor`` as an attribute only works if some other module imported it first.
        from twisted.internet import reactor as default_reactor
        reactor = default_reactor

    if dependency_id is None:
        # Anonymous dependencies are deliberately not registered, so each decoration gets its own instance.
        dep = Dependency("__default__", reactor, http_pool_size=http_pool_size)
    else:
        dep = _dependencies.get(dependency_id)
        if dep is None:
            dep = Dependency(dependency_id, reactor, http_pool_size=http_pool_size)
            _dependencies[dependency_id] = dep

    if load_balancer is None:
        def load_balancer():
            # No load balancing requested: no protocol and no netloc.
            return None, None

    def inner(func):
        call = Call(reactor, dep, run=func, fallback=fallback, timeout=timeout, retry_attempts=retry_attempts,
                    max_open_per_netloc=max_open_per_netloc, retry_on_timeout=retry_on_timeout, expected_in=expected_in,
                    statsd=statsd, load_balancer=load_balancer, circuit_reset_timeout=circuit_reset_timeout,
                    use_proxy=use_proxy, context_factory=context_factory, connect_timeout=connect_timeout)
        # Copy the decorated function's name, docstring and attributes onto the Call instance.
        return wraps(func)(call)

    return inner


class Dependency(object):
    """
    A named target of fault tolerant calls that owns a lazily created HTTP connection pool.

    :param dependency_id: The identifier exposed as ``id``
    :param reactor: The reactor handed to the connection pool when it is created
    :param http_pool_size: Applied to the pool as ``maxPersistentPerHost``; the pool is created as persistent only
                           when this value is greater than zero
    """

    def __init__(self, dependency_id, reactor, http_pool_size):
        super(Dependency, self).__init__()
        # The pool is not built here; see ``http_pool``.
        self._http_pool = None
        self._http_pool_size = http_pool_size
        self._reactor = reactor
        self.id = dependency_id

    @property
    def http_pool(self):
        """
        The connection pool for this dependency, created on first access and reused afterwards.

        :rtype: :class:`twisted.web.client.HTTPConnectionPool`
        """
        existing = self._http_pool
        if existing is not None:
            return existing

        size = self._http_pool_size
        created = HTTPConnectionPool(self._reactor, persistent=(size > 0))
        # noinspection PyPep8Naming
        created.maxPersistentPerHost = size
        # noinspection PyPep8Naming
        created.retryAutomatically = False

        self._http_pool = created
        return created


class Call(object):
    """
    A callable that runs a function through the fault tolerance decorators and falls back on failure.

    The ``run`` function is wrapped, innermost first, with: logging, an optional timeout, an optional rate limit, an
    optional circuit breaker, load balancing, and retrying.  Calling the instance returns a
    :class:`twisted.internet.defer.Deferred`.

    :param reactor: The reactor passed to the timeout, circuit and retry decorators and to the HTTP services
    :param dependency: The :class:`Dependency` providing the id for stats and the HTTP connection pool
    :param run: The function to execute; its ``__name__`` becomes this call's ``id``
    :param fallback: The function producing the result when ``run`` fails, or a falsy value for none
    :param timeout: The timeout for the operation; the timeout decorator is skipped when falsy
    :param retry_attempts: The number of retries; the retry decorator receives ``retry_attempts + 1``
    :param max_open_per_netloc: The rate limit; the rate limit decorator is skipped when falsy
    :param retry_on_timeout: Passed through to the retry decorator
    :param expected_in: Passed through to the circuit decorator
    :param statsd: The statsd service instance, wrapped in a :class:`CallStatsD` for this dependency and call
    :param load_balancer: The function handed to the load balance decorator
    :param circuit_reset_timeout: The circuit reset timeout; the circuit decorator is skipped when falsy
    :param use_proxy: Passed through to the :class:`DependencyAgent`
    :param context_factory: Passed through to the :class:`DependencyAgent`
    :param connect_timeout: The connection timeout passed to the :class:`DependencyAgent`
    """

    def __init__(self, reactor, dependency, run, fallback, timeout, retry_attempts, max_open_per_netloc,
                 retry_on_timeout, expected_in, statsd, load_balancer, circuit_reset_timeout, use_proxy,
                 context_factory, connect_timeout=1):
        super(Call, self).__init__()
        # ``__name__`` holds the same value as the Python 2-only ``func_name`` and also exists on Python 3.
        self.id = run.__name__
        self.dependency = dependency
        self.load_balancer = load_balancer

        statsd = CallStatsD(dependency.id, self.id, statsd)

        # Build the decorator chain from the inside out; the order below is the nesting order.
        self._run = _decorator.logger(
            _decorator.optional_call_arg(self, run, exception_as_failure=True))
        if timeout:
            self._run = _decorator.timeout(reactor, statsd, timeout, self._run)
        if max_open_per_netloc:
            self._run = _decorator.rate_limit(statsd, max_open_per_netloc, self._run)
        if circuit_reset_timeout:
            self._run = _decorator.circuit(self.id, reactor, statsd, circuit_reset_timeout, expected_in, self._run)
        self._run = _decorator.load_balance(self.load_balancer, self._run)
        self._run = _decorator.retry(reactor, statsd, retry_attempts + 1, retry_on_timeout, self._run)

        # NOTE(review): the fallback is given ``dependency`` where ``run`` is given ``self`` -- confirm intended.
        self._fallback = _decorator.optional_call_arg(dependency, fallback)
        self._timeout = timeout
        self._reactor = reactor

        # Created lazily by the ``agent`` and ``http`` properties.
        self._agent = None
        self._http = None
        self._use_proxy = use_proxy
        self._context_factory = context_factory
        self._agent_connect_timeout = connect_timeout

    @property
    def agent(self):
        """
        The Http agent to use for lower-level HTTP services

        :rtype: :class:`twisted.web.client.Agent`
        """
        if not self._agent:
            self._agent = DependencyAgent(self.dependency.http_pool, self._run, self._reactor, self._use_proxy,
                                          self._context_factory, connect_timeout=self._agent_connect_timeout)
        return self._agent

    @property
    def netloc(self):
        """
        The load-balanced host to use for any calls
        """
        return self._run.netloc

    @property
    def http(self):
        """
        The HTTP service

        :rtype: :class:`dace.http.Http`
        """
        if not self._http:
            self._http = Http(self.agent, reactor=self._reactor)
        return self._http

    def _handle_error(self, response, failure, *args, **kwargs):
        """
        Resolve ``response`` after the wrapped operation failed.

        With a fallback, its return value becomes the result of ``response``; a fallback that raises, or that returns
        a Deferred, fails ``response`` instead.  Without a fallback, ``response`` fails with the original ``failure``.

        :param response: The Deferred returned to the caller of this Call
        :param failure: The failure produced by the wrapped operation
        :param args: The positional arguments of the original call, forwarded to the fallback
        :param kwargs: The keyword arguments of the original call, forwarded to the fallback
        """
        if self._fallback:
            try:
                fb = self._fallback(*args, **kwargs)
                if isinstance(fb, Deferred):
                    response.errback(Failure(ValueError("Fallback result cannot be a deferred")))
                else:
                    response.callback(fb)
            except Exception as e:
                response.errback(Failure(e))
        else:
            response.errback(failure)

    def __call__(self, *args, **kwargs):
        """
        Execute the wrapped operation with the given arguments.

        :return: A Deferred that fires with the operation's result, or is resolved by the error handling (fallback
                 or original failure) when the operation fails
        :rtype: :class:`twisted.internet.defer.Deferred`
        """
        response = defer.Deferred()

        def eb(failure):
            self._handle_error(response, failure, *args, **kwargs)

        d = self._run(*args, **kwargs)
        d.addCallback(response.callback)
        d.addErrback(eb)

        return response

    def __get__(self, instance, owner):
        """
        Descriptor hook that lets a Call stored as a class attribute behave like a method.

        :param instance: The object the attribute was read from, or ``None`` when read from the class
        :param owner: The class the attribute was read from
        :return: This Call itself for class-level access, otherwise a partial that passes ``instance`` as the first
                 positional argument
        """
        if instance is None:
            # Class-level access: binding ``None`` as the first argument would silently shift every argument.
            return self
        return functools.partial(self.__call__, instance)