streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0
447 stars, 132 forks

Fix wrong offset increment for Sarama message set #1992

Status: Closed (closed by BewareMyPower 11 months ago)

BewareMyPower commented 11 months ago

Motivation

When Sarama producers send messages to KoP in the legacy message set format, e.g. with a configuration like the following:

```go
conf := sarama.NewConfig()
conf.Version = sarama.V0_10_2_2
conf.Producer.Compression = sarama.CompressionLZ4
producer, err := sarama.NewAsyncProducer([]string{"localhost:15003"}, conf)
```

the PartitionLog#analyzeAndValidateRecords method can compute a wrong LogAppendInfo#numMessages value, because it relies on RecordBatch#lastOffset to determine the last offset of each batch.

https://github.com/streamnative/kop/blob/3b22e79764ca22107228ec2f74590f0769bd9fd9/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java#L1028

However, when the Sarama client encodes a message set (v0 or v1 records), it always writes 0 as the offset:

https://github.com/IBM/sarama/blob/c10bd1e5709a7b47729b445bc98f2e41bc7cc0a8/produce_request.go#L107

In this case, every batch is counted as 1 message, which might result in wrong offsets. Assuming there are 2 batches of 5 messages each, the offsets written in the entries could be:

The correct offsets should be:

The wrong LEO (log end offset) could make the offset check in PartitionLog#checkOffsetOutOfRange fail, after which consumers reset their offsets and consume duplicate messages.
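The offset arithmetic above can be illustrated with a small Go model. This is a simplified sketch, not KoP's code: the `batch` type, `assignBaseOffsets`, and `outOfRange` are hypothetical, and the model assumes each entry's base offset is the running total of messages counted so far. In this model, counting each Sarama batch as 1 message gives base offsets 0 and 1 and an LEO of 2, while counting the real records gives 0 and 5 and an LEO of 10; a hypothetical fetch offset of 5 then falls past the wrong LEO.

```go
package main

import "fmt"

// batch is a hypothetical, simplified stand-in for one legacy (v0/v1)
// message set written by Sarama: the wrapper offset (LastOffset) is 0,
// while the batch really contains NumRecords messages.
type batch struct {
	LastOffset int64 // wrapper offset as written by the client
	NumRecords int64 // actual number of inner messages
}

// assignBaseOffsets gives each batch a base offset equal to the running
// total of messages counted so far and returns the base offsets plus the
// resulting log end offset (LEO). countFn decides how many messages a
// batch is assumed to contain.
func assignBaseOffsets(batches []batch, countFn func(batch) int64) ([]int64, int64) {
	bases := make([]int64, 0, len(batches))
	var next int64
	for _, b := range batches {
		bases = append(bases, next)
		next += countFn(b)
	}
	return bases, next
}

// countFromLastOffset models the buggy behavior: the count is derived from
// the wrapper offset (assuming a relative base of 0), so a Sarama batch
// with LastOffset 0 always counts as 1 message.
func countFromLastOffset(b batch) int64 { return b.LastOffset + 1 }

// countRecords models the correct behavior: count the stored messages.
func countRecords(b batch) int64 { return b.NumRecords }

// outOfRange is a simplified, hypothetical range check: a fetch offset
// below the log start offset or past the LEO is rejected.
func outOfRange(fetchOffset, logStartOffset, leo int64) bool {
	return fetchOffset < logStartOffset || fetchOffset > leo
}

func main() {
	batches := []batch{{LastOffset: 0, NumRecords: 5}, {LastOffset: 0, NumRecords: 5}}

	wrongBases, wrongLEO := assignBaseOffsets(batches, countFromLastOffset)
	rightBases, rightLEO := assignBaseOffsets(batches, countRecords)

	fmt.Println("wrong:", wrongBases, "LEO", wrongLEO)     // wrong: [0 1] LEO 2
	fmt.Println("correct:", rightBases, "LEO", rightLEO) // correct: [0 5] LEO 10
	// Offset 5 is where the second batch really starts.
	fmt.Println("offset 5 out of range:", outOfRange(5, 0, wrongLEO)) // true
}
```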

Modifications

When the lastOffset is 0, iterate over the records in the batch to compute the number of messages.
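A minimal Go sketch of this fallback, assuming a simplified batch type: `legacyBatch`, `record`, and `numMessages` are hypothetical names, not KoP's Java API. The non-zero branch assumes the count is `lastOffset - baseOffset + 1`; the actual PartitionLog code may compute it differently.

```go
package main

import "fmt"

// record is a hypothetical placeholder for one inner message.
type record struct{ Value string }

// legacyBatch is a hypothetical, simplified view of a record batch;
// it is not Kafka's or KoP's RecordBatch type.
type legacyBatch struct {
	BaseOffset int64
	LastOffset int64
	Records    []record
}

// numMessages trusts the batch offsets unless LastOffset is 0, in which
// case it counts the records by iterating over them. (len(b.Records)
// would do in Go; the loop mirrors iterating a Java Iterable batch.)
func numMessages(b legacyBatch) int64 {
	if b.LastOffset == 0 {
		var n int64
		for range b.Records {
			n++
		}
		return n
	}
	// Assumed offset-based count for batches with a meaningful LastOffset.
	return b.LastOffset - b.BaseOffset + 1
}

func main() {
	saramaStyle := legacyBatch{BaseOffset: 0, LastOffset: 0, Records: make([]record, 5)}
	withLastOffset := legacyBatch{BaseOffset: 0, LastOffset: 4, Records: make([]record, 5)}
	single := legacyBatch{BaseOffset: 0, LastOffset: 0, Records: make([]record, 1)}

	fmt.Println(numMessages(saramaStyle), numMessages(withLastOffset), numMessages(single)) // 5 5 1
}
```

A batch that genuinely holds a single message at offset 0 also takes the fallback path and is still counted as 1, so the extra iteration does not change its result.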

Ideally, we should add a Golang client test to prevent regressions. In this patch, we only add a SaramaCompressedV1Records class that simulates Sarama's behavior, plus unit tests built on it.

Documentation


Need to update docs?

BewareMyPower commented 11 months ago

I think tests for clients in other languages can be added to our own private test repo in the future.

https://github.com/streamnative/kop-tests

@Demogorgon314 @gaoran10 @codelipenghui

BewareMyPower commented 11 months ago

Here is the code to reproduce the issue with Sarama. (Just run a unit test in KoP that sleeps for a long time, so the broker stays up, then run this program against it.)

```go
package main

import (
    "fmt"
    "log"
    "os"

    "github.com/IBM/sarama"
)

func main() {
    sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
    conf := sarama.NewConfig()
    conf.Version = sarama.V0_10_2_2
    conf.Producer.Compression = sarama.CompressionLZ4
    // producer.Successes() will return the msg metadata
    conf.Producer.Return.Successes = true
    producer, err := sarama.NewAsyncProducer([]string{"localhost:15003"}, conf)
    if err != nil {
        // Report the underlying error instead of discarding it
        log.Fatalf("Couldn't create a Kafka producer: %v", err)
    }
    defer producer.AsyncClose()
    for i := 0; i < 2; i++ {
        producer.Input() <- &sarama.ProducerMessage{
            Topic: "my-topic",
            Key:   nil,
            Value: sarama.StringEncoder(fmt.Sprintf("my-value-%d", i)),
        }
    }
    // Wait for one result (success or error) per message sent
    for i := 0; i < 2; i++ {
        select {
        case err := <-producer.Errors():
            sarama.Logger.Println(i, "failed", err)
        case metadata := <-producer.Successes():
            sarama.Logger.Println(i, "send to ", metadata.Offset)
        }
    }
}
```
codecov[bot] commented 11 months ago

Codecov Report

Merging #1992 (4e2ead3) into master (1fd3bdb) will increase coverage by 0.00%. Report is 1 commit behind head on master. The diff coverage is 71.42%.


```diff
@@            Coverage Diff            @@
##             master    #1992   +/-   ##
=========================================
  Coverage     17.28%   17.29%           
- Complexity      726      728    +2     
=========================================
  Files           190      190           
  Lines         14012    14041   +29     
  Branches       1312     1320    +8     
=========================================
+ Hits           2422     2428    +6     
- Misses        11414    11437   +23     
  Partials        176      176           
```
| Files Changed | Coverage Δ |
| --- | --- |
| ...tive/pulsar/handlers/kop/storage/PartitionLog.java | 8.61% <71.42%> (+0.71%) :arrow_up: |

... and 4 files with indirect coverage changes