ccin2p3 / samplerr

Round robin timeseries middleware based on riemann and elasticsearch
Eclipse Public License 1.0

samplerr

Introduction

The main goal of this project is to provide long-term storage for your metrics that stays relevant as the data ages. It borrows some of rrdtool's concepts and leverages the power of a modern storage backend: elasticsearch.

The idea is to downsample metrics to multiple sampling rates, consolidating them with meaningful aggregation functions. The result is multiple archive stores with different resolutions. Different resolution archives are mainly useful for two reasons:

  1. Keep storage space in bounds
  2. Keep data amount in bounds at query time

Different consolidation functions (e.g. min, max, avg, etc.) are mainly useful for keeping track of what matters in the metrics you keep.
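To make the downsampling idea concrete, here is a minimal sketch in plain Clojure. It is not samplerr's implementation: the helper names (bucket-start, cfuncs, downsample) and the point shape ({:time ... :metric ...}) are assumptions made for illustration. It groups points into fixed-width buckets and emits one consolidated point per bucket and consolidation function.

```clojure
;; Hypothetical helper names; this is not samplerr's implementation.
(defn bucket-start
  "Start (in seconds) of the step-wide bucket that contains timestamp t."
  [step t]
  (- t (mod t step)))

(def cfuncs
  ;; name -> function consolidating a non-empty seq of numbers
  {"avg" (fn [xs] (/ (reduce + xs) (count xs)))
   "min" (fn [xs] (apply min xs))
   "max" (fn [xs] (apply max xs))})

(defn downsample
  "Consolidate points ({:time seconds :metric number}) into step-second
  buckets, emitting one point per bucket and consolidation function."
  [step points]
  (for [[start bucket] (sort-by key (group-by #(bucket-start step (:time %)) points))
        [cf-name f]    (sort-by key cfuncs)]
    {:time start :cfunc cf-name :metric (f (map :metric bucket))}))

;; Six points at a 5s interval, consolidated into a single 30s bucket.
(def points
  (map (fn [t m] {:time t :metric m})
       [0 5 10 15 20 25]
       [1 2 3 4 5 9]))

(downsample 30 points)
;; => ({:time 0, :cfunc "avg", :metric 4}
;;     {:time 0, :cfunc "max", :metric 9}
;;     {:time 0, :cfunc "min", :metric 1})
```

Keeping min and max next to avg preserves the spike (9) that the average alone would hide.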

samplerr keeps storage low and client queries fast by periodically purging high-resolution data and by creating elasticsearch aliases that point clients to the highest available resolution.
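The "highest available resolution" rule can be sketched as follows. This is an illustration under stated assumptions, not samplerr's code: the archive maps are simplified (a hypothetical :name key, with :step and :ttl as plain seconds), best-archive is a made-up helper, and the two archives reuse the 30s/two-day and 3h/one-year figures from the "How it works" example.

```clojure
;; Hypothetical, simplified archive descriptions: :step and :ttl in seconds.
(def archives
  [{:name "30s" :step 30    :ttl (* 2 24 3600)}      ; kept for two days
   {:name "3h"  :step 10800 :ttl (* 365 24 3600)}])  ; kept for one year

(defn best-archive
  "Highest-resolution (smallest :step) archive whose :ttl still covers
  data that is age seconds old, or nil when every archive has expired."
  [archives age]
  (->> archives
       (filter #(<= age (:ttl %)))
       (sort-by :step)
       first))

(:name (best-archive archives 3600))            ; one hour old    => "30s"
(:name (best-archive archives (* 30 24 3600)))  ; thirty days old => "3h"
(best-archive archives (* 2 365 24 3600))       ; two years old   => nil
```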

[Figure: elasticsearch aliases]

How it works

[Figure: samplerr diagram]

In this example, samplerr ingests a metric which has a 5s interval. It then downsamples it to three different archives with different consolidation functions. Each elasticsearch index has its own retention policy. For instance, the highest resolution data (30s) is kept for two days, while the lowest resolution (3h) is kept for 1 year. The disk footprint is the same for all three data stores.

Features

Its architecture is modular, so you can use any of the following main functions:

Implementation

The current implementation:

Installation

After cloning the repo, you can build the plugin using leiningen:

lein uberjar

This will create a plugin jar named samplerr-x.y.z-SNAPSHOT-standalone.jar, which you can add to your Java classpath, e.g.:

java -cp /usr/lib/riemann/riemann.jar:/usr/lib/riemann/samplerr-0.1.1-SNAPSHOT-standalone-up.jar riemann.bin start /etc/riemann/riemann.config

On Debian or Red Hat you can also extend the classpath using the EXTRA_CLASSPATH variable, available in /etc/default/riemann and /etc/sysconfig/riemann respectively.

Synopsis

(load-plugins)
(require '[clj-time.core :as t])

(let [elastic      (samplerr/connect {:hosts ["http://localhost:9200"]})
      index-prefix ".samplerr"
      alias-prefix "samplerr"
      cfunc        [{:func samplerr/average :name "avg"}
                    {:func samplerr/minimum :name "min"}
                    {:func samplerr/maximum :name "max"}]
      archives     [{:tf "YYYY.MM.dd" :step (t/seconds 20) :ttl   (t/days 2) :cfunc cfunc}
                    {:tf "YYYY.MM"    :step (t/minutes 10) :ttl (t/months 2) :cfunc cfunc}
                    {:tf "YYYY"       :step    (t/hours 1) :ttl (t/years 10) :cfunc cfunc}]
      rotate       (samplerr/periodically-rotate {:interval (t/days 1) :conn elastic :index-prefix index-prefix :alias-prefix alias-prefix :archives archives})
      persist      (batch 1000 10 (samplerr/persist {:index-prefix index-prefix :index-type "samplerr" :conn elastic}))]

  (streams
    (where (tagged "collectd")
      (by [:host :service]
       (samplerr/down archives persist))))
  rotate)

Usage

samplerr provides five high-level functions, two of which are stream functions.

Stream functions

(down archives & children)

This stream function splits streams by archive and consolidation functions. It conveniently passes on events to child streams, for example to send those to elasticsearch using the persist stream function.

The sequence archives should contain at least one archive. Each archive describes the aggregation that shall be performed and the target archive:

(def archives [{:tf "YYYY.MM.dd" :step   20 :cfunc cfunc}
               {:tf "YYYY.MM"    :step  600 :cfunc cfunc}
               {:tf "YYYY"       :step 3600 :cfunc cfunc}])

Each consolidation function is a hash map containing two keys, :func and :name:

(def cfunc [{:func samplerr/average :name "avg"}
            {:func samplerr/minimum :name "min"}
            {:func samplerr/maximum :name "max"}])

samplerr provides some commonly used cfuncs like average, minimum and maximum which are described in the corresponding section.

(persist options & children)

This stream function sends events processed by down to the storage backend (elasticsearch). It is configured using the hash-map options:

(def options {:index-prefix index-prefix :index-type index-type :conn es-conn-handle})

Events should contain the riemann attribute :tf which will route them to the appropriate archive.

Other functions

(connect)

This is a proxy to qbits.spandex/client.

(rotate {:conn es-conn-handle :index-prefix index-prefix :alias-prefix alias-prefix :archives archives})

This function manages elasticsearch aliases. An alias is created for each archive by concatenating index-prefix with the :tf formatted date, and points to the first unexpired index (prefix index-prefix). Expiry is computed using the archive's :ttl. The idea is that clients query elasticsearch through the aliases. Most high-level clients (e.g. grafana, kibana) can only point to one time-based index pattern, e.g. foo-YYYY.MM.dd.
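As an illustration of how a :tf pattern turns a date into a name, the sketch below formats one day with each pattern from the synopsis, using clj-time.format. The dated-name helper is hypothetical, and the dash separator is an assumption modelled on the foo-YYYY.MM.dd pattern above; samplerr's actual naming may differ.

```clojure
(require '[clj-time.core :as t]
         '[clj-time.format :as f])

(defn dated-name
  "Hypothetical helper: joins prefix and the tf-formatted date with a dash
  (the separator is an assumption, not taken from samplerr)."
  [prefix tf date]
  (str prefix "-" (f/unparse (f/formatter tf) date)))

(let [day (t/date-time 2016 2 1)]
  (map #(dated-name ".samplerr" % day)
       ["YYYY.MM.dd" "YYYY.MM" "YYYY"]))
;; => (".samplerr-2016.02.01" ".samplerr-2016.02" ".samplerr-2016")
```

A coarser pattern covers a longer period per index, so the yearly archive needs far fewer indices than the daily one.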

samplerr will transparently position aliases pointing to the highest possible resolution archive that overlaps with it and that is not expired. The algorithm is roughly the following:

The usual way to use this function is either:

(periodically-rotate {:interval periodicity :conn es-conn-handle :index-prefix index-prefix :alias-prefix alias-prefix :archives archives})

This function calls rotate once every periodicity time interval. The :interval value should be given as an org.joda.time/PeriodType object, conveniently provided by clj-time.core through e.g. hours, days, etc.

Note that the first rotation will not take effect immediately after riemann startup. Also note that configuration reloads will work as expected.

Example

Take the example in the synopsis section. Let's say today is 2016-02-01 at 03:14 PM and riemann started exactly 2 days ago. samplerr/rotate fires up and processes the elasticsearch indices:

(purge {:conn es-conn-handle :index-prefix index-prefix :archives archives})

This function will DELETE expired indices. Use with care.

The usual way to use this function is either:

(periodically-purge {:interval periodicity :conn es-conn-handle :index-prefix index-prefix :archives archives})

This function will call purge periodically.
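A possible riemann.config fragment wiring up periodic purging is sketched below. It mirrors how the synopsis sets up periodically-rotate and uses only the option keys from the signature above. The daily interval, the single cfunc and the two archives are illustrative choices, not recommendations from the project.

```clojure
(load-plugins)
(require '[clj-time.core :as t])

(let [elastic      (samplerr/connect {:hosts ["http://localhost:9200"]})
      index-prefix ".samplerr"
      cfunc        [{:func samplerr/average :name "avg"}]
      archives     [{:tf "YYYY.MM.dd" :step (t/seconds 20)  :ttl (t/days 2)   :cfunc cfunc}
                    {:tf "YYYY.MM"    :step (t/minutes 10) :ttl (t/months 2) :cfunc cfunc}]
      ;; Calls purge once a day (assumed interval). purge DELETES expired
      ;; indices, so double-check every :ttl before enabling this.
      purge        (samplerr/periodically-purge {:interval     (t/days 1)
                                                 :conn         elastic
                                                 :index-prefix index-prefix
                                                 :archives     archives})]
  purge)
```

Because purging is destructive, it may be worth trying the :ttl values against a test cluster first.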

Development

At the time of writing, the contributor to this project is Fabien Wernli. Some code in the elasticsearch integration was borrowed from tnn1t1s, which in turn borrowed from kiries.