Source code for sidecar.statsd

# statsd.py
# Based on example from https://github.com/etsy/statsd/

# Key format: apps.<app>.<server>.<app-key>
# e.g.: apps.tetra-app.bacon.request
import asyncio
import logging
import random


log = logging.getLogger(__name__)


[docs]class StatsdProtocol(asyncio.DatagramProtocol): def __init__(self, prefix): self.transport = None """ :type : asyncio.DatagramTransport """ self.prefix = prefix
[docs] def connection_made(self, transport): self.transport = transport
[docs] def connection_lost(self, exc): self.transport = False
[docs] def send(self, data, sample_rate=1): """Squirt the metrics over UDP""" sampled_data = {} if sample_rate < 1: if random.random() <= sample_rate: for stat in data.keys(): value = data[stat] sampled_data[stat] = "%s|@%s" % (value, sample_rate) else: sampled_data = data try: for stat in sampled_data.keys(): value = data[stat] send_data = "%s.%s:%s" % (self.prefix, stat, value) self.transport.sendto(send_data.encode('utf-8')) except Exception as e: log.error("Statsd error %r" % e)
[docs]class StatsD: """Sends statistics to the stats daemon over UDP""" def __init__(self, loop, my_hostname, host, port, prefix): self.loop = loop self.host = host self.port = port self.prefix = "apps.%s.%s" % (prefix, my_hostname) self.protocol = None """ :type : StatsdProtocol """ @asyncio.coroutine
[docs] def start(self): log.info('Creating StatsD connection to %s:%s with prefix=%s' % (self.host, self.port, self.prefix)) _, self.protocol = yield from self.loop.create_datagram_endpoint(lambda: StatsdProtocol(self.prefix), remote_addr=(self.host, self.port))
[docs] def timing(self, stats, time, sample_rate=1): """Log timing information >>> # noinspection PyUnresolvedReferences >>> timing('some.time', '500') """ self._update_stats(stats, time, sample_rate, 'ms')
[docs] def increment(self, stats, sample_rate=1, value=1): """ Increments one or more stats counters >>> # noinspection PyUnresolvedReferences >>> increment('some.int') >>> # noinspection PyUnresolvedReferences >>> increment('some.int', 0.5) """ self._update_stats(stats, 1, sample_rate)
[docs] def decrement(self, stats, sample_rate=1, value=1): """Decrements one or more stats counters >>> # noinspection PyUnresolvedReferences >>> decrement('some.int') """ self._update_stats(stats, -1, sample_rate)
[docs] def gauge(self, stats, value, sample_rate=1): """Sets one or more gauges to a value >>> # noinspection PyUnresolvedReferences >>> gauge('some.int', 'some_value') """ self._update_stats(stats, value, sample_rate, 'g')
def _update_stats(self, stats, delta=1, sample_rate=1, metric='c'): """ Updates a single stat counter by arbitrary amounts >>> _update_stats('some.int', 10) :param stat: The stat string to update :type stat: str :param value: The quantity to assign to the stat. :type value: string :param sample_rate: The rate at which to sample. A value of 1 means to capture all submitted metrics, a value of 0.1 means 10%, etc. A value of > 1 is nonsense, so don't do that. :type sample_rate: int | float :param metric: The magic value to indicate to graphite how the data are to be interpreted. :type metric: basestring """ if sample_rate < 1 and random.random() > sample_rate: return if type(stats) is not list: stats = [stats] data = {} for stat in stats: data[stat] = "%s|%s" % (delta, metric) if sample_rate < 1: data[stat] = "{}|@{}".format(data, sample_rate) self.protocol.send(data, sample_rate)
[docs]class WrappedStatsD: """Sends statistics to the stats daemon over UDP""" def __init__(self, key_pattern, delegate): """ :param key_pattern: The key pattern. Must contain {stats} :type key_pattern: :param delegate: :type delegate: :return: :rtype: """ self.delegate = delegate self.key = key_pattern pass
[docs] def timing(self, stats, time, sample_rate=1): """Log timing information >>> # noinspection PyUnresolvedReferences >>> timing('some.time', '500') """ key = self.key.format(stats=stats) if self.delegate: delegate = self.delegate if not callable(self.delegate) else self.delegate() delegate.timing(stats, time, sample_rate) else: log.debug("Stats: %s %s", key, time)
[docs] def increment(self, stats, value=1, sample_rate=1): """ Increments one or more stats counters >>> # noinspection PyUnresolvedReferences >>> increment('some.int') >>> # noinspection PyUnresolvedReferences >>> increment('some.int', 0.5) """ key = self.key.format(stats=stats) if self.delegate: delegate = self.delegate if not callable(self.delegate) else self.delegate() delegate.increment(stats, sample_rate, value) else: log.debug("Stats: %s %s", key, value)
[docs] def decrement(self, stats, sample_rate=1, value=1): """Decrements one or more stats counters >>> # noinspection PyUnresolvedReferences >>> decrement('some.int') """ key = self.key.format(stats=stats) if self.delegate: delegate = self.delegate if not callable(self.delegate) else self.delegate() delegate.decrement(stats, sample_rate, value) else: log.debug("Stats: %s %s", key, value)
[docs] def gauge(self, stats, value=1, sample_rate=1): """Sets one or more gauges to a value >>> # noinspection PyUnresolvedReferences >>> gauge('some.int', 'some_value') """ key = self.key.format(stats=stats) if self.delegate: delegate = self.delegate if not callable(self.delegate) else self.delegate() delegate.gauge(stats, sample_rate, value) else: log.debug("Stats: %s %s", key, value)
[docs]class MultiStatsD: """Sends statistics to statsd and / or dogstatsd, only update one stat at a time.""" def __init__(self, statsd=None, statsd_enabled=False, dogstatsd=None, dogstatsd_enabled=False): """ :param statsd: a statsd object :type statsd: txhipchat.storage.StatsD :param statsd_enabled: whether to send metrics to plain statsd :type statsd_enabled: bool :param dogstatsd: a datadog statsd object :type dogstatsd: datadog.statsd :param dogstatsd_enabled: whether to send metrics to datadog :type dogstatsd_enabled: bool """ self.statsd = statsd self.statsd_enabled = statsd_enabled and statsd is not None self.dogstatsd = dogstatsd self.dogstatsd_enabled = dogstatsd_enabled and dogstatsd is not None @asyncio.coroutine
[docs] def start(self): yield from self.statsd.start()
[docs] def timing(self, stat, time, sample_rate=1, tags=None): """Log timing information >>> timing('some.time', '500') """ if self.statsd_enabled: self.statsd.timing(stat, time, sample_rate) if self.dogstatsd_enabled: self.dogstatsd.timing(stat, time, tags, sample_rate)
[docs] def increment(self, stat, sample_rate=1, value=1, tags=None): """ Increments a single counter >>> increment('some.int') >>> increment('some.int', 0.5) """ if self.statsd_enabled: self.statsd.increment(stat, sample_rate, value) if self.dogstatsd_enabled: self.dogstatsd.increment(stat, value, tags, sample_rate)
[docs] def decrement(self, stat, sample_rate=1, value=1, tags=None): """Decrements a single counter >>> decrement('some.int') """ if self.statsd_enabled: self.statsd.decrement(stat, sample_rate, value) if self.dogstatsd_enabled: self.dogstatsd.decrement(stat, value, tags, sample_rate)
[docs] def gauge(self, stat, value, sample_rate=1, tags=None): """Sets a single gauge to a value >>> gauge('some.int', 'some_value') """ if self.statsd_enabled: self.statsd.gauge(stat, value, sample_rate) if self.dogstatsd_enabled: self.dogstatsd.gauge(stat, value, tags, sample_rate)