segmentio / kafka-go

Kafka library in Go
MIT License

Reader doesn't fetch messages after new consumer group creation when StartOffset is set to FirstOffset #1211

Open M0rdecay opened 11 months ago

M0rdecay commented 11 months ago

Describe the bug

Reader doesn't fetch messages after new consumer group creation when StartOffset is set to FirstOffset

Kafka Version

  • What version(s) of Kafka are you testing against?

Kafka 3.5.1

  • What version of kafka-go are you using?

kafka-go 0.4.43

To Reproduce

Resources to reproduce the behavior:

docker run -d --name kafka-server -p 9092:9092 -p 9093:9093 --hostname kafka-test \
    --network app-tier \
    -e KAFKA_CFG_NODE_ID=0 \
    -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
    -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
    -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
    -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
    -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
    bitnami/kafka:3.5.1

Expected Behavior

Once the consumer group has been created, the Reader should start fetching messages from the first offset.
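The expectation above matches the documented meaning of `StartOffset` in kafka-go's `ReaderConfig`: it applies only when the group has no committed offset for a partition. The sketch below is a simplified, stdlib-only model of that rule, not kafka-go's actual implementation. The `partitionState` type and the `resolveStart` function are hypothetical names introduced for illustration; the two sentinel values mirror `kafka.FirstOffset` and `kafka.LastOffset`.

```go
package main

import "fmt"

// Sentinel values mirroring kafka-go's exported constants
// kafka.LastOffset (-1) and kafka.FirstOffset (-2).
const (
	LastOffset  int64 = -1
	FirstOffset int64 = -2
)

// partitionState is a hypothetical, simplified view of one partition
// as seen by a consumer group. It is not a kafka-go type.
type partitionState struct {
	Committed int64 // offset committed by the group, or -1 if none exists
	Earliest  int64 // oldest offset still available in the partition
	Latest    int64 // offset that the next produced message will get
}

// resolveStart models where a group reader is expected to begin:
// a committed offset always wins; otherwise startOffset decides.
func resolveStart(p partitionState, startOffset int64) int64 {
	if p.Committed >= 0 {
		return p.Committed
	}
	if startOffset == LastOffset {
		return p.Latest
	}
	// FirstOffset (assumed default in this model) falls through to here.
	return p.Earliest
}

func main() {
	// A brand-new group has no committed offset yet.
	fresh := partitionState{Committed: -1, Earliest: 0, Latest: 42}
	fmt.Println("new group, FirstOffset:", resolveStart(fresh, FirstOffset))
	fmt.Println("new group, LastOffset: ", resolveStart(fresh, LastOffset))

	// An existing group resumes from its committed offset.
	existing := partitionState{Committed: 17, Earliest: 0, Latest: 42}
	fmt.Println("existing group:        ", resolveStart(existing, FirstOffset))
}
```

Under this model, a new group with `FirstOffset` should begin at the earliest available offset and receive every message already in the partition, which is the behavior this issue reports as missing.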

Observed Behavior

If the consumer group did not exist before, the Reader does not fetch any messages after the group is created. With LastOffset, or with an already existing consumer group, it works correctly.

jonathan-dev commented 11 months ago

This might not be your problem, but I saw similar behavior when my topic list included a topic that had not been created yet. It is worth checking; it took me a while to figure out because nothing was logged to indicate a problem.

zmz-kevin commented 11 months ago

panic: markRead: negative count

goroutine 135 [running]:
github.com/segmentio/kafka-go.(*messageSetReader).markRead(0xc00090a7b0?)
    /vendor/github.com/segmentio/kafka-go/message_reader.go:353 +0x9e
github.com/segmentio/kafka-go.(*messageSetReader).readMessageV2(0xc00037bc60, 0xc00028d9c0?, 0x44f525?, 0xc00028da10?)
    /vendor/github.com/segmentio/kafka-go/message_reader.go:337 +0x96d
github.com/segmentio/kafka-go.(*messageSetReader).readMessage(0xc00037bc60, 0x0?, 0x7f9a8d8a4a08?, 0x25848dd33?)
    /vendor/github.com/segmentio/kafka-go/message_reader.go:138 +0x88
github.com/segmentio/kafka-go.(*Batch).readMessage(0xc000888300, 0x0?, 0x0?)
    /vendor/github.com/segmentio/kafka-go/batch.go:248 +0x45
github.com/segmentio/kafka-go.(*Batch).ReadMessage(0xc000888300)
    /vendor/github.com/segmentio/kafka-go/batch.go:200 +0xe5
github.com/segmentio/kafka-go.(*reader).read(0xc00028dec8, {0x14e9108, 0xc00038ccd0}, 0x0, 0xc00090a780)
    /vendor/github.com/segmentio/kafka-go/reader.go:1513 +0x285
github.com/segmentio/kafka-go.(*reader).run(0xc00028dec8, {0x14e9108, 0xc00038ccd0}, 0x0)
    /vendor/github.com/segmentio/kafka-go/reader.go:1325 +0x405
github.com/segmentio/kafka-go.(*Reader).start.func1({0x14e9108?, 0xc00038ccd0?}, {{0xc00030e330?, 0x0?}, 0x0?}, 0x0?, 0x0?)
    /vendor/github.com/segmentio/kafka-go/reader.go:1217 +0x1bb
created by github.com/segmentio/kafka-go.(*Reader).start in goroutine 29

I see similar behavior; the error log is above. I have five partitions, but only one of them actually contains data. With StartOffset set to LastOffset it works, but with FirstOffset it panics.