from functools import wraps
import functools
from ._statsd import CallStatsD
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from dace.http import Http
from twisted.internet import defer
from dace import _decorator, log
from ._agent import DependencyAgent
from twisted.web.client import HTTPConnectionPool
_dependencies = {}
__all__ = ['fault_tolerant']
def fault_tolerant(dependency_id=None, reactor=None, circuit_reset_timeout=10, load_balancer=None,
http_pool_size=10, fallback=None, timeout=5, retry_attempts=3, max_open_per_netloc=10,
retry_on_timeout=False, expected_in=None, statsd=None, use_proxy=False, context_factory=None,
connect_timeout=1):
"""
Wraps a function with a fault tolerant version that supports timeouts, retries, load balancing, and failovers.
The wrapped function can optionally take a :class:`dace.call.Call` argument, which will contain services for the
specific call.
The following services will wrap executions of the wrapped function:
1. Logging of the operation, specifically of any errors
2. Timeout tracking to ensure the operation doesn't hang
3. Rate limiting to ensure the operation doesn't hammer a particular server
4. Circuit breaker for making sure multiple errors cause the target service to not be contacted again until the
reset timeout
5. Load balancing function execution to have each operation spread load across multiple targets
6. Retry handling
7. Fallback function execution in case of failure
:param dependency_id: The id to use in stats and logs
:param reactor: The reactor to use, otherwise the default
:type reactor: :class:`twisted.internet.reactor.IReactorCore`
:param circuit_reset_timeout: How long in seconds to wait until the circuit is reset and tested again
:type circuit_reset_timeout: int
:param load_balancer: A function that returns a (protocol, netloc) pair for load balancing
:param http_pool_size: The number of concurrent HTTP connections to allow
:type http_pool_size: int
:param fallback: The function to call when the operation fails to generate the return value
:param timeout: The timeout for the entire operation execution
:param retry_attempts: The number of retry attempts to make if the operation fails
:type retry_attempts: int
:param max_open_per_netloc: The maximum number of concurrent operations per netloc
:type max_open_per_netloc: int
:param retry_on_timeout: Whether to retry the request on HTTP timeout or not
:type retry_on_timeout: bool
:param expected_in: The time the operation is expected to return in. Used to determine a circuit failure.
:type expected_in: int
:param statsd: The statsd service instance to use for stats tracking
:type use_proxy: bool
:param use_proxy: Whether to support calls through a proxy or not
:type context_factory: :class:`twisted.internet.ssl.ClientContextFactory`
:param context_factory: A custom SSL context factory
:param connect_timeout: The connection timeout for the Agent when making connections
"""
if reactor is None:
import twisted.internet
# noinspection PyUnresolvedReferences
reactor = twisted.internet.reactor
if dependency_id is not None:
dep = _dependencies.get(dependency_id)
if not dep:
dep = Dependency(dependency_id, reactor, http_pool_size=http_pool_size)
_dependencies[dependency_id] = dep
else:
dep = Dependency("__default__", reactor, http_pool_size=http_pool_size)
if load_balancer is None:
load_balancer = lambda: (None, None)
def inner(func):
call = Call(reactor, dep, run=func, fallback=fallback, timeout=timeout, retry_attempts=retry_attempts,
max_open_per_netloc=max_open_per_netloc, retry_on_timeout=retry_on_timeout, expected_in=expected_in,
statsd=statsd, load_balancer=load_balancer, circuit_reset_timeout=circuit_reset_timeout,
use_proxy=use_proxy, context_factory=context_factory, connect_timeout=connect_timeout)
call = wraps(func)(call)
return call
return inner
class Dependency(object):
def __init__(self, dependency_id, reactor, http_pool_size):
super(Dependency, self).__init__()
self.id = dependency_id
self._reactor = reactor
self._http_pool_size = http_pool_size
self._http_pool = None
@property
def http_pool(self):
if self._http_pool is None:
persistent = self._http_pool_size > 0
pool = HTTPConnectionPool(self._reactor, persistent=persistent)
# noinspection PyPep8Naming
pool.maxPersistentPerHost = self._http_pool_size
# noinspection PyPep8Naming
pool.retryAutomatically = False
self._http_pool = pool
return self._http_pool
[docs]class Call(object):
def __init__(self, reactor, dependency, run, fallback, timeout, retry_attempts, max_open_per_netloc,
retry_on_timeout, expected_in, statsd, load_balancer, circuit_reset_timeout, use_proxy,
context_factory, connect_timeout=1):
super(Call, self).__init__()
self.id = run.func_name
self.dependency = dependency
self.load_balancer = load_balancer
statsd = CallStatsD(dependency.id, self.id, statsd)
self._run = _decorator.logger(
_decorator.optional_call_arg(self, run, exception_as_failure=True))
if timeout:
self._run = _decorator.timeout(reactor, statsd, timeout, self._run)
if max_open_per_netloc:
self._run = _decorator.rate_limit(statsd, max_open_per_netloc, self._run)
if circuit_reset_timeout:
self._run = _decorator.circuit(self.id, reactor, statsd, circuit_reset_timeout, expected_in, self._run)
self._run = _decorator.load_balance(self.load_balancer, self._run)
self._run = _decorator.retry(reactor, statsd, retry_attempts + 1, retry_on_timeout, self._run)
self._fallback = _decorator.optional_call_arg(dependency, fallback)
self._timeout = timeout
self._reactor = reactor
self._agent = None
self._http = None
self._use_proxy = use_proxy
self._context_factory = context_factory
self._agent_connect_timeout = connect_timeout
@property
[docs] def agent(self):
"""
The Http agent to use for lower-level HTTP services
:rtype: :class:`twisted.web.client.Agent`
"""
if not self._agent:
self._agent = DependencyAgent(self.dependency.http_pool, self._run, self._reactor,
self._use_proxy, self._context_factory,
connect_timeout=self._agent_connect_timeout)
return self._agent
@property
[docs] def netloc(self):
"""
The load-balanced host to use for any calls
"""
return self._run.netloc
@property
[docs] def http(self):
"""
The HTTP service
:rtype: :class:`dace.http.Http`
"""
if not self._http:
self._http = Http(self.agent, reactor=self._reactor)
return self._http
def _handle_error(self, response, failure, *args, **kwargs):
if self._fallback:
try:
fb = self._fallback(*args, **kwargs)
if isinstance(fb, Deferred):
response.errback(Failure(ValueError("Fallback result cannot be a deferred")))
else:
response.callback(fb)
except Exception as e:
response.errback(Failure(e))
else:
response.errback(failure)
def __call__(self, *args, **kwargs):
response = defer.Deferred()
def eb(failure):
self._handle_error(response, failure, *args, **kwargs)
d = self._run(*args, **kwargs)
d.addCallback(response.callback)
d.addErrback(eb)
return response
def __get__(self, instance, owner):
return functools.partial(self.__call__, instance)