# statsd.py
# Based on example from https://github.com/etsy/statsd/
# Key format: apps.<app>.<server>.<app-key>
# e.g.: apps.tetra-app.bacon.request
import asyncio
import logging
import random
log = logging.getLogger(__name__)
[docs]class StatsdProtocol(asyncio.DatagramProtocol):
def __init__(self, prefix):
self.transport = None
""" :type : asyncio.DatagramTransport """
self.prefix = prefix
[docs] def connection_made(self, transport):
self.transport = transport
[docs] def connection_lost(self, exc):
self.transport = False
[docs] def send(self, data, sample_rate=1):
"""Squirt the metrics over UDP"""
sampled_data = {}
if sample_rate < 1:
if random.random() <= sample_rate:
for stat in data.keys():
value = data[stat]
sampled_data[stat] = "%s|@%s" % (value, sample_rate)
else:
sampled_data = data
try:
for stat in sampled_data.keys():
value = data[stat]
send_data = "%s.%s:%s" % (self.prefix, stat, value)
self.transport.sendto(send_data.encode('utf-8'))
except Exception as e:
log.error("Statsd error %r" % e)
[docs]class StatsD:
"""Sends statistics to the stats daemon over UDP"""
def __init__(self, loop, my_hostname, host, port, prefix):
self.loop = loop
self.host = host
self.port = port
self.prefix = "apps.%s.%s" % (prefix, my_hostname)
self.protocol = None
""" :type : StatsdProtocol """
@asyncio.coroutine
[docs] def start(self):
log.info('Creating StatsD connection to %s:%s with prefix=%s' % (self.host, self.port, self.prefix))
_, self.protocol = yield from self.loop.create_datagram_endpoint(lambda: StatsdProtocol(self.prefix),
remote_addr=(self.host, self.port))
[docs] def timing(self, stats, time, sample_rate=1):
"""Log timing information
>>> # noinspection PyUnresolvedReferences
>>> timing('some.time', '500')
"""
self._update_stats(stats, time, sample_rate, 'ms')
[docs] def increment(self, stats, sample_rate=1, value=1):
""" Increments one or more stats counters
>>> # noinspection PyUnresolvedReferences
>>> increment('some.int')
>>> # noinspection PyUnresolvedReferences
>>> increment('some.int', 0.5)
"""
self._update_stats(stats, 1, sample_rate)
[docs] def decrement(self, stats, sample_rate=1, value=1):
"""Decrements one or more stats counters
>>> # noinspection PyUnresolvedReferences
>>> decrement('some.int')
"""
self._update_stats(stats, -1, sample_rate)
[docs] def gauge(self, stats, value, sample_rate=1):
"""Sets one or more gauges to a value
>>> # noinspection PyUnresolvedReferences
>>> gauge('some.int', 'some_value')
"""
self._update_stats(stats, value, sample_rate, 'g')
def _update_stats(self, stats, delta=1, sample_rate=1, metric='c'):
"""
Updates a single stat counter by arbitrary amounts
>>> _update_stats('some.int', 10)
:param stat: The stat string to update
:type stat: str
:param value: The quantity to assign to the stat.
:type value: string
:param sample_rate: The rate at which to sample. A value of 1
means to capture all submitted metrics, a value of 0.1 means
10%, etc. A value of > 1 is nonsense, so don't do that.
:type sample_rate: int | float
:param metric: The magic value to indicate to graphite
how the data are to be interpreted.
:type metric: basestring
"""
if sample_rate < 1 and random.random() > sample_rate:
return
if type(stats) is not list:
stats = [stats]
data = {}
for stat in stats:
data[stat] = "%s|%s" % (delta, metric)
if sample_rate < 1:
data[stat] = "{}|@{}".format(data, sample_rate)
self.protocol.send(data, sample_rate)
[docs]class WrappedStatsD:
"""Sends statistics to the stats daemon over UDP"""
def __init__(self, key_pattern, delegate):
"""
:param key_pattern: The key pattern. Must contain {stats}
:type key_pattern:
:param delegate:
:type delegate:
:return:
:rtype:
"""
self.delegate = delegate
self.key = key_pattern
pass
[docs] def timing(self, stats, time, sample_rate=1):
"""Log timing information
>>> # noinspection PyUnresolvedReferences
>>> timing('some.time', '500')
"""
key = self.key.format(stats=stats)
if self.delegate:
delegate = self.delegate if not callable(self.delegate) else self.delegate()
delegate.timing(stats, time, sample_rate)
else:
log.debug("Stats: %s %s", key, time)
[docs] def increment(self, stats, value=1, sample_rate=1):
""" Increments one or more stats counters
>>> # noinspection PyUnresolvedReferences
>>> increment('some.int')
>>> # noinspection PyUnresolvedReferences
>>> increment('some.int', 0.5)
"""
key = self.key.format(stats=stats)
if self.delegate:
delegate = self.delegate if not callable(self.delegate) else self.delegate()
delegate.increment(stats, sample_rate, value)
else:
log.debug("Stats: %s %s", key, value)
[docs] def decrement(self, stats, sample_rate=1, value=1):
"""Decrements one or more stats counters
>>> # noinspection PyUnresolvedReferences
>>> decrement('some.int')
"""
key = self.key.format(stats=stats)
if self.delegate:
delegate = self.delegate if not callable(self.delegate) else self.delegate()
delegate.decrement(stats, sample_rate, value)
else:
log.debug("Stats: %s %s", key, value)
[docs] def gauge(self, stats, value=1, sample_rate=1):
"""Sets one or more gauges to a value
>>> # noinspection PyUnresolvedReferences
>>> gauge('some.int', 'some_value')
"""
key = self.key.format(stats=stats)
if self.delegate:
delegate = self.delegate if not callable(self.delegate) else self.delegate()
delegate.gauge(stats, sample_rate, value)
else:
log.debug("Stats: %s %s", key, value)
[docs]class MultiStatsD:
"""Sends statistics to statsd and / or dogstatsd, only update one stat at a time."""
def __init__(self, statsd=None, statsd_enabled=False,
dogstatsd=None, dogstatsd_enabled=False):
"""
:param statsd: a statsd object
:type statsd: txhipchat.storage.StatsD
:param statsd_enabled: whether to send metrics to plain statsd
:type statsd_enabled: bool
:param dogstatsd: a datadog statsd object
:type dogstatsd: datadog.statsd
:param dogstatsd_enabled: whether to send metrics to datadog
:type dogstatsd_enabled: bool
"""
self.statsd = statsd
self.statsd_enabled = statsd_enabled and statsd is not None
self.dogstatsd = dogstatsd
self.dogstatsd_enabled = dogstatsd_enabled and dogstatsd is not None
@asyncio.coroutine
[docs] def start(self):
yield from self.statsd.start()
[docs] def timing(self, stat, time, sample_rate=1, tags=None):
"""Log timing information
>>> timing('some.time', '500')
"""
if self.statsd_enabled:
self.statsd.timing(stat, time, sample_rate)
if self.dogstatsd_enabled:
self.dogstatsd.timing(stat, time, tags, sample_rate)
[docs] def increment(self, stat, sample_rate=1, value=1, tags=None):
""" Increments a single counter
>>> increment('some.int')
>>> increment('some.int', 0.5)
"""
if self.statsd_enabled:
self.statsd.increment(stat, sample_rate, value)
if self.dogstatsd_enabled:
self.dogstatsd.increment(stat, value, tags, sample_rate)
[docs] def decrement(self, stat, sample_rate=1, value=1, tags=None):
"""Decrements a single counter
>>> decrement('some.int')
"""
if self.statsd_enabled:
self.statsd.decrement(stat, sample_rate, value)
if self.dogstatsd_enabled:
self.dogstatsd.decrement(stat, value, tags, sample_rate)
[docs] def gauge(self, stat, value, sample_rate=1, tags=None):
"""Sets a single gauge to a value
>>> gauge('some.int', 'some_value')
"""
if self.statsd_enabled:
self.statsd.gauge(stat, value, sample_rate)
if self.dogstatsd_enabled:
self.dogstatsd.gauge(stat, value, tags, sample_rate)