nathanielc / morgoth

Metric anomaly detection
http://docs.morgoth.io
Apache License 2.0

How to detect anomalies in the rate of Kafka messages being processed #37

Open sukantasaha opened 7 years ago

sukantasaha commented 7 years ago

Hi Nathaniel,

I need to detect anomalies in the Kafka messages produced to our kafka-topic-* measurements using Morgoth. Here is the TICKscript; can you verify whether it should produce correct anomaly data?

Also, I am not able to save the topic name, which is the name of the measurement, into InfluxDB (kafka-morgoth-alert).

Also, can you explain what the values of minSupport and errorTolerance should be?

var groups = 'host'
var field = 'produced'

var scoreField = 'anomalyScore'
var minSupport = 0.05
var errorTolerance = 0.01
var consensus = 0.05
var sigmas = 3.5

var last_day_mean = batch
    |query('SELECT * FROM "sensu"."default"./kafka-topic-lst_plugin.*/')
        .groupBy(groups)
        .period(1d)
        .every(10m)
        .align()
    @morgoth()
        .field(field)
        .scoreField(scoreField)
        .minSupport(minSupport)
        .errorTolerance(errorTolerance)
        .consensus(consensus)
        .sigma(sigmas)
    |alert()
        .details('Kafka Message Produced Is Anomalous')
        .crit(lambda: "anomalyScore" > 0.98)
        .log('/tmp/kafka-morgoth-alert.log')
    |influxDBOut()
        .database('sensu')
        .retentionPolicy('default')
        .measurement('kafka-morgoth-alert')
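On minSupport and errorTolerance: as we understand Morgoth's detection-framework doc, fingerprints of windows are counted with lossy counting, and a window whose fingerprint has support below minSupport is treated as anomalous. The sketch below is the textbook lossy counting algorithm of Manku and Motwani, not Morgoth's own code; the class name LossyCounter is hypothetical. It shows what the two knobs control: errorTolerance sets how coarsely counts are kept (and must sit well below minSupport), while minSupport is the frequency cut between "common" and "rare".

```python
import math


class LossyCounter:
    """Textbook lossy counting (Manku and Motwani), hypothetical helper.

    Every item with true frequency >= min_support * n is reported by
    frequent(); stored counts undercount by at most error_tolerance * n.
    """

    def __init__(self, error_tolerance: float):
        self.eps = error_tolerance
        self.width = math.ceil(1 / error_tolerance)  # bucket width w = ceil(1/eps)
        self.n = 0                                   # items seen so far
        self.entries = {}                            # item -> [count, delta]

    def add(self, item) -> None:
        self.n += 1
        bucket = math.ceil(self.n / self.width)      # id of the current bucket
        if item in self.entries:
            self.entries[item][0] += 1
        else:
            # delta bounds how many earlier occurrences may have been pruned
            self.entries[item] = [1, bucket - 1]
        if self.n % self.width == 0:
            # At each bucket boundary drop items that are provably rare.
            self.entries = {
                k: v for k, v in self.entries.items() if v[0] + v[1] > bucket
            }

    def frequent(self, min_support: float) -> set:
        """Items whose stored count is >= (min_support - eps) * n."""
        threshold = (min_support - self.eps) * self.n
        return {k for k, (count, _) in self.entries.items() if count >= threshold}


# Same knob values as the script: errorTolerance 0.01, minSupport 0.05.
counter = LossyCounter(error_tolerance=0.01)
for i in range(1000):
    # one "spike" window in every 100, all others "normal"
    counter.add("spike" if i % 100 == 0 else "normal")
print(counter.frequent(min_support=0.05))
```

Here "spike" occurs in 1% of windows, below the 5% minSupport, so it is pruned and never reported as frequent; in Morgoth's terms that rarity is what makes a window look anomalous.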
sukantasaha commented 7 years ago

Can you please help?

nathanielc commented 7 years ago

@sukantasaha Looks like you are headed in the right direction. Here are a few pointers:

  1. From the sample data you provided, the produced field looks like a counter, meaning it is an always increasing value. If that is the case, it would be better to compute the derivative of produced so that you have a rate of produced values. This way Morgoth can detect anomalies in the rate of production, which is probably what you want.
  2. To understand the values of minSupport and errorTolerance, read through this doc: http://docs.morgoth.io/docs/detection_framework/
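To make the counter-versus-rate point concrete, here is a minimal sketch assuming InfluxQL's documented derivative semantics: for consecutive points, the difference in value divided by the elapsed time, expressed per `unit`. The helper `derivative` is hypothetical and only mirrors the shape of InfluxQL's derivative(field, unit). Note that a counter that stops moving yields a rate of exactly 0.

```python
from datetime import datetime, timedelta


def derivative(points, unit=timedelta(seconds=1)):
    """Rate of change between consecutive (time, value) points.

    Each output is (v2 - v1) / (t2 - t1), scaled so the rate is per `unit`,
    and is stamped with the later point's time.
    """
    out = []
    for (t1, v1), (t2, v2) in zip(points, points[1:]):
        elapsed = (t2 - t1) / unit  # elapsed time measured in `unit`s
        out.append((t2, (v2 - v1) / elapsed))
    return out


# A "produced" counter sampled every 6 minutes; it stops growing at the end.
t0 = datetime(2016, 12, 7, 11, 0)
pts = [(t0 + timedelta(minutes=6 * i), v) for i, v in enumerate([1000, 1300, 1600, 1600])]
rates = derivative(pts, unit=timedelta(minutes=6))
print([r for _, r in rates])
```

The raw counter only ever rises, so its windows keep drifting; the rate stays flat under steady traffic, which is what an anomaly detector can learn. If the counter can reset (for example after a broker restart), InfluxQL's non_negative_derivative() drops the resulting negative rates.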
sukantasaha commented 7 years ago

Can you check this? I have modified the script to compute the derivative:

var groups = 'host'
var field = 'derivative'

var scoreField = 'anomalyScore'
var minSupport = 0.05
var errorTolerance = 0.01
var consensus = 0.5
var sigmas = 4.0

var last_day_mean = batch
    |query('SELECT derivative(produced,6m) FROM "sensu"."default"./kafka-topic.*/')
        .groupBy(groups)
        .offset(1d)
        .period(18m)
        .every(6m)
        .align()
    @morgoth()
        .field(field)
        .scoreField(scoreField)
        .minSupport(minSupport)
        .errorTolerance(errorTolerance)
        .consensus(consensus)
        .sigma(sigmas)
    |alert()
        .crit(lambda: "anomalyScore" > 0.98)
        .log('/tmp/kafka-anomaly-with-morgoth-derivative.log')
    |influxDBOut()
        .database('sensu')
        .retentionPolicy('default')
        .measurement('kafka-anomaly-with-morgoth-derivative')

nathanielc commented 7 years ago

@sukantasaha It looks good. The only part you might want to double-check is .offset(1d), which means you are always querying data from the previous day. If you want to query data up to the current time, just remove that line.
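A rough sketch of which time range each batch run would cover, assuming Kapacitor-style query semantics: with .align() the run time is truncated to a multiple of .every(), .offset() shifts the range back, and .period() is how far back the range reaches. The helper `query_range` is hypothetical; check the Kapacitor QueryNode docs for the exact behaviour.

```python
from datetime import datetime, timedelta


def query_range(now, period, every, offset=timedelta(0), align=True):
    """Hypothetical helper: the [start, stop) range a batch query would read."""
    if align:
        # Truncate the run time to a multiple of `every` since the Unix epoch.
        epoch = datetime(1970, 1, 1)
        now = epoch + ((now - epoch) // every) * every
    stop = now - offset
    return stop - period, stop


now = datetime(2016, 12, 7, 11, 48, 30)
# .period(18m).every(6m).align().offset(1d): reads yesterday's data
print(query_range(now, timedelta(minutes=18), timedelta(minutes=6), offset=timedelta(days=1)))
# Same query without .offset(): reads the last 18 minutes
print(query_range(now, timedelta(minutes=18), timedelta(minutes=6)))
```

Under these assumptions, a run at 11:48 with the offset reads 11:30 to 11:48 of the previous day, which is why alerts would lag a full day behind the data.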

sukantasaha commented 7 years ago

OK, I removed the offset and enabled the task. Now I see alerts generated in the logs:

[kafka-derivative:alert3] 2016/12/07 11:48:00 D! CRITICAL alert triggered id:kafka-topic-social_feedback_event_queue:host=Kafka-Staging8 msg:kafka-topic-social_feedback_event_queue:host=Kafka-Staging8 is CRITICAL data:&{kafka-topic-social_feedback_event_queue map[host:Kafka-Staging8] [time anomalyScore derivative] [[2016-12-07 11:42:08.421 +0000 UTC 0.9803921568627451 0]] }

My question is: if the derivative value is 0, why is it detected as an anomaly?
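One way to read the logged score, offered as an inference rather than a confirmed explanation: if Morgoth reports anomalyScore as 1 minus the support of the window's fingerprint (the fraction of windows seen so far that matched it), then 0.9803921568627451 is consistent with 1 - 1/51, i.e. a fingerprint seen once in 51 windows. Under that assumption a window of all-zero rates is flagged because it is rare compared with earlier windows, not because 0 is a large value. The function `anomaly_score` and the counts 1 and 51 are hypothetical, inferred from the log line.

```python
def anomaly_score(matching_windows: int, total_windows: int) -> float:
    """Assumed score: 1 minus the fraction of windows matching this fingerprint."""
    return 1 - matching_windows / total_windows


# Threshold from the script: .crit(lambda: "anomalyScore" > 0.98)
CRIT_THRESHOLD = 0.98

# A flat (all-zero) window whose fingerprint has matched only itself so far.
score = anomaly_score(1, 51)
print(score, score > CRIT_THRESHOLD)

# If flat windows recur, support rises and the score drops below the threshold.
print(anomaly_score(2, 51) > CRIT_THRESHOLD)
```

Under this reading, idle periods on a topic would stop alerting once they recur often enough to become a common fingerprint.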