streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] KoP consumers receive duplicate messages when a producer sends multiple batches in one produce request for one TopicPartition #537

Open hangc0276 opened 3 years ago

hangc0276 commented 3 years ago

Describe the Bug

When a producer sends multiple batches in one MemoryRecords, which can happen with produce protocol versions [V0, V2] and [V8, +), KoP packs those batches into a single entry and stores it in the managedLedger.

However, the offset exposed to the consumer has batch granularity. The consumer fetches messages by offset, and KoP converts that offset to a MessageId. As the following picture shows, one entry contains four batches:

[Figure: one managedLedger entry containing four batches]

When the consumer fetches from offset 6, KoP converts offset 6 to entry 1, reads entry 1 from BookKeeper, and returns the whole entry to the consumer without filtering out the messages at offsets 4 and 5. As a result, the consumer receives duplicate messages.
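The Python sketch below models the scenario above. Since the picture is not reproduced, it assumes a hypothetical layout in which entry 0 holds offsets 0 to 3 and entry 1 holds offsets 4 to 7 (four batches per entry, one offset per batch). `Entry`, `find_entry`, `fetch_naive` and `fetch_filtered` are illustrative names, not KoP's API. It shows how returning the whole entry leaks offsets 4 and 5, and how skipping batches below the requested offset avoids the duplicates.

```python
import bisect
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Entry:
    """Hypothetical model of one managedLedger entry holding several batches."""
    entry_id: int
    base_offset: int    # offset of the first batch in this entry
    batches: List[str]  # payloads; batch i has offset base_offset + i


# Hypothetical layout: two entries, four batches each.
LEDGER = [
    Entry(0, 0, ["m0", "m1", "m2", "m3"]),
    Entry(1, 4, ["m4", "m5", "m6", "m7"]),
]


def find_entry(ledger: List[Entry], offset: int) -> Entry:
    """Return the entry whose offset range contains `offset`."""
    base_offsets = [e.base_offset for e in ledger]
    # Rightmost entry whose base offset is <= offset.
    idx = bisect.bisect_right(base_offsets, offset) - 1
    if idx < 0:
        raise ValueError(f"offset {offset} is before the first entry")
    return ledger[idx]


def fetch_naive(ledger: List[Entry], offset: int) -> List[Tuple[int, str]]:
    """Buggy behaviour: return every batch in the located entry."""
    entry = find_entry(ledger, offset)
    return [(entry.base_offset + i, b) for i, b in enumerate(entry.batches)]


def fetch_filtered(ledger: List[Entry], offset: int) -> List[Tuple[int, str]]:
    """Fixed behaviour: drop batches whose offset is below the requested one."""
    return [(o, b) for o, b in fetch_naive(ledger, offset) if o >= offset]


if __name__ == "__main__":
    print(fetch_naive(LEDGER, 6))     # also returns offsets 4 and 5 (duplicates)
    print(fetch_filtered(LEDGER, 6))  # returns only offsets 6 and 7
```

The filter costs one comparison per batch in the first fetched entry; later entries start at or after the requested offset, so nothing in them is dropped.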

Another affected area is computing the LogStartOffset (LSO). In https://github.com/streamnative/kop/pull/531 we plan to calculate it as LSO = LEO - numberOfEntries, where LEO is the log end offset.

LEO and numberOfEntries can be obtained from the managedLedger. However, this formula assumes that each entry contains exactly one batch.
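A small worked example of that assumption, in Python. The numbers are hypothetical: the same eight offsets (0 to 7, so LEO = 8) are stored either as eight single-batch entries or as two entries of four batches each. `lso_from_entries` is an illustrative name for the proposed formula, not a KoP function.

```python
def lso_from_entries(leo: int, number_of_entries: int) -> int:
    """Formula proposed in the issue: LSO = LEO - numberOfEntries."""
    return leo - number_of_entries


# Case 1: eight entries with one batch each, offsets 0..7.
one_batch_entries = [1] * 8
# Case 2: the same eight offsets packed as two entries of four batches each.
multi_batch_entries = [4, 4]

for batches in (one_batch_entries, multi_batch_entries):
    leo = sum(batches)  # the log starts at offset 0, so LEO = total batches
    print(f"{len(batches)} entries -> {lso_from_entries(leo, len(batches))}")
# 8 entries -> 0  (correct)
# 2 entries -> 6  (wrong: the log still starts at offset 0)
```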

If one entry can contain multiple batches, we need another variable in the managedLedger to track the LSO.

BewareMyPower commented 3 years ago

Just a terminology correction: in Kafka, LSO usually stands for Last Stable Offset. There seems to be no common abbreviation for the log start offset.

To solve this problem, I think we first need to upgrade the Kafka dependency to 2.7.0 or 2.8.0. Then, if we want to handle the case where a single entry contains multiple batches, we can add a key-value pair to the entry's metadata.
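One possible reading of the key-value suggestion, sketched in Python: when an entry is written, record how many batches it holds, and use that count both to recover per-batch offsets and to compute the log start offset. The property key `kafka.num.batches` and all helper names are hypothetical, not existing KoP or Pulsar metadata fields, and the default of one batch for entries written before the key existed is an assumption. The Kafka dependency upgrade is not modelled here.

```python
from typing import Dict, Iterable, List

# Hypothetical property key; not an existing KoP or Pulsar field.
NUM_BATCHES_KEY = "kafka.num.batches"


def entry_properties(num_batches: int) -> Dict[str, str]:
    """Key-value metadata to attach to an entry when it is written."""
    return {NUM_BATCHES_KEY: str(num_batches)}


def num_batches(properties: Dict[str, str]) -> int:
    """Batches in an entry; entries without the key are assumed to hold one."""
    return int(properties.get(NUM_BATCHES_KEY, "1"))


def batch_offsets(base_offset: int, properties: Dict[str, str]) -> List[int]:
    """Offsets of every batch in an entry that starts at base_offset."""
    return list(range(base_offset, base_offset + num_batches(properties)))


def log_start_offset(leo: int, retained: Iterable[Dict[str, str]]) -> int:
    """LEO minus the number of batches still retained in the managedLedger."""
    return leo - sum(num_batches(p) for p in retained)


if __name__ == "__main__":
    entries = [entry_properties(4), entry_properties(4)]
    print(batch_offsets(4, entries[1]))      # [4, 5, 6, 7]
    print(log_start_offset(8, entries))      # 0
    print(log_start_offset(8, entries[1:]))  # 4, after the first entry is trimmed
```

Counting batches per entry keeps the LSO formula in the same shape as before (LEO minus a count), only replacing numberOfEntries with the number of retained batches.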