influxdata / kapacitor

Open source framework for processing, monitoring, and alerting on time series data
MIT License

Alerts are delayed because of stateCount and max nodes #2001

Open · F4ncyMooN opened this issue 6 years ago

F4ncyMooN commented 6 years ago

Hi,

When I use Kapacitor for monitoring, I see an unexpected delay in my alerts. It seems the stateCount and max nodes introduce an extra batch interval of latency. Here is the TICK script:

// Subscribe to the data stream.
var from_M0001 = stream
    |from()
        .database('test_perf')
        .retentionPolicy('oneday')
        .measurement('M0001')
    |default()
        .tag('group', '')
        .field('metric1', 0.0)
    |groupBy('group')

// Drop unneeded fields to reduce memory usage.
var eval_M0001 = from_M0001
    |eval()
        .keep('metric1')

// Pre-aggregate metrics for each check level.
var preAggrSum_M0001_tx_user_ad_u_total = eval_M0001
    |sum('metric1')
        .as('M0001::metric1__metric1')

// Join the pre-aggregation results for each measurement.
var preAggrJoin_M0001 = preAggrSum_M0001_tx_user_ad_u_total

// Join data streams from all measurements.
var join0 = preAggrJoin_M0001

// Set check period and frequency.
var window = join0
    |window()
        .period(1m)
        .every(1m)
        .fillPeriod()

// Aggregate window data.
var sum_M0001_tx_user_ad_u_total = window
    |sum('M0001::metric1__metric1')
        .as('kSum_metric1')

var join_aggregation = sum_M0001_tx_user_ad_u_total
    |join()
        .as('M0001_kSum_metric1')
        .tolerance(1s)
        .delimiter('::')

// Set check expression and criteria.
var info_count = join_aggregation
    |stateCount(lambda: float("M0001_kSum_metric1::kSum_metric1") > 2500000.0)
        .as('kInfoCount')

var warn_count = info_count
    |stateCount(lambda: float("M0001_kSum_metric1::kSum_metric1") < 2500.0)
        .as('kWarnCount')

var crit_count = warn_count
    |stateCount(lambda: float("M0001_kSum_metric1::kSum_metric1") < 200.0)
        .as('kCritCount')

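// Reduce each state-count branch to its max, then join the three branches back together.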
var info_max = crit_count
    |max('kInfoCount')
        .as('kInfoCount')
    |log()

var warn_max = crit_count
    |max('kWarnCount')
        .as('kWarnCount')
    |log()

var crit_max = crit_count
    |max('kCritCount')
        .as('kCritCount')
    |log()

var join_max = info_max
    |join(warn_max, crit_max)
        .as('info_max', 'warn_max', 'crit_max')
        .tolerance(1m)
        .delimiter('::')

var keep_max = join_max
    |eval()
        .keep('info_max::kInfoCount', 'warn_max::kWarnCount', 'crit_max::kCritCount')

// Save alerts.
keep_max
    |alert()
        .id('{{ .TaskName }}')
        .info(lambda: "info_max::kInfoCount" >= 1)
        .warn(lambda: "warn_max::kWarnCount" >= 1)
        .crit(lambda: "crit_max::kCritCount" >= 1)
        .message('')
        .details('')
        .post('url')

After the window node the batch interval is 1m, and each max node waits for the next batch to arrive before it emits its result, so the final alert is delayed by 1m (one batch interval). The node statistics below show the same thing: state_count11 has emitted 2 points, but the max nodes have forwarded 0.
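
For reference, here is a minimal sketch of the same window -> max pattern that shows where the wait happens. The measurement 'cpu', field 'value', and threshold are made up for illustration, not taken from my task:

stream
    |from()
        .measurement('cpu')
    |window()
        .period(1m)
        .every(1m)
        .fillPeriod()
    // emits one point per 1m batch once the batch closes
    |sum('value')
        .as('total')
    // stateCount passes each point through as soon as it arrives
    |stateCount(lambda: "total" > 100.0)
        .as('count')
    // max holds the point until a point from the next 1m interval
    // shows up, so everything downstream runs one interval late
    |max('count')
        .as('count')
    |alert()
        .crit(lambda: "count" >= 1)
        .log('/tmp/alerts.log')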

DOT:
digraph delay {
graph [throughput="0.00 points/s"];

stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="48"];

from1 [avg_exec_time_ns="1.496µs" errors="0" working_cardinality="0" ];
from1 -> default2 [processed="48"];

default2 [avg_exec_time_ns="780ns" errors="0" fields_defaulted="0" tags_defaulted="0" working_cardinality="0" ];
default2 -> groupby3 [processed="48"];

groupby3 [avg_exec_time_ns="844ns" errors="0" working_cardinality="0" ];
groupby3 -> eval4 [processed="48"];

eval4 [avg_exec_time_ns="2.562µs" errors="0" working_cardinality="2" ];
eval4 -> sum5 [processed="48"];

sum5 [avg_exec_time_ns="5.242µs" errors="0" working_cardinality="2" ];
sum5 -> window6 [processed="14"];

window6 [avg_exec_time_ns="0s" errors="0" working_cardinality="2" ];
window6 -> sum7 [processed="2"];

sum7 [avg_exec_time_ns="120ns" errors="0" working_cardinality="2" ];
sum7 -> join8 [processed="2"];

join8 [avg_exec_time_ns="0s" errors="0" working_cardinality="2" ];
join8 -> state_count9 [processed="2"];

state_count9 [avg_exec_time_ns="34.51µs" errors="0" working_cardinality="2" ];
state_count9 -> state_count10 [processed="2"];

state_count10 [avg_exec_time_ns="0s" errors="0" working_cardinality="2" ];
state_count10 -> state_count11 [processed="2"];

state_count11 [avg_exec_time_ns="0s" errors="0" working_cardinality="2" ];
state_count11 -> max16 [processed="2"];
state_count11 -> max14 [processed="2"];
state_count11 -> max12 [processed="2"];

max16 [avg_exec_time_ns="21.465µs" errors="0" working_cardinality="2" ];
max16 -> log17 [processed="0"];

log17 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
log17 -> join20 [processed="0"];

max14 [avg_exec_time_ns="0s" errors="0" working_cardinality="2" ];
max14 -> log15 [processed="0"];

log15 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
log15 -> join20 [processed="0"];

max12 [avg_exec_time_ns="4.003µs" errors="0" working_cardinality="2" ];
max12 -> log13 [processed="0"];

log13 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
log13 -> join20 [processed="0"];

join20 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
join20 -> eval21 [processed="0"];

eval21 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval21 -> alert22 [processed="0"];

alert22 [alerts_inhibited="0" alerts_triggered="0" avg_exec_time_ns="0s" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="0" warns_triggered="0" working_cardinality="0" ];
}

Kapacitor version: Kapacitor v1.5.0 (git: 4f10efc41b4dcac070495cf95ba2c41cfcc2aa3a)

yfxvector commented 5 years ago

Have you fixed this?

F4ncyMooN commented 5 years ago

No, I didn't fix it in the Kapacitor source code; I changed the script as a workaround.

previous:

// Set check expression and criteria.
var info_count = join_aggregation
    |stateCount(lambda: float("M0001_kSum_metric1::kSum_metric1") > 2500000.0)
        .as('kInfoCount')

var warn_count = info_count
    |stateCount(lambda: float("M0001_kSum_metric1::kSum_metric1") < 2500.0)
        .as('kWarnCount')

var crit_count = warn_count
    |stateCount(lambda: float("M0001_kSum_metric1::kSum_metric1") < 200.0)
        .as('kCritCount')

var info_max = crit_count
    |max('kInfoCount')
        .as('kInfoCount')
    |log()

var warn_max = crit_count
    |max('kWarnCount')
        .as('kWarnCount')
    |log()

var crit_max = crit_count
    |max('kCritCount')
        .as('kCritCount')
    |log()

var join_max = info_max
    |join(warn_max, crit_max)
        .as('info_max', 'warn_max', 'crit_max')
        .tolerance(1m)
        .delimiter('::')

var keep_max = join_max
    |eval()
        .keep('info_max::kInfoCount', 'warn_max::kWarnCount', 'crit_max::kCritCount')

// Save alerts.
keep_max
    |alert()
        .id('{{ .TaskName }}')
        .info(lambda: "info_max::kInfoCount" >= 1)
        .warn(lambda: "warn_max::kWarnCount" >= 1)
        .crit(lambda: "crit_max::kCritCount" >= 1)
        .message('')
        .details('')
        .post('url')

now (with the max and join nodes removed, each stateCount result reaches the alert node as soon as it is produced instead of waiting for the next batch):

var state_count = join_aggregation
    |stateCount(lambda: float("M0001_kSum_metric1::kSum_metric1") > 2500000.0)
        .as('kInfoCount')
    |stateCount(lambda: float("M0001_kSum_metric1::kSum_metric1") < 2500.0)
        .as('kWarnCount')
    |stateCount(lambda: float("M0001_kSum_metric1::kSum_metric1") < 200.0)
        .as('kCritCount')

// Save alerts.
state_count
    |alert()
        .id('{{ .TaskName }}')
        .info(lambda: "kInfoCount" >= 1)
        .warn(lambda: "kWarnCount" >= 1)
        .crit(lambda: "kCritCount" >= 1)
        .message('')
        .details('')
        .post('url')

F4ncyMooN commented 5 years ago

But I think this should be fixed on the Kapacitor side.