IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

kafka: broker not connected #2732

Closed: ovdienkonikolai closed this issue 9 months ago

ovdienkonikolai commented 10 months ago

Hello everyone, has anyone encountered the "kafka: broker not connected" error in Sarama? We are facing a situation where we have a consumer that operates normally, doesn't miss messages, and works as expected, but every 10 seconds the connection breaks and this error pops up in the Errors() channel. Digging deeper, we got the following logs:

[sarama]2023/11/20 14:09:08 Connected to broker at kafka:29094 (registered as #0)
[sarama]2023/11/20 14:09:08 client/brokers registered new broker #0 at kafka:29094
[sarama]2023/11/20 14:09:08 Connected to broker at kafka:29094 (registered as #0)
[sarama]2023/11/20 14:09:10 client/metadata fetching metadata for [messaging.service.dev] from broker kafka-all-broker:29094
[sarama]2023/11/20 14:09:10 consumer/broker/0 accumulated 1 new subscriptions
[sarama]2023/11/20 14:09:10 consumer/broker/0 added subscription to messaging.service.dev/0
[sarama]2023/11/20 14:09:18 Closed connection to broker kafka:29094
[sarama]2023/11/20 14:09:18 client/metadata fetching metadata for all topics from broker kafka-all-broker:29094
[sarama]2023/11/20 14:09:18 client/metadata fetching metadata for [messaging.service.dev] from broker kafka-all-broker:29094
[sarama]2023/11/20 14:09:18 client/brokers registered new broker #0 at kafka:29094
[sarama]2023/11/20 14:09:18 Closed connection to broker kafka:29094
[sarama]2023/11/20 14:09:18 client/metadata fetching metadata for all topics from broker kafka-all-broker:29094
[sarama]2023/11/20 14:09:18 consumer/broker/0 disconnecting due to error processing FetchRequest: kafka: broker not connected
{"level":"error","ts":"2023-11-20T14:09:18.274Z","caller":"runtime/asm_amd64.s:1594","message":"got error","error":"kafka: error while consuming messaging.service.dev/0: kafka: broker not connected"}

These [sarama]-prefixed messages come from the library's internal logger and represent one cycle of polling the topic when there are no messages. There's nothing in the Kafka logs related to this behavior. We've tentatively concluded that this is normal behavior but would like to double-check. Interestingly, if we disable authorization in Kafka, the connection stops breaking and we don't see the following message:

[sarama]2023/11/20 14:09:18 Closed connection to broker <...>

If we ignore the error, everything keeps working: Sarama restores the connection on its own. We searched through existing issues and tried the solutions suggested there, but the behavior didn't change and the error kept appearing every 10 seconds. Why it happens exactly every 10 seconds is also unclear: changing the parameters Producer.Timeout, Consumer.Group.Session.Timeout and Consumer.Group.Heartbeat.Interval had no effect in our context. It seems as if the topic polling happens in a session lasting 10 seconds, but if that's the case and it's normal behavior, it's unclear why it would emit an error and how to configure the TTL of this session.

Sarama config:

sConfig := sarama.NewConfig()
sConfig.ClientID = ClientNameConsumer
sConfig.Consumer.Return.Errors = true
sConfig.Net.TLS.Enable = true
sConfig.Net.TLS.Config = &tls.Config{
  MinVersion:         tls.VersionTLS13,
  InsecureSkipVerify: true,
  Certificates:       <...>,
  RootCAs:            <...>,
}
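
For reference, the other knobs mentioned above (Producer.Timeout, Consumer.Group.Session.Timeout, Consumer.Group.Heartbeat.Interval) and Sarama's internal debug logger, which produces the [sarama]-prefixed lines in the output, live on the same config object. Below is a minimal sketch with placeholder values, not the code from this issue; note that the Consumer.Group.* settings are only used by the consumer-group API, not by Consumer.ConsumePartition, which may be why changing them had no visible effect here.

package kafka

import (
  "log"
  "os"
  "time"

  "github.com/IBM/sarama"
)

func newDebugConfig() *sarama.Config {
  // Route Sarama's internal debug output to stdout; this produces the
  // "[sarama]"-prefixed lines quoted above.
  sarama.Logger = log.New(os.Stdout, "[sarama]", log.LstdFlags)

  cfg := sarama.NewConfig()
  // Placeholder values for the parameters that were tried without effect.
  // The Consumer.Group.* fields only apply to the consumer-group API.
  cfg.Producer.Timeout = 10 * time.Second
  cfg.Consumer.Group.Session.Timeout = 10 * time.Second
  cfg.Consumer.Group.Heartbeat.Interval = 3 * time.Second
  return cfg
}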

The example below is quite simplified; I've removed all error handling to keep the snippet short:

kafkaClient, err := sarama.NewClient(config.Brokers, sConfig)
kafkaConsumer, err := sarama.NewConsumerFromClient(kafkaClient)
// (kafkaConsumer is stored on the struct and used as c.consumer below)
partitions, err := c.consumer.Partitions(c.topic)

for _, partition := range partitions {
  pConsumer, err := c.consumer.ConsumePartition(c.topic, partition, sarama.OffsetNewest)
  c.partitionsConsumers = append(c.partitionsConsumers, pConsumer)

  // Forward consumed messages to the service's channel.
  go func(partitionConsumer sarama.PartitionConsumer) {
    for msg := range partitionConsumer.Messages() {
      if msg != nil {
        monitoring.IncCountOfConsumedMessages()
        c.messages <- msg
      }
    }
  }(pConsumer)

  // Log every error reported on the Errors() channel.
  go func(partitionConsumer sarama.PartitionConsumer) {
    for err := range partitionConsumer.Errors() {
      if err != nil {
        monitoring.IncCountOfConsumedErrors()
        c.logger.Error("got error", zap.Error(err))
      }
    }
  }(pConsumer)
}
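
Shutdown is also not shown; a hypothetical Close method for the same struct (the field names below, including c.client, are assumptions inferred from the snippet) could look like this:

type Consumer struct {
  client              sarama.Client
  consumer            sarama.Consumer
  topic               string
  partitionsConsumers []sarama.PartitionConsumer
  messages            chan *sarama.ConsumerMessage
  logger              *zap.Logger
}

func (c *Consumer) Close() error {
  // Each PartitionConsumer must be closed before the parent Consumer; closing
  // it also closes its Messages()/Errors() channels, ending the goroutines above.
  for _, pc := range c.partitionsConsumers {
    if err := pc.Close(); err != nil {
      c.logger.Error("closing partition consumer", zap.Error(err))
    }
  }
  if err := c.consumer.Close(); err != nil {
    return err
  }
  // NewConsumerFromClient does not take ownership of the client, so it has to
  // be closed separately.
  return c.client.Close()
}
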
So, the question is: is this normal behavior for Sarama, and should we just ignore this error?

hanyuancheung commented 10 months ago

Hi, I've hit a similar problem. Which version of the Kafka server did you use?

ovdienkonikolai commented 10 months ago

@hanyuancheung we're using sarama v1.38.1 and kafka version 3.5.0

BTW we're still having this problem.

dnwe commented 9 months ago

@ovdienkonikolai no, this wouldn't be considered normal behaviour. I noticed from your logs that you might be using a bootstrap hostname (kafka-all-broker) that perhaps resolves to multiple A records, one per broker? I wondered if there was some strangeness there compared with the metadata that comes back from an individual broker, which might be causing conflicting state. Though I'd have expected to see (e.g.) 'client/brokers replaced registered broker' in your logs.

Can you look at upgrading to the latest release and confirm whether the issue is still present? If so, can you give a little more detail about what your bootstrap addresses are, how many brokers you have, and what your hostname+port setup is?
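
(A quick way to check how many addresses the bootstrap name resolves to is a throwaway lookup; a minimal sketch, with the hostname taken from the logs above:)

package main

import (
  "fmt"
  "log"
  "net"
)

func main() {
  // Prints one entry per A/AAAA record behind the bootstrap hostname.
  addrs, err := net.LookupHost("kafka-all-broker")
  if err != nil {
    log.Fatal(err)
  }
  fmt.Println(addrs)
}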

ovdienkonikolai commented 9 months ago

@dnwe thank you for the response.

> upgrading to the latest release and confirm that the issue is still present

Yep. Still present. Version v1.42.1

> how many brokers you have

5 brokers

> what your hostname+port setup

I'm using the cluster-local Kafka at kafka-all-broker:29094. I cannot reproduce it from my localhost; every time I try, everything works fine.

Fresh logs are below.

Logs from remote server:

{"level":"info","ts":"2024-01-09T12:43:18.058Z","caller":"kafka/consumer.go:38","message":"Kafka certificates initialization completed"}
[sarama] Initializing new client
[sarama] client/metadata fetching metadata for all topics from broker kafka-all-broker:29094
[sarama] Connected to broker at kafka-all-broker:29094 (unregistered)
[sarama] client/brokers registered new broker #0 at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] Successfully initialized new client
[sarama] Connected to broker at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094 (registered as #0)
{"level":"info","ts":"2024-01-09T12:43:18.077Z","caller":"kafka/consumer.go:134","message":"started consuming partition","partition":0}
[sarama] Closed connection to broker kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] client/metadata fetching metadata for all topics from broker kafka-all-broker:29094
[sarama] client/metadata fetching metadata for [servie-name.topic-name.v1.dev] from broker kafka-all-broker:29094
[sarama] client/brokers registered new broker #0 at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] client/metadata fetching metadata for all topics from broker kafka-all-broker:29094
{"level":"error","ts":"2024-01-09T12:43:36.708Z","caller":"runtime/asm_amd64.s:1594","message":"got error","error":"kafka: error while consuming servie-name.topic-name.v1.dev/0: kafka: broker not connected"}
[sarama] client/brokers registered new broker #0 at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] Connected to broker at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094 (registered as #0)
[sarama] Connected to broker at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094 (registered as #0)
[sarama] client/metadata fetching metadata for [servie-name.topic-name.v1.dev] from broker kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] Closed connection to broker kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] client/metadata fetching metadata for all topics from broker kafka-all-broker:29094
[sarama] client/metadata fetching metadata for [servie-name.topic-name.v1.dev] from broker kafka-all-broker:29094
[sarama] client/brokers registered new broker #0 at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] Closed connection to broker kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] client/metadata fetching metadata for all topics from broker kafka-all-broker:29094
[sarama] client/brokers registered new broker #0 at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
{"level":"error","ts":"2024-01-09T12:43:46.827Z","caller":"runtime/asm_amd64.s:1594","message":"got error","error":"kafka: error while consuming servie-name.topic-name.v1.dev/0: kafka: broker not connected"}
[sarama] Connected to broker at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094 (registered as #0)
[sarama] Connected to broker at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094 (registered as #0)
[sarama] client/metadata fetching metadata for [servie-name.topic-name.v1.dev] from broker kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] Closed connection to broker kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] client/metadata fetching metadata for all topics from broker kafka-all-broker:29094
[sarama] client/metadata fetching metadata for [servie-name.topic-name.v1.dev] from broker kafka-all-broker:29094
[sarama] client/brokers registered new broker #0 at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] Closed connection to broker kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] client/metadata fetching metadata for all topics from broker kafka-all-broker:29094
{"level":"error","ts":"2024-01-09T12:43:56.445Z","caller":"runtime/asm_amd64.s:1594","message":"got error","error":"kafka: error while consuming servie-name.topic-name.v1.dev/0: kafka: broker not connected"}
[sarama] client/brokers registered new broker #0 at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] Connected to broker at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094 (registered as #0)
[sarama] Connected to broker at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094 (registered as #0)

Logs from local machine:

{"level":"info","ts":"2024-01-09T14:51:56.439+0200","caller":"kafka/consumer.go:38","message":"Kafka certificates initialization completed"}
[sarama] Initializing new client
[sarama] client/metadata fetching metadata for all topics from broker kafka-all-broker.spectacular-nativity-55-2024-1-9:29094
[sarama] Connected to broker at kafka-all-broker.spectacular-nativity-55-2024-1-9:29094 (unregistered)
[sarama] client/brokers registered new broker #0 at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094
[sarama] Successfully initialized new client
{"level":"info","ts":"2024-01-09T14:51:56.715+0200","caller":"kafka/consumer.go:134","message":"starting consuming partitions","topic_name":"servie-name.topic-name.v1.dev"}
[sarama] Connected to broker at kafka-0.kafka-headless.spectacular-nativity-55-2024-1-9.svc.cluster.local:29094 (registered as #0)
{"level":"info","ts":"2024-01-09T14:51:57.065+0200","caller":"kafka/consumer.go:134","message":"started consuming partition","partition":0}

ovdienkonikolai commented 9 months ago

I've dug into this problem again and found the root cause: Client.RefreshController().

Under the hood, we're using client.RefreshController() as a Ping() in our health-check (HC) procedure. The HC service calls it every 10 seconds.

So, it seems like the issue is now resolved.

@dnwe, could you please add logging to Client.RefreshController()?

@hanyuancheung, FYI.

abit2 commented 5 months ago

Hi! Sorry, I couldn't understand the solution you mentioned above. Can you please elaborate on how you fixed this issue? I am facing the same problem. 🙇🏼 @ovdienkonikolai

> I've dug into this problem again and found the root cause: Client.RefreshController().
>
> Under the hood, we're using client.RefreshController() as a Ping() in our health-check (HC) procedure. The HC service calls it every 10 seconds.
>
> So, it seems like the issue is now resolved.
>
> @dnwe, could you please add logging to Client.RefreshController()?
>
> @hanyuancheung, FYI.

ovdienkonikolai commented 5 months ago

@abit2 In my case the root cause was a call to Client.RefreshController(); this method resets the connection under the hood.
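
To make the failure mode concrete, the sketch below shows the kind of health check described in this thread; the function names and the 10-second ticker are illustrative, not taken from the original code. Calling Client.RefreshController() on every tick resets the broker connection (as reported above) and surfaces as "kafka: broker not connected" on the partition consumers' Errors() channel, while a probe that only inspects the brokers the client already knows about avoids forcing any reconnects.

package health

import (
  "time"

  "github.com/IBM/sarama"
)

// pingViaRefreshController is the pattern that caused the disconnects in this
// thread: a liveness probe that refreshes the controller on every tick.
func pingViaRefreshController(client sarama.Client) error {
  _, err := client.RefreshController()
  return err
}

// pingViaBrokerState is a lighter alternative (a suggestion, not something
// recommended in this thread): it only asks whether at least one of the
// already-registered brokers currently has an open connection.
func pingViaBrokerState(client sarama.Client) bool {
  for _, b := range client.Brokers() {
    if connected, _ := b.Connected(); connected {
      return true
    }
  }
  return false
}

// runHealthCheck reproduces the 10-second cadence described in the issue.
func runHealthCheck(client sarama.Client, stop <-chan struct{}) {
  ticker := time.NewTicker(10 * time.Second)
  defer ticker.Stop()
  for {
    select {
    case <-stop:
      return
    case <-ticker.C:
      // Swap in pingViaRefreshController here to reproduce the error pattern.
      _ = pingViaBrokerState(client)
    }
  }
}

Whether Broker.Connected() is a sufficient liveness signal depends on the deployment; the point is only that the probe should not force the client to re-establish connections that healthy consumers are still using.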