IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

Consumers stop consuming messages after brokers restart #1407

Closed dhbarman closed 1 year ago

dhbarman commented 5 years ago
Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama Version: v1.19.0
Kafka Version: 2.0.0
Go Version: 1.12.4

Configuration

config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_0_0_0
config.Consumer.Offsets.Initial = sarama.OffsetNewest

Every time the Kafka client/consumer restarts, we change the consumer group.

Logs

level=error ts=2019-06-20T22:19:46.917Z caller=kafka.go:293 msg="Error received" err="kafka: error while consuming device_telemetry/1: kafka server: The provided member is not known in the current generation."
:
level=error ts=2019-06-20T22:19:49.849Z caller=kafka.go:293 msg="Error received" err="kafka: error while consuming device_telemetry/27: dial tcp 10.192.28.67:9092: connect: connection refused"
:
level=error ts=2019-06-20T22:27:21.009Z caller=kafka.go:145 err="kafka server: Request was for a consumer group that is not coordinated by this broker."
:
level=error ts=2019-06-20T21:57:10.157Z caller=kafka.go:293 msg="Error received" err="kafka: error while consuming device_telemetry_2/36: EOF
:
level=error ts=2019-06-20T08:45:24.734Z caller=kafka.go:293 msg="Error received" err="kafka: error while consuming device_telemetry/23: kafka server: Request exceeded the user-specified time limit in the request."

Problem Description

We restarted the broker nodes for an OS upgrade and started seeing the errors shown above. It did not happen on every restart, but it happened often enough. Whenever these errors were logged, the Kafka clients/consumers stopped consuming messages.

ganesh-karthick commented 5 years ago

@dhbarman with Sarama v1.22.1, I have noticed that the Consume method exits on broker restart. One could write a sticky loop to reconnect.

dhbarman commented 5 years ago

@ganesh-karthick Is there a bug report to track this issue? By reconnect, do you mean re-invoking client.Consume()?

ganesh-karthick commented 5 years ago

@dhbarman not exactly: close the existing connection and initialize a fresh one. Below is some pseudocode:

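// brokers, groupID, config, topics and handler are placeholders.
for ctx.Err() == nil { // loop until cancelled
	client, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		time.Sleep(time.Second) // pause so a down broker is not retried in a tight loop
		continue
	}
	// Consume blocks until the session ends, e.g. on a broker restart.
	if err := client.Consume(ctx, topics, handler); err != nil {
		log.Printf("consume error: %v", err)
	}
	client.Close() // drop the old connection before re-initializing
}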
ghost commented 4 years ago

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

d1egoaz commented 4 years ago

@dhbarman not sure if you're using consumer groups, but we added a note explaining that .Consume needs to be called in a loop, because the consumer group needs to get its new claims after a rebalance.

https://github.com/Shopify/sarama/pull/1602

dnwe commented 1 year ago

Fixed by docs: https://github.com/IBM/sarama/issues/1407#issuecomment-601442269