confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0
4.59k stars 652 forks source link

Producer and Consumer fail logs #706

Open AnuragJha2601 opened 2 years ago

AnuragJha2601 commented 2 years ago

SCENARIO

We have a use case where we consume events from a topic in our microservice and eventually after certain business flows we produce multiple events to different topic(s).

Service -> Consume and then multiple Produce

We see below logs after a few hours when the service is actively running

|REQTMOUT|rdkafka#producer-2| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2: Timed out ProduceRequest in flight (after 60728ms, timeout #0) |REQTMOUT|rdkafka#producer-2| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests |FAIL|rdkafka#producer-2| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/2: 1 request(s) timed out: disconnect (after 706381222ms in state UP)

|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: {{placeholder}}.aws.confluent.cloud:9092: Disconnected (after 345599852ms in state UP, 1 identical error(s) suppressed) error: Local: Broker transport failure: GroupCoordinator: {{placeholder}}.aws.confluent.cloud:9092: Disconnected (after 345599852ms in state UP, 1 identical error(s) suppressed)

|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/0]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/0: Disconnected (after 345600240ms in state UP, 1 identical error(s) suppressed) error: Local: Broker transport failure: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/0: Disconnected (after 345600240ms in state UP, 1 identical error(s) suppressed) |FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/8]: sasl_ssl://{{placeholder}}.aws.confluent.cloud:9092/8: Disconnected (after 345600352ms in state UP, 1 identical error(s) suppressed)

OTHER DETAILS

  1. confluent-kafka-go version - v1.7.0
  2. Kafka Cluster : Confluent Platform Basic plan (since we are in development phase now)
  3. Consumer config - { UserName: username, Password: password, Brokers: broker, Topics: []string{kafkaConfig.CONSUMER_TOPIC}, ConsumerGroup: kafkaConfig.CONSUMER_GROUP, FetchMinBytes: 1000, PollTimeout: 100, }
  4. Producer config - { UserName: username, Password: password, Brokers: broker, Topic: kafkaConfig.PRODUCER_TOPIC, BatchSize: 100000, LingerMs: 10, }
  5. The service runs on kubernetes pods deployed on EKS (AWS)

QUESTIONS/QUERIES : We as a team are new to Kafka and golang. So would appreciate below queries to be answered.

  1. Any specific reason we see these logs?
  2. We create producer during the start of the application and that remains a single producer throughout the application lifecycle. Do producers have a timeout/expiry ? Other option for us is to create producers at the time of producing messages but in that case we will end up creating many(in order of thousands) producers. What should be the best way to handle this scenario ?
  3. Are there any other configurations or better practices you may want to recommend ?
mangup commented 2 years ago

Faced with similar errors. Why still no response here ? )

jliunyu commented 2 years ago
  1. For your first question, you're using batch producer and set the linger.ms parameter which defines how long to hold data back before sending it (even if batches are not full). Once batch.size is reached or at least linger.ms time has passed, the system will send the batch as soon as it is able. While a ProduceRequest can contain multiple partitions, librdkafka will only send one partition per request, and due to the brokers HOLB processing of produce requests, the subsequent request's will need to wait for the previous ones to finish, so it maybe time out.

  2. For your second question, I don't think creating producers at the time of producing messages is a good idea.

sangianpatrick commented 2 years ago

I've the same issues, and I've tried to put linger.ms=100 to the producer config but the err logs are still there. My question is, does the err logs come because of time exceeded even the queue is empty ? So I think it wouldn't get the err if the producer frequently sending messages. If it so, when the service isn't too frequent to produce some messages, maybe we should try to put linger.ms=5000 and the queue capacity set smaller. Yeah the err will still exist eventually, but at least it reduced, maybe comes in every 5-6 seconds