segmentio / kafka-go

Kafka library in Go
MIT License

Partition changes picked up but not subscribed to? #646

Open: langesven opened this issue 3 years ago

langesven commented 3 years ago

Describe the bug

The library notices that the number of partitions on a topic has changed (via the PartitionWatcher), but the subsequent stop/join flow doesn't produce a new generation of the ConsumerGroup. As a result, the consumer does not subscribe to the new partitions and doesn't consume any messages from them.
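Detecting the change is the part that does work here. Conceptually, a partition watcher polls each topic's partition count on an interval (the logs further down show `[5s]`) and asks for a rebalance when a count differs from the previous poll. The following is a minimal, stdlib-only sketch of that comparison step under that assumption. `changedTopics` is a hypothetical helper, not kafka-go's implementation, and the partition counts in `main` are illustrative.

```go
package main

import (
	"fmt"
	"sort"
)

// changedTopics compares two snapshots of partition counts (topic -> number
// of partitions) and returns, sorted, the topics in curr whose count is new
// or differs from prev. A watcher would call this once per poll interval and
// request a rebalance whenever the result is non-empty.
func changedTopics(prev, curr map[string]int) []string {
	var changed []string
	for topic, n := range curr {
		if old, ok := prev[topic]; !ok || old != n {
			changed = append(changed, topic)
		}
	}
	sort.Strings(changed)
	return changed
}

func main() {
	// Illustrative counts before and after a partition resize.
	before := map[string]int{"PartitionResize107": 1, "PartitionResize108": 1}
	after := map[string]int{"PartitionResize107": 3, "PartitionResize108": 2}
	fmt.Println(changedTopics(before, after)) // [PartitionResize107 PartitionResize108]
}
```

The bug report is about what happens after this step returns a non-empty result: the rejoin that should follow does not move the member to a new generation.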

Kafka Version

AWS MSK with Kafka version 2.2.1

To Reproduce

Expected behavior

I'd expect the partition watcher to always kick off a rebalance, after which the ConsumerGroup also consumes messages from the new partitions it discovered on the topic.

Additional context

We observed this while resizing the partitions of a few topics in our cluster. Applications suddenly started skipping events at random for no clear reason: no (visible) lag was building up, but events were not being processed as expected. It later turned out that the ConsumerGroups simply hadn't subscribed to the new partitions, or at least not to all of them.
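One way to confirm this symptom (new partitions that no member of the group is consuming) is to compare each topic's current partition count with the partitions assigned in the current generation. A minimal sketch, assuming you have already gathered the assigned partition IDs from every member of the group as plain maps (in kafka-go each member's share is exposed as `Generation.Assignments`; collecting them across members is not shown). `unassignedPartitions` is a hypothetical helper, and the numbers in `main` are illustrative.

```go
package main

import "fmt"

// unassignedPartitions returns, per topic, the partition IDs in [0, count)
// that do not appear in any member's assignment.
func unassignedPartitions(counts map[string]int, assigned map[string][]int) map[string][]int {
	missing := make(map[string][]int)
	for topic, n := range counts {
		seen := make(map[int]bool, len(assigned[topic]))
		for _, p := range assigned[topic] {
			seen[p] = true
		}
		for p := 0; p < n; p++ {
			if !seen[p] {
				missing[topic] = append(missing[topic], p)
			}
		}
	}
	return missing
}

func main() {
	// Illustrative: the topic now has 3 partitions, but the group only
	// consumes partition 0.
	counts := map[string]int{"PartitionResize107": 3}
	assigned := map[string][]int{"PartitionResize107": {0}}
	fmt.Println(unassignedPartitions(counts, assigned)) // map[PartitionResize107:[1 2]]
}
```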

I've spent some time debugging this on our end and I'm at a loss by now, hence this issue. We originally used kafka-go 0.3.10, but I've verified that the same behaviour still occurs for us with 0.4.15.

Below is a snippet of the client's log output, captured right after I resized the partitions of topics PartitionResize107 and PartitionResize108. The partition watcher correctly recognizes that the partitions of both topics changed and that a rebalance is needed. The client then rejoins the group twice (once for each planned rebalance), and both times it ends up in the same generation, 339, which is also the generation the ConsumerGroup was already in before the resize. As a result, the ConsumerGroup is not subscribed to the new partitions of my test topics and ignores any messages sent to them. Once I restart the application, the ConsumerGroup joins as generation 340 and picks up the new partitions. If I then resize one of the topics again, the same thing usually happens and the new partitions are not picked up.

{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/consumergroup.go:482","function":"github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.4","message":"Partition changes found, reblancing group: component-kafka-event-connector-js-example.","severity":"INFO","timestamp":"2021-04-28T06:35:32Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/consumergroup.go:458","function":"github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.2","message":"stopped partition watcher for group, component-kafka-event-connector-js-example, topic PartitionResize107","severity":"INFO","timestamp":"2021-04-28T06:35:32Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/reader.go:264","function":"github.com/segmentio/kafka-go.(*Reader).commitLoop.func2","message":"stopped commit for group component-kafka-event-connector-js-example\n","severity":"INFO","timestamp":"2021-04-28T06:35:32Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/consumergroup.go:422","function":"github.com/segmentio/kafka-go.(*Generation).heartbeatLoop.func1.2","message":"stopped heartbeat for group component-kafka-event-connector-js-example\n","severity":"INFO","timestamp":"2021-04-28T06:35:32Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/consumergroup.go:482","function":"github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.4","message":"Partition changes found, reblancing group: component-kafka-event-connector-js-example.","severity":"INFO","timestamp":"2021-04-28T06:35:32Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/consumergroup.go:458","function":"github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.2","message":"stopped partition watcher for group, component-kafka-event-connector-js-example, topic PartitionResize108","severity":"INFO","timestamp":"2021-04-28T06:35:32Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/consumergroup.go:422","function":"github.com/segmentio/kafka-go.(*Generation).heartbeatLoop.func1.2","message":"stopped heartbeat for group component-kafka-event-connector-js-example\n","severity":"INFO","timestamp":"2021-04-28T06:35:32Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/reader.go:264","function":"github.com/segmentio/kafka-go.(*Reader).commitLoop.func2","message":"stopped commit for group component-kafka-event-connector-js-example\n","severity":"INFO","timestamp":"2021-04-28T06:35:32Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/consumergroup.go:897","function":"github.com/segmentio/kafka-go.(*ConsumerGroup).joinGroup.func1","message":"joined group component-kafka-event-connector-js-example as member sidecar@component-kafka-event-connector-js-example-86b7956d6-jw47d (github.com/segmentio/kafka-go)-4930fd9d-dc39-4ac9-b0de-b1e6df9fc1cb in generation 339","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/consumergroup.go:918","function":"github.com/segmentio/kafka-go.(*ConsumerGroup).joinGroup.func3","message":"joinGroup succeeded for response, component-kafka-event-connector-js-example.  generationID=339, memberID=sidecar@component-kafka-event-connector-js-example-86b7956d6-jw47d (github.com/segmentio/kafka-go)-4930fd9d-dc39-4ac9-b0de-b1e6df9fc1cb","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/consumergroup.go:752","function":"github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration.func3","message":"Joined group component-kafka-event-connector-js-example as member sidecar@component-kafka-event-connector-js-example-86b7956d6-jw47d (github.com/segmentio/kafka-go)-4930fd9d-dc39-4ac9-b0de-b1e6df9fc1cb in generation 339","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/consumergroup.go:1049","function":"github.com/segmentio/kafka-go.(*ConsumerGroup).syncGroup.func2","message":"sync group finished for group, component-kafka-event-connector-js-example","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/reader.go:132","function":"github.com/segmentio/kafka-go.(*Reader).subscribe.func1","message":"subscribed to topics and partitions: map[{topic:PartitionResize107 partition:0}:1]","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/reader.go:1225","function":"github.com/segmentio/kafka-go.(*reader).run.func1","message":"initializing kafka reader for partition 0 of PartitionResize107 starting at offset 1","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/reader.go:261","function":"github.com/segmentio/kafka-go.(*Reader).commitLoop.func1","message":"started commit for group component-kafka-event-connector-js-example\n","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/consumergroup.go:419","function":"github.com/segmentio/kafka-go.(*Generation).heartbeatLoop.func1.1","message":"started heartbeat for group, component-kafka-event-connector-js-example [3s]","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/consumergroup.go:455","function":"github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.1","message":"started partition watcher for group, component-kafka-event-connector-js-example, topic PartitionResize107 [5s]","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize107","file":"kafka-go@v0.4.15/reader.go:1406","function":"github.com/segmentio/kafka-go.(*reader).initialize.func1","message":"the kafka reader for partition 0 of PartitionResize107 is seeking to offset 1","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/consumergroup.go:897","function":"github.com/segmentio/kafka-go.(*ConsumerGroup).joinGroup.func1","message":"joined group component-kafka-event-connector-js-example as member sidecar@component-kafka-event-connector-js-example-86b7956d6-jw47d (github.com/segmentio/kafka-go)-555f4dfa-0082-4d94-bf4e-8bcb84d9c01c in generation 339","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/consumergroup.go:918","function":"github.com/segmentio/kafka-go.(*ConsumerGroup).joinGroup.func3","message":"joinGroup succeeded for response, component-kafka-event-connector-js-example.  generationID=339, memberID=sidecar@component-kafka-event-connector-js-example-86b7956d6-jw47d (github.com/segmentio/kafka-go)-555f4dfa-0082-4d94-bf4e-8bcb84d9c01c","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/consumergroup.go:752","function":"github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration.func3","message":"Joined group component-kafka-event-connector-js-example as member sidecar@component-kafka-event-connector-js-example-86b7956d6-jw47d (github.com/segmentio/kafka-go)-555f4dfa-0082-4d94-bf4e-8bcb84d9c01c in generation 339","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/consumergroup.go:1049","function":"github.com/segmentio/kafka-go.(*ConsumerGroup).syncGroup.func2","message":"sync group finished for group, component-kafka-event-connector-js-example","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/reader.go:132","function":"github.com/segmentio/kafka-go.(*Reader).subscribe.func1","message":"subscribed to topics and partitions: map[{topic:PartitionResize108 partition:0}:1]","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/consumergroup.go:419","function":"github.com/segmentio/kafka-go.(*Generation).heartbeatLoop.func1.1","message":"started heartbeat for group, component-kafka-event-connector-js-example [3s]","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/consumergroup.go:455","function":"github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.1","message":"started partition watcher for group, component-kafka-event-connector-js-example, topic PartitionResize108 [5s]","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/reader.go:1225","function":"github.com/segmentio/kafka-go.(*reader).run.func1","message":"initializing kafka reader for partition 0 of PartitionResize108 starting at offset 1","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/reader.go:261","function":"github.com/segmentio/kafka-go.(*Reader).commitLoop.func1","message":"started commit for group component-kafka-event-connector-js-example\n","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
{"applicationLog":{"Topic":"PartitionResize108","file":"kafka-go@v0.4.15/reader.go:1406","function":"github.com/segmentio/kafka-go.(*reader).initialize.func1","message":"the kafka reader for partition 0 of PartitionResize108 is seeking to offset 1","severity":"INFO","timestamp":"2021-04-28T06:35:34Z"}}
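To check from logs like these whether a rebalance actually produced a new generation, one can extract the generation numbers from the `joined group ... in generation N` messages and compare them with the generation before the resize (339 in this report). A minimal sketch, assuming one JSON object per line in the `applicationLog` shape shown above; `joinedGenerations` is a hypothetical helper, and the regular expression only matches the message wording seen in these particular logs.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// logLine mirrors only the part of the JSON log format above that is needed.
type logLine struct {
	ApplicationLog struct {
		Message string `json:"message"`
	} `json:"applicationLog"`
}

// joinedRe matches the lowercase "joined group ... in generation N" message.
// It is case-sensitive, so the capitalized "Joined group" line logged right
// after it for the same join is not counted twice.
var joinedRe = regexp.MustCompile(`^joined group .* in generation (\d+)$`)

// joinedGenerations returns, in log order, the generation ID from every
// "joined group" message. Lines that are not valid JSON are skipped.
func joinedGenerations(logs string) ([]int, error) {
	var gens []int
	sc := bufio.NewScanner(strings.NewReader(logs))
	for sc.Scan() {
		var l logLine
		if err := json.Unmarshal(sc.Bytes(), &l); err != nil {
			continue
		}
		if m := joinedRe.FindStringSubmatch(l.ApplicationLog.Message); m != nil {
			n, err := strconv.Atoi(m[1])
			if err != nil {
				return nil, err
			}
			gens = append(gens, n)
		}
	}
	return gens, sc.Err()
}

func main() {
	// Two shortened lines in the same shape as the logs above.
	sample := `{"applicationLog":{"Topic":"PartitionResize107","message":"joined group g as member m-1 in generation 339"}}
{"applicationLog":{"Topic":"PartitionResize108","message":"joined group g as member m-2 in generation 339"}}`
	gens, err := joinedGenerations(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(gens) // [339 339]
}
```

If every value equals the pre-resize generation, none of the rejoins moved the member to a new generation, which matches what the logs above show.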

My reader is configured like this:

    c.reader = kafka.NewReader(kafka.ReaderConfig{
        Brokers:               conf.Brokers,
        GroupID:               conf.GroupID,
        Topic:                 topic,
        MaxWait:               3 * time.Second,    // maximum time to wait for new messages in a fetch
        MinBytes:              1,                  // minimum fetch batch size in bytes
        MaxBytes:              10e6,               // maximum fetch batch size: 10e6 bytes = 10 MB
        RetentionTime:         time.Hour * 24 * 7, // keep the ConsumerGroup for 1 week
        WatchPartitionChanges: true,               // poll for partition changes (e.g. an increased partition count) and rebalance
    })

It basically runs in a loop that fetches messages:

    func (c *consumer) StartFetching(ctx context.Context) {
        logger := log.WithFields(log.Fields{
            "GroupId": c.conf.GroupID,
        })
        // Close the client once the loop exits.
        defer func() {
            if err := c.client.Close(); err != nil {
                logger.WithError(err).Warn("Error Closing Consumer Client")
            }
        }()
        logger.Debug("Starting consumer client")
        for {
            // Stop as soon as the caller cancels ctx.
            select {
            case <-ctx.Done():
                return
            default:
            }
            // On any processing error, restart the client and keep looping.
            if err := c.processMessage(ctx); err != nil {
                c.restartClient()
            }
        }
    }

I've read most of the partition-related issues here and couldn't find a similar problem that wasn't explained by "you forgot to turn the partition watcher on", so I wouldn't be completely surprised if we're somehow using something wrong. Or perhaps resizing partitions is rare enough that people don't run into this often. I've been wondering whether this relates to what is described in https://github.com/segmentio/kafka-go/blob/master/consumergroup.go#L341-L353 and in #357: maybe our client implementation somehow prevents the ConsumerGroup context from ending, so the consumer never exits the current generation and stays in it instead of triggering a rebalance?

Sometimes it works, seemingly at random, and I can't reproduce that deliberately. Sometimes I resize a topic's partitions, the change is picked up, and a rebalance actually happens (I can confirm this in the Kafka broker logs, which show rebalance activity; that is not the case for the run pasted above), and the client is then subscribed to the new partitions as expected. I can't explain when or why this happens :/

Kl1mn commented 3 years ago

Hi, I had the same problem, but with ConsumerGroup. It was solved by setting a different group ID for each topic (e.g. for PartitionResize107 and PartitionResize108). If you use the same GroupID for different topics, try setting a unique GroupID for each one.
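The logs above show the situation this comment describes: two readers with different member IDs join the same group, `component-kafka-event-connector-js-example`, one per topic. A minimal sketch of deriving one group ID per topic and checking that no ID is shared, assuming a `<base>.<topic>` naming scheme. The scheme and both helpers are hypothetical; each resulting ID would go into the `GroupID` field of that topic's `kafka.ReaderConfig`.

```go
package main

import (
	"fmt"
	"sort"
)

// groupIDForTopic derives a per-topic consumer group ID from a shared base
// name. The "<base>.<topic>" scheme is only an example.
func groupIDForTopic(base, topic string) string {
	return base + "." + topic
}

// sharedGroupIDs reports every group ID assigned to more than one topic,
// mapped to the sorted list of topics that share it.
func sharedGroupIDs(topicToGroup map[string]string) map[string][]string {
	byGroup := make(map[string][]string)
	for topic, group := range topicToGroup {
		byGroup[group] = append(byGroup[group], topic)
	}
	shared := make(map[string][]string)
	for group, topics := range byGroup {
		if len(topics) > 1 {
			sort.Strings(topics)
			shared[group] = topics
		}
	}
	return shared
}

func main() {
	base := "component-kafka-event-connector-js-example"
	topics := []string{"PartitionResize107", "PartitionResize108"}

	before := map[string]string{} // one GroupID for every topic
	after := map[string]string{}  // one GroupID per topic
	for _, t := range topics {
		before[t] = base
		after[t] = groupIDForTopic(base, t)
	}
	fmt.Println(len(sharedGroupIDs(before)), len(sharedGroupIDs(after))) // 1 0
}
```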