Can you provide a code sample to reproduce the issue?
@burdiyan ok, I will try to provide something reproducible
@burdiyan https://github.com/sitano/sarama_close_bug
$ ... run Kafka locally in Docker with an image like https://hub.docker.com/r/wurstmeister/kafka/
$ ... expose port 9092 to localhost
$ env GO111MODULE=on go build -race && ./sarama_close_bug -brokers 127.0.0.1:9092
2019/06/05 15:01:47 Starting a new Sarama consumer
2019/06/05 15:01:47 client connected
2019/06/05 15:01:47 new topic with single partition = 5a5fb18d-343e-4bf8-9dea-886729a6b28e
2019/06/05 15:01:47 new group = 196cff9d-c66b-4666-a76a-0d93047500e7
2019/06/05 15:01:47 starting 1
2019/06/05 15:01:47 starting 2
2019/06/05 15:01:47 consume at 1
2019/06/05 15:01:47 consume at 2
2019/06/05 15:01:48 consumer setup 1
2019/06/05 15:01:48 consumer cleanup 1
2019/06/05 15:01:48 consume at 1
2019/06/05 15:01:48 consumer setup 2
2019/06/05 15:01:48 consumer setup 1
2019/06/05 15:01:48 Sarama consumers up and running!...
2019/06/05 15:01:48 trying to close cs... 2
^C
consumer group never finishes...
Hi @sitano, have you solved this problem?
@bailu1901 yes, but with a workaround
@sitano Would you mind posting a synopsis of your workaround for others experiencing this issue while a perm-fix is looked into? The inability to close these consumer groups is causing some serious memory-leak headaches.
@PapayaJuice yes, but the gods of concurrent programming will not praise me. The main idea is to accept the data race that happens in the bad case.
done := make(chan struct{})
go func() {
	// ctx, topics and handler are placeholders for the application's own values
	for !shutdown { // shutdown: the caller's own stop condition (placeholder)
		// Consume blocks until the current session ends (rebalance or error)
		cs.Consume(ctx, topics, handler)
	}
	cleanup()
	close(done)
}()
CloseConsumerGroup(cs, 10*time.Second, done)
kafkaConsumeGroupClient.Close()
func CloseConsumerGroup(cs sarama.ConsumerGroup, timeout time.Duration, wait ...<-chan struct{}) {
	done := make(chan struct{})
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	// If the consumer group is closed first during shutdown, it may hang
	// waiting for a consumer group session to complete when no partitions
	// were assigned to this node. In that case cs.Close() will hang until a
	// cluster rebalance or a disconnect happens. If so, the later close of
	// the client owned by cs will be a data race on a closed client, but at
	// least the shutdown will finish.
	go func() {
		if err := cs.Close(); err != nil {
			log.Logger().WithError(err).Info("close consumer group")
		}
		close(done)
	}()
	select {
	case <-ctx.Done():
		// cs.Close() did not finish within the timeout
		log.Logger().WithError(context.DeadlineExceeded).Info("close consumer group")
	case <-done:
		// cs.Close() finished successfully; now wait for the external
		// channels (e.g. the consume loop goroutine) to be closed
		for _, ch := range wait {
			select {
			case <-ctx.Done():
				// the consume loop did not exit within the timeout
				log.Logger().WithError(context.DeadlineExceeded).Info("close consumer group: wait external chan")
				return
			case <-ch:
				// finished
			}
		}
	}
}
The main idea behind that code is that it spawns a goroutine which tries to close the consumer group. If that deadlocks, the client owned by the consumer group will be closed forcibly by the goroutine that called CloseConsumerGroup. In the good case, the close of the consumer group's client has a happens-before relationship established by the waiting inside CloseConsumerGroup. In the bad case, the client will be closed with a data race, but cs.Close() will not block the execution thread and the shutdown will continue.
The wait channels in the arguments let the function also wait for the goroutine that runs the consumer group session loop to finish; when cs.Consume() returns, that goroutine may still need to do some cleanup.
@burdiyan I have the same problem. Is there a better solution?
check https://github.com/sitano/sarama_close_bug to reproduce the error
Hi, do you already have a fix for this? We are also hitting this problem: if we have more consumers than partitions, the close will hang forever.
Hi,
I ran into this exact problem as well. The issue was a bug in my implementation of
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
Specifically
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
Due to a race condition, my ConsumeClaim function would not finish its processing loop as required by the contract.
As a result, the close also got stuck trying to acquire a lock here: github.com/Shopify/sarama/consumer_group.go:121
// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)
		c.lock.Lock() // <- can't acquire the lock
Once I resolved the race condition in my code that blocked ConsumeClaim, the issue was resolved.
Maybe that helps.
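For context, a ConsumeClaim that satisfies this contract is essentially a loop over claim.Messages() that returns once the channel is closed. Here is a minimal sketch; the Handler type and the offset-marking strategy are illustrative and not part of the original report:
import "github.com/Shopify/sarama"

// Handler is a hypothetical sarama.ConsumerGroupHandler used only for illustration.
type Handler struct{}

func (Handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (Handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim drains claim.Messages() and returns as soon as the channel is
// closed, which is what allows a rebalance (and Close()) to complete.
func (Handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// process msg here, then mark it as consumed
		sess.MarkMessage(msg, "")
	}
	return nil
}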
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
I'm seeing this as well, I think. The examples that I followed implement ConsumeClaim() like:
for message := range claim.Messages() {
...
and there seems to be a race that the closer loses, so the channel never gets closed and everything hangs.
I'm going to try implementing a version that selects on claim.Messages() and ctx.Done() to see if that helps at all.
Although I'm calling client.Close() after cancelling the context. Not sure if that's a supported flow.
Never mind. Unrelated deadlock in my message handler. Sorry for the noise.
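For reference, the select-based ConsumeClaim mentioned above looks roughly like this (a sketch reusing the hypothetical Handler type from the earlier example; error handling and commit strategy are application-specific):
func (Handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// Messages() is closed: the session is ending, so return and
				// let the rebalance (or Close()) proceed
				return nil
			}
			sess.MarkMessage(msg, "")
		case <-sess.Context().Done():
			// the session context was cancelled: stop consuming
			return nil
		}
	}
}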
The root cause is that when the consumer got no claims, the session can't exit and keeps holding the consumerGroup lock. We could just panic when there are no claims.
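One way to observe the no-claims case from the handler side is to inspect session.Claims() in Setup. A sketch, again reusing the hypothetical Handler type and the standard library log package; whether to panic, log, or back off is an application decision:
func (Handler) Setup(sess sarama.ConsumerGroupSession) error {
	// Claims() is empty when this member was assigned no partitions,
	// e.g. when the group has more members than the topic has partitions.
	if len(sess.Claims()) == 0 {
		log.Println("no partitions assigned to this consumer group member")
	}
	return nil
}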
We are facing the same issue in sarama@1.38.1 @joshua0x @dnwe. Calling consumerGroup.Close() creates huge leaks in our apps, as we create a lot of consumer groups for consuming Kafka topics.
Logs attached from debug/pprof/goroutine?debug=1:
1 @ 0x104159d 0x104162a 0x10528dd 0x106f665 0x1081631 0x1081391 0x15d2fb5 0x15cea17 0x1081af8 0x1081985 0x15ce97c 0x1665214 0x1667105 0x16772fc 0x1073c01
# 0x106f664 sync.runtime_SemacquireMutex+0x24 /usr/local/Cellar/go/1.19.5/libexec/src/runtime/sema.go:77
# 0x1081630 sync.(*Mutex).lockSlow+0x270 /usr/local/Cellar/go/1.19.5/libexec/src/sync/mutex.go:171
# 0x1081390 sync.(*Mutex).Lock+0x50 /usr/local/Cellar/go/1.19.5/libexec/src/sync/mutex.go:90
# 0x15d2fb4 github.com/Shopify/sarama.(*consumerGroup).leave+0x54 /Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/consumer_group.go:545
# 0x15cea16 github.com/Shopify/sarama.(*consumerGroup).Close.func1+0x56 /Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/consumer_group.go:159
# 0x1081af7 sync.(*Once).doSlow+0x137 /usr/local/Cellar/go/1.19.5/libexec/src/sync/once.go:74
# 0x1081984 sync.(*Once).Do+0x44 /usr/local/Cellar/go/1.19.5/libexec/src/sync/once.go:65
# 0x15ce97b github.com/Shopify/sarama.(*consumerGroup).Close+0x7b /Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/consumer_group.go:155
# 0x1665213 github.ibm.com/BSS/golang-pulsar/pkg/worker.(*consumerGroup).Close+0x73 /Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/worker/worker.go:96
# 0x1667104 github.ibm.com/BSS/golang-pulsar/pkg/worker.(*PulsarWorker).Stop+0x204 /Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/worker/worker.go:347
# 0x16772fb github.ibm.com/BSS/golang-pulsar/pkg/manager.(*Manager).Stop.func1+0x3b /Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/manager/manager.go:462
1 @ 0x104159d 0x1051bc9 0x15d6c73 0x1073c01
# 0x15d6c72 github.com/Shopify/sarama.(*consumerGroupSession).heartbeatLoop+0x792 /Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/consumer_group.go:975
1 @ 0x104159d 0x1051bc9 0x1602a96 0x1624e9f 0x1073c01
# 0x1602a95 github.com/Shopify/sarama.(*offsetManager).mainLoop+0x1d5 /Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/offset_manager.go:242
# 0x1624e9e github.com/Shopify/sarama.withRecover+0x3e /Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/utils.go:43
1 @ 0x104159d 0x1070a05 0x1664be5 0x166d6f5 0x166b0d4 0x1073c01
# 0x1070a04 time.Sleep+0x124 /usr/local/Cellar/go/1.19.5/libexec/src/runtime/time.go:195
# 0x1664be4 github.ibm.com/BSS/exponential-goback.Wait+0xa4 /Users/sayanchatterjee/go/pkg/mod/github.ibm.com/!b!s!s/exponential-goback@v0.0.0-20220708152916-efd285ce2e13/goback.go:117
# 0x166d6f4 github.ibm.com/BSS/golang-pulsar/pkg/worker.(*PulsarWorker).send+0x25d4 /Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/worker/worker.go:891
# 0x166b0d3 github.ibm.com/BSS/golang-pulsar/pkg/worker.(*PulsarWorker).processBatch+0x213 /Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/worker/worker.go:690
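For anyone who wants to capture a similar dump, the standard net/http/pprof handler is enough. A minimal sketch (the listen address is arbitrary):
import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on http.DefaultServeMux
)

func init() {
	go func() {
		// goroutine dumps are then available at
		// http://localhost:6060/debug/pprof/goroutine?debug=1
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()
}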
@sitano I took a look at this with the latest sarama and fixed up the example to check channel closure and session done correctly, and I didn't seem to be able to reproduce the bug.
Here's the changes I made: https://github.com/sitano/sarama_close_bug/pull/1
Can you confirm if this is still a problem for you?
@dnwe Hi! I am not working on that one at the moment and don't have time to reproduce it. If you feel it's no longer an issue, feel free to close.
@sitano ok, no worries. Thanks, I'll close this as it is believed to have been fixed.
Versions
Sarama Version: d84c59b2a2d87f185d91a1cc426a1f4d4e9365109fe0d96cbd2404c3a57c365a / release v1.22.0
Kafka Version: kafka_2.12-2.1.0.jar
Go Version: go1.12.1 linux/amd64
Configuration
What configuration values are you using for Sarama and Kafka?
Kafka: a single topic with only 1 partition and 2 consumers in a single consumer group
Logs
Problem Description
A consumer group call to Close() hangs (or deadlocks) on acquiring a mutex, because consume cannot finish while it is waiting for a session to complete when there were no partitions assigned to this consumer instance.