influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

Kafka output plugin should support batching at the producer level #4161

Closed JimHagan closed 6 years ago

JimHagan commented 6 years ago

Support Batching at the Producer Level

This would let the plugin scale further by making fewer calls, and it may make compression of the output objects more efficient.

Proposal:

Currently the Write method makes an individual SendMessage call per metric. This could be refactored to take a batch parameter (injected down from the Kafka output plugin) and to use SendMessages instead.

Suggestion...

func (k *Kafka) Write(metrics []telegraf.Metric) error {
    if len(metrics) == 0 {
        return nil
    }

    // BatchSize is the proposed new setting (hypothetical field name).
    // Fall back to a single batch when it is unset.
    batchSize := k.BatchSize
    if batchSize <= 0 {
        batchSize = len(metrics)
    }
    msgs := make([]*sarama.ProducerMessage, 0, batchSize)

    for _, metric := range metrics {
        buf, err := k.serializer.Serialize(metric)
        if err != nil {
            return err
        }

        m := &sarama.ProducerMessage{
            Topic: k.GetTopicName(metric),
            Value: sarama.ByteEncoder(buf),
        }
        if h, ok := metric.Tags()[k.RoutingTag]; ok {
            m.Key = sarama.StringEncoder(h)
        }
        msgs = append(msgs, m)

        // Flush once a full batch has accumulated.
        if len(msgs) == batchSize {
            if err := k.producer.SendMessages(msgs); err != nil {
                return fmt.Errorf("FAILED to send kafka message: %s", err)
            }
            msgs = msgs[:0]
        }
    }

    // Send whatever is left over from the last partial batch.
    if len(msgs) > 0 {
        if err := k.producer.SendMessages(msgs); err != nil {
            return fmt.Errorf("FAILED to send kafka message: %s", err)
        }
    }

    return nil
}

Use case: [Why is this important (helps with prioritizing requests)]

We have a very large multi-data center Kafka pipeline and we need batching to lower the number of calls as well as to make Kafka compression possibly more efficient.

russorat commented 6 years ago

see also https://github.com/influxdata/telegraf/issues/2825

JimHagan commented 6 years ago

@russorat Is there code for this in 1.8 that we could give a look at? I was just talking with @edbernier

danielnelson commented 6 years ago

@JimHagan I have two potential patches that I could use your help performance testing, if you are up for it.

I have them in separate branches: kafka-perf-send-messages and kafka-perf-batch.

The send-messages branch works by simply using the SendMessages method on sarama.SyncProducer. This sends all messages in a Write batch and waits for them all to complete, instead of the previous method of send/recv, send/recv, etc.

The other method combines SendMessages with message batching, which changes the output format to contain multiple metrics per message. It is a bit more complicated, though, and in simple testing it doesn't seem to show any performance improvement for me. I would be interested in what performance differences can be found in a more substantial test environment.

There is a potential issue you may run into when using the master branch where Telegraf could deadlock while shutting down. I will of course take care of this before the next release.

JimHagan commented 6 years ago

I will get back to you, @danielnelson. I believe we want the second option. The reason is that we are using Kafka MirrorMaker to move large amounts of metrics to and from data centers, and that should provide optimizations for what we are doing. I want to check in with one of our Kafka experts.

The only concern about the batching option is that it looks like it attempts to send the whole buffer's worth of data as a batch. Am I reading that correctly? The idea I had was that we could constrain the number of messages that would be bundled per batch. I can certainly test the version as is for now, but we could run into the Kafka max message size on our infrastructure (as it's configured).
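One way to picture the constraint asked about above is a helper that caps each bundle by both metric count and payload size. This is only a minimal sketch: splitBatches, maxCount, and maxBytes are hypothetical names used for illustration, not existing Telegraf or sarama options, and the code in the kafka-perf-batch branch may work differently.

```go
package main

import "fmt"

// splitBatches groups serialized metrics into batches holding at most
// maxCount entries and at most maxBytes of payload. Both limits are
// hypothetical settings, not existing Telegraf options.
func splitBatches(metrics [][]byte, maxCount, maxBytes int) [][][]byte {
	var batches [][][]byte
	var current [][]byte
	size := 0

	for _, m := range metrics {
		// Close the current batch if adding m would exceed either limit.
		full := len(current) >= maxCount || size+len(m) > maxBytes
		if len(current) > 0 && full {
			batches = append(batches, current)
			current = nil
			size = 0
		}
		// A single metric larger than maxBytes still gets its own batch.
		current = append(current, m)
		size += len(m)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	metrics := [][]byte{
		[]byte("cpu usage=1"),
		[]byte("cpu usage=2"),
		[]byte("mem used=3"),
		[]byte("disk free=4"),
		[]byte("net rx=5"),
	}
	for i, b := range splitBatches(metrics, 2, 1024) {
		fmt.Printf("batch %d: %d metrics\n", i, len(b))
	}
}
```

With a count limit of 2, the five sample metrics end up in three bundles of 2, 2, and 1 metrics. A byte limit set below the broker's configured maximum message size would keep any one bundle from being rejected for size, though a single oversized metric would still need separate handling.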

JimHagan commented 6 years ago

@danielnelson and @edbernier After consulting with Tom, he indicated that the first option (kafka-perf-send-messages) with SendMessages may make sense IF we rely on the Kafka producer to handle batching. Tom thinks that Kafka producers should know how to take a bunch of messages and do their own batching, provided the configuration is exposed.

danielnelson commented 6 years ago

The producer should be able to send the data efficiently using SendMessages. In my testing it is faster than sending with batching, but I don't have a high-traffic test environment. You can control the size of the batches using the metric_batch_size agent option, although be aware that this value is shared by all outputs.

I think we shouldn't merge the batching support until benchmarks show that it is needed. If you are available to help check this, I could provide a development build.
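For reference, the metric_batch_size option mentioned above is set in the agent section of the Telegraf configuration file. The values below are illustrative only and would need tuning for a given pipeline; because the setting is agent-wide, it changes the Write batch size for every configured output, not just Kafka.

```toml
[agent]
  ## Maximum number of metrics handed to each output in one Write call.
  ## Shared by all outputs.
  metric_batch_size = 1000

  ## Maximum number of unwritten metrics buffered per output.
  metric_buffer_limit = 10000
```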

JimHagan commented 6 years ago

@danielnelson Should I be getting an RC to test this with?

danielnelson commented 6 years ago

I added links on https://github.com/influxdata/telegraf/pull/4517. Let me know if you need a different package.