segmentio / kafka-go

Kafka library in Go
MIT License
7.65k stars 792 forks source link

The offset is submitted, but each execution still starts from the beginning and repeats #1318

Open ChenghuaMi opened 3 months ago

ChenghuaMi commented 3 months ago

func main() { r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "my-topic", Partition: 0, MaxBytes: 10e6, // 10MB }) for { m, err := r.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) r.CommitMessages(context.Background(), m) // commit offset }

if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}

}

I submitted the offset, but each time I execute the program, it starts from the beginning, resulting in repeated consumption.

===============First Execution=========== message at offset 0: = one! message at offset 1: = two! message at offset 2: = three!

============== Second Execution ========= message at offset 0: = one! message at offset 1: = two! message at offset 2: = three!

luffyao commented 2 months ago

i think you must use the consumer group to do this.

logrusorgru commented 3 days ago

Also, the ReadMessage commits the message automatically.

If consumer groups are used, ReadMessage will automatically commit the offset when called. Note that this could result in an offset being committed before the message is fully processed.

If more fine-grained control of when offsets are committed is required, it is recommended to use FetchMessage with CommitMessages instead.

https://pkg.go.dev/github.com/segmentio/kafka-go#Reader.ReadMessage

And yes, it's for consumer groups.