influxdata / kapacitor

Open source framework for processing, monitoring, and alerting on time series data
MIT License
2.32k stars 493 forks

Add kafka as metrics consumer #73

Closed: panda87 closed this issue 4 years ago

panda87 commented 8 years ago

It would be awesome if, instead of relying on InfluxDB resources (querying it or adding UDP subscriptions), Kapacitor were a more standalone solution, able to consume metrics from Kafka and analyze them as a sliding window.

The stream feature is very powerful for this and would complement a Kafka consumer well. This integration may need a small database to store the sliding-window metrics for further queries.

D.

yosiat commented 8 years ago

:+1:

On a bad network, two machines (InfluxDB and Kapacitor) talking to each other over UDP can mean huge data loss, and storing all of the data for stream alerts in InfluxDB can be a waste of space.

Other input sources like Kafka, another MQ (RabbitMQ, for example), or HTTP would be really awesome!

nathanielc commented 8 years ago

@yosiat I agree Kafka support would be a great addition.

But I do want to point out that there are more ways than UDP to get data into Kapacitor, especially if all you need is a more reliable transport.

Kapacitor supports all the same input formats as InfluxDB including its HTTP write API. You can even configure Telegraf to send data to both Kapacitor and InfluxDB simultaneously.
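A minimal sketch of what that dual-output Telegraf configuration might look like (the URLs, ports, and database name are assumptions; check your Telegraf version's documentation for the exact plugin options):

```toml
# Send the same metrics to both InfluxDB and Kapacitor.
# Kapacitor speaks the InfluxDB write protocol, so a second
# influxdb output section pointed at Kapacitor's port works.
[[outputs.influxdb]]
  urls = ["http://localhost:8086"]  # InfluxDB
  database = "telegraf"

[[outputs.influxdb]]
  urls = ["http://localhost:9092"]  # Kapacitor
  database = "telegraf"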

yosiat commented 8 years ago

@nathanielc How can I send data to Kapacitor? I don't see it documented on the Influx website.

nathanielc commented 8 years ago

@yosiat From where?

yosiat commented 8 years ago

@nathanielc HTTP write

nathanielc commented 8 years ago

@yosiat The same way you do for InfluxDB.

Example HTTP write to Kapacitor; the port change is the only difference:

```sh
curl -i -XPOST 'http://localhost:9092/write?db=mydb' \
  --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
```

https://docs.influxdata.com/influxdb/v0.9/guides/writing_data/

The Graphite, UDP, collectd, etc. input plugins all work for Kapacitor in the same way as for InfluxDB.

And since the database and retention policy only scope which tasks can consume the data, there is no need to explicitly create databases, etc.; just write the data.

yosiat commented 8 years ago

@nathanielc Thanks :)

  1. I think this needs to be added to the Kapacitor documentation, or at least mentioned as an option.
  2. If I have a stream alert, where will the data be saved? Or is it in-memory?
nathanielc commented 8 years ago

@yosiat

  1. Agreed. I know it's in the docs somewhere, but obviously not prominently enough. I'll work on a fix.
  2. Yes, Kapacitor does everything in memory. If you want data saved, you have to send it somewhere: InfluxDB, for example, or an alert handler like Sensu or PagerDuty.
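A TICKscript sketch of that pattern: alert on a stream and persist the processed points back to InfluxDB (the measurement, threshold, and output names here are made up for illustration):

```
stream
    |from()
        .measurement('cpu')
    |alert()
        .crit(lambda: "usage_idle" < 10)
        // send the alert to a handler, here a log file
        .log('/tmp/cpu_alerts.log')
    // and write the processed points back to InfluxDB for safekeeping
    |influxDBOut()
        .database('mydb')
        .retentionPolicy('default')
        .measurement('cpu_alerts')
```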
yosiat commented 8 years ago

@nathanielc Just to make sure I understand:

  1. If I have alerts that for some reason need to keep the points for one hour, will they be stored in memory for 1 hour and deleted after that hour?
  2. It's great that the points are kept in memory, but how can I make it more fault-tolerant? For example, if the instance goes down, I don't want to lose all the data.
  3. Is there a stats API for seeing how much is stored in Kapacitor's internal memory?
nathanielc commented 8 years ago

@yosiat Not sure I follow what you mean by storing alerts for 1 hour. Can you open a new issue with an example TICKscript and these questions? Then we can discuss it there.

yosiat commented 8 years ago

@nathanielc Thanks for the fast replies :+1: I opened another issue - https://github.com/influxdata/kapacitor/issues/179

bfgoodrich commented 8 years ago

Having Kafka streams as a source of data for Kapacitor would be amazing, and it would avoid having to send data from InfluxDB to Kapacitor as well.

yosiat commented 8 years ago

@nathanielc Can we revive this issue? I think supporting different stream sources would be a great deal (for us as well), and I think I can submit a pull request for this.

I am thinking about two approaches:

What do you think?

nathanielc commented 8 years ago

> Can we revive this issue?

Absolutely, this is something we have wanted to prioritize internally but haven't had the resources yet.

I think the second approach is best. I would model it after the graphite, collectd, and udp services.

yosiat commented 8 years ago

@nathanielc Where are the graphite/collectd services?

I want to start developing it with support for ActiveMQ. Specifically, I want ActiveMQ because it supports Message Grouping, which will help scale Kapacitor if your message group ID is a combination of a measurement and its tags.

Should I start another issue for ActiveMQ with a simple explanation of how I will implement it?

nathanielc commented 8 years ago

The graphite/collectd services are imported from the InfluxDB code base.

> Should I start another issue for ActiveMQ with a simple explanation of how I will implement it?

Sounds good.

panda87 commented 8 years ago

Hi @yosiat, did you start to work on this feature? I see real value in not depending on InfluxDB but connecting directly to the source, which in our case is Kafka. Don't get me wrong, InfluxDB is awesome, but it's not fully highly available, and we have experienced a few periods of downtime that made a big mess of our alerts.

yosiat commented 8 years ago

Hi @panda87, I haven't started working on this feature yet.

bfgoodrich commented 8 years ago

@nathanielc Is this something that InfluxData can re-prioritize? I think this would be a very valuable feature for Kapacitor. We are planning on using Telegraf to push messages to and from Kafka topics. Being able to pull directly from Kafka would allow us to alert without going through InfluxDB, and then we could annotate the alert metadata back into another metrics Kafka queue.

yosiat commented 8 years ago

Hi, I am starting to research how to implement this for Kapacitor, and I have multiple questions:

Topics

Does Kapacitor subscribe to one topic? Multiple topics? A topic with a wildcard?

Offset

As I understand it, a Kafka consumer must provide an offset, and there are 3 options: start from the oldest message, start from the newest message, or resume from a previously saved offset.

I think the Kapacitor configuration should accept something like an "offset-strategy" setting with the following values:

  * newest: start from the newest message, without persisting the consumer offset
  * newest-save: start from the newest message and persist the offset, so processing resumes from the last point after a restart

and the same for oldest (i.e. oldest and oldest-save).
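A minimal sketch of how such an "offset-strategy" value might be parsed; the option name and values come from this thread, while the function and everything else are assumptions for illustration:

```python
# Split the proposed "offset-strategy" config value into its two parts:
# where to start consuming ("newest" or "oldest") and whether to persist
# the consumer offset across restarts (the "-save" suffix).
VALID_STARTS = {"newest", "oldest"}

def parse_offset_strategy(value):
    """Return (start, persist) for a strategy string like 'newest-save'."""
    start, sep, suffix = value.partition("-")
    if start not in VALID_STARTS or (sep and suffix != "save"):
        raise ValueError("unknown offset-strategy: %r" % value)
    return start, suffix == "save"
```

For example, `parse_offset_strategy("newest-save")` returns `("newest", True)`, telling the consumer to begin at the latest message but commit its offset as it goes.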

what do you think? @bfgoodrich @panda87 @nathanielc

Side note: I won't be available for the next week; if anybody wants to implement this, go ahead, that's fine.

panda87 commented 8 years ago

@yosiat thanks for the initiative!

Here are my thoughts:

Topic: I think Kapacitor should be able to subscribe to as many topics as you want.

Offset: I would prefer the newest (latest) offsets, but regarding your question, both options are good for different use cases. If Kapacitor experiences an outage of 1-5 minutes, the persistent option is better, but after an hour of outage I don't think you will want to get alerts for "old" events.

Still, for most outage scenarios, in my opinion, persistent will be preferable. BTW, because of Kafka's offset mechanism, for the batch option Kapacitor will have to go through InfluxDB anyway.

bfgoodrich commented 8 years ago

@yosiat

Thanks for the opportunity to supply some feedback.

It would be extraordinarily nice if you could subscribe to one or more topics based on a wildcard or, even better, a regex.

It would be brilliant if you could specify your own offsets per topic or per regex/wildcard definition. This would allow Kapacitor to stream through the logged messages to test new alerts, or even to generate alerts for annotations in case there would have been alerts during a network outage.

"Newest (no offset)" could be a good global default, but being able to change the mode to "persistent offset", or to specify/override a starting Kafka offset, would be incredibly useful. Perhaps there could be a way in Influx to only generate escalated alerts within a certain time window, or to configure a test mode that disables alert escalation/notification.

I realize the alerting logic is out of scope, but that (configurable alert logic), coupled with configurable windows or starting points for Kafka offsets, would create a very powerful tool for testing alert thresholds, or even for going through queued/unprocessed metrics data to generate historical alert data/metrics (such as annotations for auditing/reporting purposes, or for determining whether there was a violation of an SLA contract).

Also, using Kafka to stream messages directly to Kapacitor decouples Kapacitor from InfluxDB and reduces load and bandwidth on InfluxDB. InfluxDB just acts as another consumer of the same Kafka topic as Kapacitor, if you use Kafka as a transport for metrics going into InfluxDB (which is what we are currently doing, using Telegraf).

szibis commented 8 years ago

Consuming Kafka data, such as log data from multiple systems (one central place for logs and metrics), converting selected data in Kapacitor, which handles alerting, and sending the converted data on to InfluxDB (to visualize in Grafana and generate reports) would be a great feature. Combined with the ability to define the offset for old events, for when Kapacitor dies or when we detect data corruption and need to reprocess the data again (perhaps without alerting this time), this would be great to use in production.

nathanielc commented 8 years ago

Thanks for all the input everyone. I think it would also be worth a quick look at the Telegraf Kafka plugin to get a feel for how it is used there. https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer

yosiat commented 8 years ago

Thanks everyone for the feedback 👍 @nathanielc I looked at Telegraf, and I plan to take inspiration and configuration from there. The only thing I think we should add is support for the "oldest-save" and "newest-save" offsets, so we can continue processing from the last point before a crash.

I have thought about it, and IMHO specifying in the Kapacitor configuration which Kafka topics to read from would be strange. I think it should be specified in the TICKscript, for example:

```
// "kafka" is like stream/batch ;)
var points = kafka.topics('data1', 'data2')
    // defaults like the offset come from the configuration,
    // but we can override them in the TICKscript
    .offset('newest')
    // we can't change the kafka metrics buffer here

points|alert()...
```

It would be nicer and easier if you have more than a handful of topics you want to listen to. What do you think?

/CC: @bfgoodrich @panda87

nathanielc commented 8 years ago

@yosiat I like the idea of using kafka as a special var like stream/batch. Right now a task type is coupled with its entry point, i.e. stream/batch.

I see two options (and maybe more):

  1. Remove the need for the task type and allow any task to use stream/batch/kafka at will (possibly even mixed within the same task). This is a lot of work, but would be the cleaner solution in my mind.
  2. Simply add a kafka task type.
  3. There may be another option I haven't thought of ATM...

We are approaching a 1.0 release, where we are locking down the API, etc., so I think task types are here to stay, since we don't have time to change them before the release. I think we should take option 2 for now, but leave the door open for option 1 later.

yosiat commented 8 years ago

@nathanielc Ok, sounds great!

We might have some problems that I want to tackle right now:

After those questions, I think we have enough for starting a WIP pull request.

bfgoodrich commented 8 years ago

@yosiat @nathanielc It would be REALLY nice if there were a way to have shared state between multiple Kapacitor instances, so that Kapacitor could scale horizontally. I am sure that it would be non-trivial (aka difficult), but it would be really great for Kapacitor to be able to keep up with very high-throughput data streams by running many instances.

@nathanielc Also, FWIW - Regarding your previous post - "Remove the need for the task type and allow any task to use stream/batch/kafka at will. " - Option 1 would be really great!

bfgoodrich commented 8 years ago

I hate to ask, but is there any chance that someone is still working on this, or did it fall by the wayside because of other, higher priorities?

docmerlin commented 4 years ago

Closing this as it already exists