influxdata / kapacitor

Open source framework for processing, monitoring, and alerting on time series data
MIT License

shift on stream does not work on task built using chronograf #1880

Open steveh999 opened 6 years ago

steveh999 commented 6 years ago

Hi. First, thanks for creating such an amazing stack; it's truly astoundingly good.

I've been trying to set up alerting using Kapacitor streams and have been tearing my hair out for two days. I have previously successfully created a batch query that compares a current mean value with the same mean value from an hour earlier. The guts of it are:

var past = batch
        |query('select mean(traf_i_total) from "network_summ"."thirty_days".network_summ')
        .period(1m)
        .every(30s)
        .offset(1h)
        .align()
        .groupBy(*)
var current = batch
        |query('select mean(traf_i_total) from "network_summ"."thirty_days".network_summ')
        .period(1m)
        .every(30s)
        .align()
        .groupBy(*)

...and it works a treat

However, when I try to do the same thing with stream data, the values from "past" are always the same as the values from "current". I've even tried using Chronograf to create the following task (and added the writebacks to the "backtest" db as I found in an example), but the results clearly show the values are the same.

The TICKscript is:

var db = 'network_summ'

var rp = 'thirty_days'

var measurement = 'network_summ'

var groupBy = ['endpoint']

var whereFilter = lambda: TRUE

var period = 1m

var every = 1m

var name = 'madness'

var idVar = name + ':{{.Group}}'

var message = ' {{.ID}} {{.Name}} {{.TaskName}} {{.Group}} {{.Tags}} {{.Level}} {{ index .Fields "value" }} {{.Time}}'

var idTag = 'alertID'

var levelTag = 'level'

var messageField = 'message'

var durationField = 'duration'

var triggerType = 'relative'

var shift = 1h

var crit = 0.001

var data = stream
    |from()
        .database(db)
        .retentionPolicy(rp)
        .measurement(measurement)
        .groupBy(groupBy)
        .where(whereFilter)
    |window()
        .period(period)
        .every(every)
        .align()
    |mean('traf_i_total')
        .as('value')

var past = data
    |shift(shift)

var current = data

var trigger = past
    |join(current)
        .as('past', 'current')
    |eval(lambda: abs(float("current.value" - "past.value")) / float("past.value") * 100.0)
        .keep()
        .as('value')
    |alert()
        .crit(lambda: "value" > crit)
        .stateChangesOnly()
        .message(message)
        .id(idVar)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .log('/var/tmp/madness.log')

trigger
    |influxDBOut()
        .database('madness')
        .create()
        .retentionPolicy('autogen')
        .measurement('backtestvals')
        .tag('query', 'joined')

past
    |influxDBOut()
        .database('madness')
        .create()
        .retentionPolicy('autogen')
        .measurement('backtestvals')
        .tag('query', 'baselineAvg')

current
    |influxDBOut()
        .database('madness')
        .create()
        .retentionPolicy('autogen')
        .measurement('backtestvals')
        .tag('query', 'currentAvg')

And the output in the backtestvals measurement:

> select * from backtestvals where endpoint='jp89667-tok-9888' or endpoint='de27341-fra-9876' or endpoint='fr19556-par-9874' order by time asc;
name: backtestvals
time                 endpoint         query       value
----                 --------         -----       -----
2018-03-30T12:35:00Z jp89667-tok-9888 currentAvg  1924.25
2018-03-30T12:35:00Z de27341-fra-9876 currentAvg  2160.4285714285716
2018-03-30T12:35:00Z fr19556-par-9874 currentAvg  2218.875
2018-03-30T12:36:00Z de27341-fra-9876 currentAvg  2219.529411764706
2018-03-30T12:36:00Z fr19556-par-9874 currentAvg  2523.266666666667
2018-03-30T12:36:00Z jp89667-tok-9888 currentAvg  2393.3333333333335
2018-03-30T12:37:00Z fr19556-par-9874 currentAvg  2206.4
2018-03-30T12:37:00Z jp89667-tok-9888 currentAvg  2175.35
2018-03-30T12:37:00Z de27341-fra-9876 currentAvg  2154.65
2018-03-30T12:38:00Z jp89667-tok-9888 currentAvg  2270.5
2018-03-30T12:38:00Z de27341-fra-9876 currentAvg  2519.133333333333
2018-03-30T12:38:00Z fr19556-par-9874 currentAvg  2232.4117647058824
2018-03-30T12:39:00Z de27341-fra-9876 currentAvg  2145.9473684210525
2018-03-30T12:39:00Z fr19556-par-9874 currentAvg  2256.3684210526317
2018-03-30T12:39:00Z jp89667-tok-9888 currentAvg  2286.2105263157896
2018-03-30T13:35:00Z fr19556-par-9874 baselineAvg 2218.875
2018-03-30T13:35:00Z jp89667-tok-9888 baselineAvg 1924.25
2018-03-30T13:35:00Z de27341-fra-9876 baselineAvg 2160.4285714285716
2018-03-30T13:36:00Z jp89667-tok-9888 baselineAvg 2393.3333333333335
2018-03-30T13:36:00Z de27341-fra-9876 baselineAvg 2219.529411764706
2018-03-30T13:36:00Z fr19556-par-9874 baselineAvg 2523.266666666667
2018-03-30T13:37:00Z de27341-fra-9876 baselineAvg 2154.65
2018-03-30T13:37:00Z fr19556-par-9874 baselineAvg 2206.4
2018-03-30T13:37:00Z jp89667-tok-9888 baselineAvg 2175.35
2018-03-30T13:38:00Z fr19556-par-9874 baselineAvg 2232.4117647058824
2018-03-30T13:38:00Z jp89667-tok-9888 baselineAvg 2270.5
2018-03-30T13:38:00Z de27341-fra-9876 baselineAvg 2519.133333333333
2018-03-30T13:39:00Z jp89667-tok-9888 baselineAvg 2286.2105263157896
2018-03-30T13:39:00Z de27341-fra-9876 baselineAvg 2145.9473684210525
2018-03-30T13:39:00Z fr19556-par-9874 baselineAvg 2256.3684210526317
> 

I've been through the docs and looked at lots of examples, so I guess I must be misunderstanding something :(

I am using InfluxDB 1.3.7, Telegraf 1.5.2, and Kapacitor 1.4.0.

Any pointers greatly appreciated.

Many Thanks !!

Steve

stanislav-zaprudskiy commented 6 years ago

It looks like what shift() effectively does is shift each point's timestamp by the specified duration. It does not cache the points from a shifted window(), which is what we expected it to do.

As a side note, a positive duration shifts time forward and a negative duration shifts it backward (your samples show that, and so does the documentation: https://docs.influxdata.com/kapacitor/v1.5/nodes/shift_node).
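
To make the direction concrete, here is a small illustrative fragment. The measurement name is taken from the original post; the timestamps in the comments follow the sample output above, where a 12:35 point reappears at 13:35 after shift(1h). The variable names are mine, and this is a sketch rather than a tested task.

```
// Positive duration: timestamps move forward.
// A point stamped 2018-03-30T12:35:00Z is emitted as 2018-03-30T13:35:00Z.
var forward = stream
    |from()
        .measurement('network_summ')
    |shift(1h)

// Negative duration: timestamps move backward.
// A point stamped 2018-03-30T12:35:00Z is emitted as 2018-03-30T11:35:00Z.
var backward = stream
    |from()
        .measurement('network_summ')
    |shift(-1h)
```

In both cases the values are untouched; only the time attached to each point changes.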

I found an example which suggests that this was not the case in the past: https://www.influxdata.com/blog/tldr-influxdb-tech-tips-may-26-2016/.

I'm not sure whether this is a bug or a feature, but for now the only way I can see to compare with past results is to use batch with query and offset.
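
For reference, a minimal sketch of that batch approach, built from the two queries in the original post. The |shift(1h) on the past branch, the join, and the "mean" field name (the default name InfluxQL gives an unaliased mean() result) are my assumptions and are not confirmed anywhere in this thread. I have not tested it against Kapacitor 1.4.0.

```
// Past window: query data from one hour ago, then move its timestamps
// forward by the same hour so they line up with the current window.
var past = batch
    |query('select mean(traf_i_total) from "network_summ"."thirty_days".network_summ')
        .period(1m)
        .every(30s)
        .offset(1h)
        .align()
        .groupBy(*)
    |shift(1h)

// Current window: same query without the offset.
var current = batch
    |query('select mean(traf_i_total) from "network_summ"."thirty_days".network_summ')
        .period(1m)
        .every(30s)
        .align()
        .groupBy(*)

// Join on time and group, then compute the percentage change.
past
    |join(current)
        .as('past', 'current')
    |eval(lambda: abs(float("current.mean" - "past.mean")) / float("past.mean") * 100.0)
        .as('value')
```

The offset and the shift use the same duration on purpose: offset decides which data is read, and shift only relabels its time so the join can pair each past point with the matching current one.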