confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Producer sends messages with a highly fluctuating speed #563

Closed elftech closed 7 months ago

elftech commented 3 years ago

Description

The producer sends messages to a specified topic at a highly fluctuating rate, with a cycle of a few minutes. Is there something wrong with the way I'm sending messages, or with my configuration?

How to reproduce

I've recorded the message rate and displayed it in the Grafana monitoring tool.

[Screenshot 2020-11-05 18:18: Grafana panel showing the produced-message rate]

Please ignore the drop near 16:50; I had reduced the upstream requests at that point.

And here's my producer code:


func init() {
    init_producer_limiter = make(map[string]*sync.Once)
    for _, topic := range producer_topics {
        var once sync.Once
        init_producer_limiter[topic] = &once
    }
}

func ProduceKafkaMsg(topic string, b []byte) {

    p_once, ok := init_producer_limiter[topic]
    if !ok {
        glog.Info("InitDelKafkaProducerFail!")
        panic("InitDelKafkaProducerFail!")
    }

    // Create the producer for this topic exactly once, then start a goroutine
    // that drains its Events() channel for delivery reports.
    init_func := func() {
        var err error
        producers[topic], err = kafka.NewProducer(&kafka.ConfigMap{
            //"debug":             "cgrp,fetch,topic,broker,protocol",
            "bootstrap.servers": GetWholeBrokerList(topic)})
        if err != nil {
            glog.Info("InitDelKafkaProducerFail!")
            panic("InitDelKafkaProducerFail!")
        }

        wg.Add(1)
        go func() {
            begin_t := MakeTimestamp()
            PREFIX := "[" + topic + "]"
            code := 0
            msg := "success"

        LOOP:
            for {
                select {
                case e, ok := <-producers[topic].Events():
                    // The channel is closed when the producer shuts down.
                    if !ok {
                        break LOOP
                    }
                    switch ev := e.(type) {
                    case *kafka.Message:
                        if ev.TopicPartition.Error != nil {
                            Err_report(PREFIX, "SendKafkaFail_"+topic, &msg, &code, " ", ev.TopicPartition)
                        } else {
                            // This is the line that reports the count of delivered
                            // messages to the Grafana monitoring panel.
                            Info_report(PREFIX, "DeliveredKafkaMsg_"+topic, begin_t, ev.TopicPartition)
                        }
                    }
                    begin_t = MakeTimestamp()
                case <-time.After(10 * time.Second):
                    Err_report(PREFIX, "SendKafkaTimeLong", &msg, &code)
                }
            }
            Err_report(PREFIX, "ProducerExiting", &msg, &code)

            wg.Done()
        }()
    }

    p_once.Do(init_func)

    // Produce is asynchronous; the returned error only covers enqueue failures
    // such as a full local queue.
    err := producers[topic].Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          b,
    }, nil)
    if err != nil {
        glog.Error("ProduceKafkaMsgEnqueueFail: ", err)
    }
}
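(For context, the package-level declarations this snippet relies on look roughly like the following; the topic names are placeholders, and GetWholeBrokerList, MakeTimestamp, Err_report and Info_report are existing helpers elsewhere in my code.)

// Hypothetical package-level state implied by the snippet above.
var (
    producer_topics       = []string{"topic_a", "topic_b"}   // placeholder topic names
    producers             = make(map[string]*kafka.Producer) // one reused producer per topic
    init_producer_limiter map[string]*sync.Once              // guards one-time init per topic
    wg                    sync.WaitGroup                     // tracks the event-draining goroutines
)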


edenhill commented 3 years ago
  1. Verify that your producer-reuse code is in fact used and that a new producer is not created for each Produce().
  2. To break this problem down you'll also need to measure the ingestion rate, i.e., the rate at which you are calling Produce(), and the per-message latency between Produce() and the delivery report event (a rough sketch of one way to measure this follows).
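A minimal sketch of one way to measure that latency is to carry the enqueue time on the Message Opaque field; the package name, function names and logging below are illustrative, not part of your existing code:

package latencysketch

import (
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// produceWithTimestamp enqueues one message and attaches the enqueue time
// via Opaque so it comes back on the delivery report.
func produceWithTimestamp(p *kafka.Producer, topic string, value []byte) error {
    return p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          value,
        Opaque:         time.Now(),
    }, nil)
}

// drainDeliveryReports computes the Produce() -> delivery-report latency per message.
func drainDeliveryReports(p *kafka.Producer) {
    for e := range p.Events() {
        if m, ok := e.(*kafka.Message); ok {
            if enqueued, ok := m.Opaque.(time.Time); ok {
                log.Printf("delivery latency for %v: %v", m.TopicPartition, time.Since(enqueued))
            }
        }
    }
}

Comparing the Produce() call rate with these per-message latencies should show where the cyclic pattern is introduced.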
edenhill commented 3 years ago

Also read this: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#latency-measurement
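If it helps, librdkafka's statistics expose the latency breakdown that document describes (int_latency, outbuf_latency, rtt); a rough sketch of enabling them in confluent-kafka-go, with a placeholder broker address and interval:

package statssketch

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// newInstrumentedProducer enables librdkafka statistics so that a *kafka.Stats
// event (a JSON document including per-broker int_latency, outbuf_latency and
// rtt windows) is emitted on the Events() channel at the configured interval.
func newInstrumentedProducer() (*kafka.Producer, error) {
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":      "localhost:9092", // placeholder
        "statistics.interval.ms": 60000,            // emit stats every 60s
    })
    if err != nil {
        return nil, err
    }
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Stats:
                log.Println("librdkafka stats:", ev.String())
            case *kafka.Message:
                // delivery reports still arrive on the same channel
            }
        }
    }()
    return p, nil
}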

elftech commented 3 years ago
  • Verify that your producer-reuse code is in fact used and that a new producer is not created for each Produce()
  • To break this problem down you'll also need to measure the ingestion rate, i.e., the rate at which you are calling Produce(), and the per-message latency between Produce() and the delivery report event.

1. I've added the initialization of the p_once map to the code snippet posted above to show why only one producer is used for a given topic, demonstrating that my producer-reuse code is fine.

2. I've also recorded the ingestion rate in Grafana, and it was smooth.

Finally, I switched the producer side to sarama (https://github.com/Shopify/sarama) and the rate never fluctuated again. I suspect the fluctuation may be related to librdkafka's buffering strategy, but I don't have enough time to dig deeper into it.
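For reference, here is a minimal sketch of the kind of sarama async producer I moved to; the broker address and config values are illustrative, not my exact production settings:

package saramasketch

import (
    "log"

    "github.com/Shopify/sarama"
)

// newAsyncProducer builds an illustrative sarama async producer.
func newAsyncProducer() (sarama.AsyncProducer, error) {
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true // deliver success reports on Successes()
    cfg.Producer.Return.Errors = true    // deliver failures on Errors()

    p, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
    if err != nil {
        return nil, err
    }

    // Both channels must be drained, otherwise the producer blocks.
    go func() {
        for range p.Successes() {
            // report the delivered-message count to the monitoring panel here
        }
    }()
    go func() {
        for err := range p.Errors() {
            log.Println("sarama delivery failed:", err)
        }
    }()
    return p, nil
}

// sendAsync enqueues one message without blocking on delivery.
func sendAsync(p sarama.AsyncProducer, topic string, b []byte) {
    p.Input() <- &sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(b)}
}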

Thanks a lot for your reply, and I hope someone can figure out this issue in the future.

milindl commented 7 months ago

Closing this for now. If someone encounters this issue on the latest version again, please reopen it and include the per-message latency between Produce() and the delivery report event.