confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Increasing memory #540

Open cedricve opened 4 years ago

cedricve commented 4 years ago

Description

I have about 10 consumers continuously reading from different topics. What I see is that memory increases significantly until it blocks all my consumers. All I do is read from a topic and send the messages to another topic. As suggested, I'm reading from the delivery channel.

[Screenshot 2020-09-30 at 19:42:58: memory usage]

I also noticed that after a while the memory becomes "stable" to some extent; however, at some point it freezes all the consumers and nothing is read by any consumer anymore.

How to reproduce

I create a consumer and producer

  func CreateKafkaQueue(name string) *KafkaQueue {
    kafka_broker := os.Getenv("KAFKA_BROKER")
    kafka_username := os.Getenv("KAFKA_USERNAME")
    kafka_password := os.Getenv("KAFKA_PASSWORD")
    kafka_mechanism := os.Getenv("KAFKA_MECHANISM")
    kafka_security := os.Getenv("KAFKA_SECURITY")

    configMap := kafka.ConfigMap{
      "bootstrap.servers":  kafka_broker,
      "group.id":           "mygroup",
      "session.timeout.ms": 10000,
      "auto.offset.reset":  "earliest",
      "sasl.mechanisms":    kafka_mechanism, // e.g. "PLAIN"
      "security.protocol":  kafka_security,  // e.g. "SASL_PLAINTEXT"
      "sasl.username":      kafka_username,
      "sasl.password":      kafka_password,
    }

    // Consumer and producer share the same configuration.
    c, err := kafka.NewConsumer(&configMap)
    if err != nil {
      panic(err)
    }
    p, err := kafka.NewProducer(&configMap)
    if err != nil {
      panic(err)
    }

    // Unbuffered channel used to receive delivery reports from Produce().
    deliveryChan := make(chan kafka.Event)

    return &KafkaQueue{
      Name:         name,
      Consumer:     c,
      Producer:     p,
      DeliveryChan: deliveryChan,
    }
  }
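
For context, the issue doesn't show the KafkaQueue type itself; based only on the fields used in the constructor, it presumably looks something like this sketch (not the reporter's actual definition):

    import "github.com/confluentinc/confluent-kafka-go/kafka"

    // KafkaQueue bundles the consumer, the producer, and the channel that
    // Produce() delivery reports are sent to. Field names are inferred from
    // the constructor above.
    type KafkaQueue struct {
      Name         string
      Consumer     *kafka.Consumer
      Producer     *kafka.Producer
      DeliveryChan chan kafka.Event
    }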

Then I have a reading function. It reads from a topic and moves each message on to the next topic.

    func (q KafkaQueue) ReadMessagesForever(handleMessage func(Payload) (string, int), sync ...bool) {
      q.Consumer.SubscribeTopics([]string{q.Name}, nil)

      isSync := true
      if len(sync) > 0 {
        isSync = sync[0]
      }

      for {
        // Block until a message arrives (-1 = no timeout).
        msg, err := q.Consumer.ReadMessage(-1)
        if err == nil {
          var payload Payload
          json.Unmarshal(msg.Value, &payload)
          fmt.Printf("Message on %s: %s\n", msg.TopicPartition, payload)
          result, _ := handleMessage(payload)

          if isSync {
            if result == "forward" {
              // Shift the first event and forward the rest to the next topic.
              payload.Events = payload.Events[1:]
              bytes, _ := json.Marshal(payload)
              topic := "kcloud-event-queue"
              q.Producer.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          bytes,
              }, q.DeliveryChan)
              <-q.DeliveryChan // wait for the delivery report before continuing
            } else if result == "cancel" {
              // todo, if needed ;)
            } else if result == "retry" {
              // todo, if needed ;)
            }
          }
        } else {
          // The client will automatically try to recover from all errors.
          fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
      }
    }
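
Tying the two snippets together, a hypothetical usage sketch; the Payload type is not shown in the issue, so the struct below is a guess based on the Events field referenced above:

    // Guessed shape of Payload; only the Events slice is visible in the code above.
    type Payload struct {
      Events []map[string]interface{} `json:"events"`
    }

    func main() {
      q := CreateKafkaQueue("some-input-topic") // hypothetical topic name
      q.ReadMessagesForever(func(p Payload) (string, int) {
        // Returning "forward" makes ReadMessagesForever re-publish the
        // remaining events to the next topic.
        return "forward", 0
      })
    }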

mhowlett commented 4 years ago

Try setting a low value for queued.max.messages.kbytes. The default has recently been reduced (to 65536), but IIRC it's still very high in the current release version of the Go client.
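
A minimal sketch of what that could look like with the ConfigMap from the reproduction above; the value 16384 is only an illustration, not a recommendation from this thread:

    configMap := kafka.ConfigMap{
      "bootstrap.servers": kafka_broker,
      "group.id":          "mygroup",
      // Cap librdkafka's pre-fetch buffer (in kbytes); lower values trade
      // peak throughput for a smaller memory footprint.
      "queued.max.messages.kbytes": 16384,
      // ... SASL settings as in the reproduction above ...
    }
    c, err := kafka.NewConsumer(&configMap)
    if err != nil {
      panic(err)
    }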

cedricve commented 4 years ago

@mhowlett is this a recommended setting, or why is it set that high? Thank you so much for your advice.

mhowlett commented 4 years ago

The default settings have traditionally been optimized for very high throughput. The new value, 65 MB, is still a lot of caching, though, and I don't think it would impact max throughput much. You can reduce it even further if you care a lot about memory.

cedricve commented 4 years ago

@mhowlett thanks for explaining. I've been reading a bit, but it's still not clear to me what this setting actually does. I understand it's the cache size, but what does it control? When will it allocate memory and when will it release it?

mhowlett commented 4 years ago

librdkafka aggressively pre-fetches messages from brokers and caches them. This is the maximum size of that cache per topic. Memory is allocated as needed and won't be reduced when no longer needed.
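
For reference, a sketch of related librdkafka fetch settings that bound this pre-fetching; the keys other than queued.max.messages.kbytes aren't mentioned in this thread, and the values are purely illustrative:

    // Applied to the ConfigMap from the reproduction; SetKey overrides or
    // adds a single property.
    configMap.SetKey("queued.max.messages.kbytes", 16384)
    // Minimum number of messages per topic+partition that librdkafka tries
    // to keep pre-fetched locally.
    configMap.SetKey("queued.min.messages", 10000)
    // Initial maximum bytes requested per topic+partition in a fetch request.
    configMap.SetKey("fetch.message.max.bytes", 1048576)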

cedricve commented 4 years ago

Great, thanks @mhowlett, so reducing the setting will make sure fewer messages can be cached.

memory is allocated as needed and won't be reduced when no longer needed.

Looking at the Kubernetes charts, I see memory usage growing incrementally. Reducing the kbytes setting should make memory grow less quickly, but I'm still not sure (in my mind at least) why this setting would stop the incremental increase altogether.

Sorry for bothering you with all these questions, I should pay you a consulting fee :P

cedricve commented 4 years ago

Just found out that some of the logs are growing because a lot of big objects are being sent. Not sure when these should be removed; I played a bit with the log settings of this Helm chart: https://github.com/bitnami/charts/tree/master/bitnami/kafka

[Screenshot 2020-10-08 at 21:15:11]

FarisFAhmed commented 2 years ago

I have similar memory consumption behavior to @cedricve with version 1.9.1. My service simply consumes one topic and writes messages to two other topics. Even while the consumer is blocked in GetMessage(-1), waiting for messages to arrive, the service's memory consumption increases continuously.

Any advice on how to handle this? I expect the memory consumption to be stable when no messages are fetched by the consumer.
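
One way to check whether the growth sits in librdkafka's internal queues rather than the Go heap is to enable client statistics; a sketch, assuming the v1 import path, a placeholder broker and topic, and a Poll loop (ReadMessage discards non-message events, so Poll is used here to see the *kafka.Stats events):

    package main

    import (
      "fmt"

      "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {
      c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":      "localhost:9092", // placeholder broker
        "group.id":               "mygroup",
        "statistics.interval.ms": 60000, // emit a kafka.Stats event every minute
      })
      if err != nil {
        panic(err)
      }
      defer c.Close()

      if err := c.SubscribeTopics([]string{"my-topic"}, nil); err != nil {
        panic(err)
      }

      for {
        switch ev := c.Poll(1000).(type) {
        case *kafka.Message:
          fmt.Printf("Message on %s\n", ev.TopicPartition)
        case *kafka.Stats:
          // JSON blob including per-partition queue counters such as
          // fetchq_cnt and fetchq_size, which show librdkafka-side buffering.
          fmt.Println(ev.String())
        case kafka.Error:
          fmt.Printf("Consumer error: %v\n", ev)
        case nil:
          // Poll timed out; nothing to do.
        }
      }
    }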