segmentio / kafka-go

Kafka library in Go
MIT License

Messages not consumed when reader connects during topic creation #585

Open jizi opened 3 years ago

jizi commented 3 years ago

**Describe the bug**
I am using a `Reader` with `GroupID` set to read a Kafka topic. The topic is created in parallel with the reader's startup. Sometimes the reader joins the consumer group before topic creation has completed on the Kafka side, and it is assigned no partitions. In that case, the reader never starts consuming the topic. When I change the topic config (e.g., the number of partitions), the partition watcher correctly triggers a rebalance and the reader starts reading.
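
The reproduction logs show the group leader using the 'range' balancer. As a rough illustration of why joining before the topic's partitions are visible yields `subscribed to partitions: map[]`, here is a simplified sketch of range-style assignment for a single topic. `rangeAssign` is a hypothetical name used only for illustration; it is not kafka-go's `RangeGroupBalancer`. With zero partitions in the metadata, every member receives an empty list.

```go
package main

import (
	"fmt"
	"sort"
)

// rangeAssign is a simplified sketch of range-style assignment for one topic:
// members are sorted, partitions are split into contiguous ranges, and the
// first (numPartitions % numMembers) members receive one extra partition.
// Illustrative only; this is not kafka-go's RangeGroupBalancer.
func rangeAssign(members []string, numPartitions int) map[string][]int {
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)

	assignments := make(map[string][]int, len(sorted))
	if len(sorted) == 0 {
		return assignments
	}
	per := numPartitions / len(sorted)
	extra := numPartitions % len(sorted)

	next := 0
	for i, m := range sorted {
		n := per
		if i < extra {
			n++
		}
		parts := make([]int, 0, n)
		for j := 0; j < n; j++ {
			parts = append(parts, next)
			next++
		}
		assignments[m] = parts
	}
	return assignments
}

func main() {
	members := []string{"regatta-member"}
	// Metadata fetched before the topic exists: zero partitions to hand out.
	fmt.Println(rangeAssign(members, 0)) // map[regatta-member:[]]
	// Metadata fetched after the topic exists with two partitions.
	fmt.Println(rangeAssign(members, 2)) // map[regatta-member:[0 1]]
}
```

Nothing in this computation retries: an empty partition list simply produces empty assignments.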

**Kafka Version**
2.7.0 (run locally using `bitnami/kafka:2-debian-10`)

**To Reproduce**

1. Start a Kafka consumer using `Reader`:

```go
kafka.ReaderConfig{
	Brokers:               []string{"127.0.0.1:9092"},
	GroupID:               "regatta-test-local",
	Topic:                 "regatta-test",
	Dialer:                dialer, // a *kafka.Dialer defined elsewhere (not shown in the report)
	MaxBytes:              10e6,   // 10MB
	MaxWait:               3 * time.Second,
	CommitInterval:        1 * time.Second,
	RetentionTime:         7 * 24 * time.Hour,
	WatchPartitionChanges: true,
}
```
2. The reader cannot connect to Kafka because Kafka has not been started yet. Logs:

```
2021-01-12T10:05:06.924+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 entering loop for consumer group, regatta-test-local
2021-01-12T10:05:06.925+0100 ERROR consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Unable to establish connection to consumer group coordinator for group regatta-test-local: dial tcp 127.0.0.1:9092: connect: connection refused
github.com/segmentio/kafka-go.LoggerFunc.Printf
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/logger.go:17
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration.func1
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:638
github.com/segmentio/kafka-go.(*ConsumerGroup).withErrorLogger
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:1090
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:637
github.com/segmentio/kafka-go.(*ConsumerGroup).run
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:579
github.com/segmentio/kafka-go.NewConsumerGroup.func1
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:517
2021-01-12T10:05:06.925+0100 ERROR consumer:regatta-test kafka-go@v0.4.8/logger.go:17 dial tcp 127.0.0.1:9092: connect: connection refused
github.com/segmentio/kafka-go.LoggerFunc.Printf
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/logger.go:17
github.com/segmentio/kafka-go.(*Reader).run.func2
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/reader.go:263
github.com/segmentio/kafka-go.(*Reader).withErrorLogger
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/reader.go:1021
github.com/segmentio/kafka-go.(*Reader).run
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/reader.go:262
```

3. Start an empty Kafka instance.
4. The reader can't read the `regatta-test` topic. Logs:

```
2021-01-12T10:05:32.218+0100 ERROR consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Unable to establish connection to consumer group coordinator for group regatta-test-local: [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active
github.com/segmentio/kafka-go.LoggerFunc.Printf
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/logger.go:17
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration.func1
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:638
github.com/segmentio/kafka-go.(*ConsumerGroup).withErrorLogger
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:1090
github.com/segmentio/kafka-go.(*ConsumerGroup).nextGeneration
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:637
github.com/segmentio/kafka-go.(*ConsumerGroup).run
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:579
github.com/segmentio/kafka-go.NewConsumerGroup.func1
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:517
2021-01-12T10:05:37.312+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 joined group regatta-test-local as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e in generation 1
2021-01-12T10:05:37.312+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 selected as leader for group, regatta-test-local
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 using 'range' balancer to assign group, regatta-test-local
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 found member: regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e/[]byte(nil)
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 joinGroup succeeded for response, regatta-test-local. generationID=1, memberID=regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Joined group regatta-test-local as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e in generation 1
2021-01-12T10:05:37.331+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Syncing 1 assignments for generation 1 as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e
2021-01-12T10:05:37.438+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 received empty assignments for group, regatta-test-local as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-41f1f0bc-e20a-40f6-8370-6b362813e55e for generation 1
2021-01-12T10:05:37.438+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 sync group finished for group, regatta-test-local
2021-01-12T10:05:37.446+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 subscribed to partitions: map[]
2021-01-12T10:05:37.447+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 started commit for group regatta-test-local
2021-01-12T10:05:37.447+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 started partition watcher for group, regatta-test-local, topic regatta-test [5s]
2021-01-12T10:05:37.453+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 started heartbeat for group, regatta-test-local [3s]
2021-01-12T10:05:37.466+0100 ERROR consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Problem getting partitions during startup, [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker , Returning and setting up nextGeneration
github.com/segmentio/kafka-go.LoggerFunc.Printf
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/logger.go:17
github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.3
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:448
github.com/segmentio/kafka-go.(*ConsumerGroup).withErrorLogger
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:1090
github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:447
github.com/segmentio/kafka-go.(*Generation).Start.func1
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:338
2021-01-12T10:05:37.466+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 stopped partition watcher for group, regatta-test-local, topic regatta-test
2021-01-12T10:05:37.467+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 stopped heartbeat for group regatta-test-local
```

5. The above error keeps recurring indefinitely.
6. Create the `regatta-test` topic in Kafka.
7. The `Unknown Topic Or Partition` error no longer occurs, but the reader is stuck with empty assignments:

```
2021-01-12T10:13:35.777+0100 ERROR consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Problem getting partitions during startup, [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker , Returning and setting up nextGeneration
github.com/segmentio/kafka-go.LoggerFunc.Printf
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/logger.go:17
github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1.3
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:448
github.com/segmentio/kafka-go.(*ConsumerGroup).withErrorLogger
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:1090
github.com/segmentio/kafka-go.(*Generation).partitionWatcher.func1
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:447
github.com/segmentio/kafka-go.(*Generation).Start.func1
	/Users/jizi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.8/consumergroup.go:338
2021-01-12T10:13:35.777+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 stopped partition watcher for group, regatta-test-local, topic regatta-test
2021-01-12T10:13:35.778+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 stopped commit for group regatta-test-local
2021-01-12T10:13:35.778+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 stopped heartbeat for group regatta-test-local
2021-01-12T10:13:35.807+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 joined group regatta-test-local as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e in generation 355
2021-01-12T10:13:35.807+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 selected as leader for group, regatta-test-local
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 using 'range' balancer to assign group, regatta-test-local
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 found member: regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e/[]byte(nil)
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 joinGroup succeeded for response, regatta-test-local. generationID=355, memberID=regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Joined group regatta-test-local as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e in generation 355
2021-01-12T10:13:35.825+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Syncing 1 assignments for generation 355 as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e
2021-01-12T10:13:35.833+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 received empty assignments for group, regatta-test-local as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e for generation 355
2021-01-12T10:13:35.833+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 sync group finished for group, regatta-test-local
2021-01-12T10:13:35.842+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 started heartbeat for group, regatta-test-local [3s]
2021-01-12T10:13:35.842+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 subscribed to partitions: map[]
2021-01-12T10:13:35.842+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 started partition watcher for group, regatta-test-local, topic regatta-test [5s]
2021-01-12T10:13:35.842+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 started commit for group regatta-test-local
```

8. Change the topic config, and the reader recovers. Logs:

```
2021-01-12T10:14:45.849+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Partition changes found, reblancing group: regatta-test-local.
2021-01-12T10:14:45.849+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 stopped partition watcher for group, regatta-test-local, topic regatta-test
2021-01-12T10:14:45.849+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 stopped commit for group regatta-test-local
2021-01-12T10:14:45.849+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 stopped heartbeat for group regatta-test-local
2021-01-12T10:14:45.866+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 joined group regatta-test-local as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e in generation 356
2021-01-12T10:14:45.866+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 selected as leader for group, regatta-test-local
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 using 'range' balancer to assign group, regatta-test-local
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 found member: regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e/[]byte(nil)
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 found topic/partition: regatta-test/0
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 found topic/partition: regatta-test/1
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 assigned member/topic/partitions regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e/regatta-test/[0 1]
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 joinGroup succeeded for response, regatta-test-local. generationID=356, memberID=regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Joined group regatta-test-local as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e in generation 356
2021-01-12T10:14:45.869+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 Syncing 1 assignments for generation 356 as member regatta@Jiri-Zizkovsky.local (github.com/segmentio/kafka-go)-65631494-c7b4-4be6-be5f-076e946fd33e
2021-01-12T10:14:45.873+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 sync group finished for group, regatta-test-local
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 subscribed to partitions: map[0:-2 1:-2]
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 started partition watcher for group, regatta-test-local, topic regatta-test [5s]
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 started commit for group regatta-test-local
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 initializing kafka reader for partition 0 of regatta-test starting at offset -2
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 initializing kafka reader for partition 1 of regatta-test starting at offset -2
2021-01-12T10:14:45.876+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 started heartbeat for group, regatta-test-local [3s]
2021-01-12T10:14:45.920+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 the kafka reader for partition 0 of regatta-test is seeking to offset 0
2021-01-12T10:14:45.926+0100 DEBUG consumer:regatta-test kafka-go@v0.4.8/logger.go:17 the kafka reader for partition 1 of regatta-test is seeking to offset 0
```
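
Read together, the generation 355 and 356 logs suggest that the partition watcher only reacts to a change relative to what it observed when it started, so a generation whose assignment was computed before the topic's partitions were visible stays empty until the partition count changes again. The sketch below models that hypothesis with plain integers. Both functions are hypothetical and are not kafka-go code, and the poll values are made up for illustration: `watchTriggers` compares polls against its own first observation, while `watchTriggersAgainstAssignment` compares them against the number of partitions actually assigned.

```go
package main

import "fmt"

// watchTriggers models a watcher that snapshots the partition count on its
// first poll and fires only when a later poll differs from that snapshot.
// It returns the index of the poll that fires, or -1 if none does.
func watchTriggers(polls []int) int {
	if len(polls) == 0 {
		return -1
	}
	snapshot := polls[0]
	for i := 1; i < len(polls); i++ {
		if polls[i] != snapshot {
			return i
		}
	}
	return -1
}

// watchTriggersAgainstAssignment is a hypothetical variant that compares every
// poll, including the first, with the number of partitions that were actually
// assigned in the current generation.
func watchTriggersAgainstAssignment(assigned int, polls []int) int {
	for i, n := range polls {
		if n != assigned {
			return i
		}
	}
	return -1
}

func main() {
	// Hypothetical scenario: the assignment was computed with 0 partitions,
	// the topic reports 1 partition once the watcher starts, and a manual
	// config change later raises it to 2.
	polls := []int{1, 1, 1, 2}
	fmt.Println(watchTriggers(polls))                     // 3: only the manual change is noticed
	fmt.Println(watchTriggersAgainstAssignment(0, polls)) // 0: the mismatch is noticed at once
}
```

Under this model, comparing against the assignment would have triggered a rebalance on the first poll of the stuck generation instead of waiting for the config change in step 8.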



Sometimes, when the topic is created in Kafka, the reader gets the partitions immediately and then works as expected without any further intervention.
It seems that topic creation is not an atomic operation, and when the reader connects "in the middle" of it, the reader is not able to recover.
Maybe this could be detected [here](https://github.com/segmentio/kafka-go/blob/0.4.8/consumergroup.go#L1042) when the assignments are empty?

**Expected behavior**
The reader should always be able to reconnect and consume the Kafka topic, even if Kafka is not ready during startup.

achille-roussel commented 3 years ago

Hello @jizi, and thank you for the detailed bug report.

I think your suggested fix seems sound. Would you have time to submit a pull request that addresses the issue?

achille-roussel commented 2 years ago

Hello @jizi, I just wanted to check whether you were still experiencing this issue with kafka-go?