fsprojects / pulsar-client-dotnet

Apache Pulsar native client for .NET (C#/F#/VB)
MIT License

Fix seek chunked messages #205

Closed RobertIndie closed 2 years ago

RobertIndie commented 2 years ago

Motivation

This is the implementation of https://github.com/apache/pulsar/issues/12402.

Currently, when a chunked message is sent, the producer returns the message ID of the last chunk. This causes problems when that message ID is used to seek: the consumer starts consuming from the position of the last chunk, mistakenly concludes that the earlier chunks are lost, and skips the current message. With an inclusive seek, the consumer may therefore skip the first message, which is incorrect behavior.

Here is a simple example (in Java) that demonstrates the problem.

var msgId = producer.send(...); // e.g. returns 0:1:-1
var otherMsg = producer.send(...); // returns 0:2:-1
consumer.seek(msgId); // inclusive seek
var receiveMsgId = consumer.receive().getMessageId(); // may skip the first message and return something like 0:2:-1
Assert.assertEquals(msgId, receiveMsgId); // fails
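The skipping behavior described above can be reproduced with a toy model of a consumer that reassembles chunks. This is only a sketch under stated assumptions: the `Entry` type, the `firstDeliveredEntry` helper, and the reassembly rule (drop any chunk whose predecessors were not seen) are hypothetical simplifications for illustration, not the actual consumer code of any Pulsar client.

```java
import java.util.List;

public class ChunkSeekSimulation {

    /** Hypothetical ledger entry: a whole message or one chunk of a larger message. */
    static final class Entry {
        final long entryId;
        final String uuid;     // identifies the logical message
        final int chunkIndex;  // 0-based position of this chunk
        final int totalChunks; // 1 for a non-chunked message

        Entry(long entryId, String uuid, int chunkIndex, int totalChunks) {
            this.entryId = entryId;
            this.uuid = uuid;
            this.chunkIndex = chunkIndex;
            this.totalChunks = totalChunks;
        }
    }

    /**
     * Reads the log from startEntry (inclusive) and returns the entry id of the
     * first message that can be delivered (for a chunked message, the entry id of
     * its last chunk), or -1 if nothing can be delivered.
     */
    static long firstDeliveredEntry(List<Entry> log, long startEntry) {
        String current = null; // uuid of the chunked message being reassembled
        int nextChunk = 0;     // index of the chunk expected next
        for (Entry e : log) {
            if (e.entryId < startEntry) continue;
            if (e.totalChunks == 1) return e.entryId; // plain message: deliver as is
            if (e.chunkIndex == 0) {
                current = e.uuid;
                nextChunk = 1;
            } else if (e.uuid.equals(current) && e.chunkIndex == nextChunk) {
                nextChunk++;
            } else {
                // Earlier chunks were never seen: treat them as lost and drop this chunk.
                current = null;
                nextChunk = 0;
                continue;
            }
            if (nextChunk == e.totalChunks) return e.entryId; // all chunks received
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
            new Entry(0, "A", 0, 2),  // first chunk of message A
            new Entry(1, "A", 1, 2),  // last chunk of message A (id returned by send today)
            new Entry(2, "B", 0, 1)); // the other message

        System.out.println(firstDeliveredEntry(log, 1)); // 2: message A is skipped
        System.out.println(firstDeliveredEntry(log, 0)); // 1: message A is delivered
    }
}
```

Seeking to the first chunk's position (entry 0 here) lets the consumer see every chunk, which is why the first chunk's message ID is needed for seek.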

For more context, please see PIP-107.

I also found that the F# client already stores all chunk message IDs in MessageIds.chunkMessageIds. We can use this field to implement the ChunkMessageId feature as in the Java client.

There is still work left in this PR: serializing the ChunkMessageId. To be consistent with the behavior of the Java client, when message IDs are serialized, deserialized, or compared, the comparison of chunkMessageIds only needs to compare the message ID of the first chunk if the message is a chunked message. Like below:

match m.ChunkMessageIds, this.ChunkMessageIds with
| Some mchunkMessageIds, Some thisChunkMessageIds when mchunkMessageIds.Length > 0 && thisChunkMessageIds.Length > 0 ->
    // For a chunked message, only the message id of the first chunk needs to be compared
    mchunkMessageIds.[0] = thisChunkMessageIds.[0]
| _, _ -> true
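For readers more familiar with Java, the same rule can be sketched as follows. The `Id` class and the `chunkIdsMatch` helper are hypothetical stand-ins written for this illustration; they are not the types or methods of the Java or F# client, and the sketch assumes a message ID simply carries an optional list of its chunk positions.

```java
import java.util.List;

public class ChunkIdComparisonSketch {

    /** Hypothetical stand-in for a message id; not the real client type. */
    static final class Id {
        final long ledgerId;
        final long entryId;
        final int partition;
        final List<Id> chunkIds; // chunk positions, first to last; empty if not chunked

        Id(long ledgerId, long entryId, int partition, List<Id> chunkIds) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partition = partition;
            this.chunkIds = chunkIds;
        }

        static Id plain(long ledgerId, long entryId, int partition) {
            return new Id(ledgerId, entryId, partition, List.of());
        }

        boolean samePosition(Id other) {
            return ledgerId == other.ledgerId
                && entryId == other.entryId
                && partition == other.partition;
        }
    }

    /** Mirrors the match expression: compare first chunks only when both ids are chunked. */
    static boolean chunkIdsMatch(Id a, Id b) {
        if (!a.chunkIds.isEmpty() && !b.chunkIds.isEmpty()) {
            return a.chunkIds.get(0).samePosition(b.chunkIds.get(0));
        }
        return true; // at least one side carries no chunk ids: nothing to compare
    }

    public static void main(String[] args) {
        Id first = Id.plain(0, 0, -1);
        Id second = Id.plain(0, 1, -1);
        Id chunked = new Id(0, 1, -1, List.of(first, second));
        Id sameFirstChunk = new Id(0, 1, -1, List.of(first, second));
        Id otherFirstChunk = new Id(0, 1, -1, List.of(second));

        System.out.println(chunkIdsMatch(chunked, sameFirstChunk));  // true
        System.out.println(chunkIdsMatch(chunked, otherFirstChunk)); // false
        System.out.println(chunkIdsMatch(chunked, second));          // true
    }
}
```

The last case corresponds to the wildcard branch of the match expression: when either side has no chunk IDs, this part of the comparison does not reject the pair.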

We need to update the Pulsar proto file before proceeding with the rest of the work. What is the correct way to generate the code from the proto file? The code I generated using protoc is very different from the existing generated code. Are my parameters set incorrectly?

Update: Serialization for the chunk message ID has been added. This PR is ready for review.

Modification

Lanayx commented 2 years ago

The code is generated using this site: https://protogen.marcgravell.com/. You'll also need to change the generated access modifiers from public to internal.