segmentio / kafka-go

Kafka library in Go
MIT License

Commit offsets with consumer group? #1079

Open bgranvea opened 1 year ago

bgranvea commented 1 year ago

I'm using the consumer group API and this method to commit offsets:

// CommitOffsets commits the provided topic+partition+offset combos to the
// consumer group coordinator.  This can be used to reset the consumer to
// explicit offsets.
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {

I ran into a case where my Kafka brokers restarted and my code kept calling CommitOffsets in a retry loop that I wrote. The problem is that even after the brokers came back, CommitOffsets still failed with this error:

write tcp 10.68.48.18:32132->10.60.64.54:30093: use of closed network connection

It seems the consumer group never re-establishes its connection after a failure. I suppose the generation cannot be used anymore anyway, but my retry loop only stops on a "fatal" error like "unknown member id", and here I get a network error that looks "retryable".

Do you have suggestions on how to manage this kind of case?

pltanton commented 2 months ago

I've hit the same issue. Did you find a solution?