apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.15k stars 3.57k forks source link

getLastMessageId from PulsarAdmin doesn't work with BatchMessageIdImpl #6462

Closed kevin-kosmos closed 4 years ago

kevin-kosmos commented 4 years ago

Describe the bug A topic created with a producer in batch mode generates batch messages id : ledgerId:entryId:partitionIndex:batchIndex. The getLastMessageId method return a MessageIdImpl which doesn't use the batchIndex parameter.

To Reproduce

  1. Send messages with a producer in batch mode : `

    public class ProducerTest {

    public static void main(String[] args) throws Exception {

    try (PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build()) {
    
        Producer<String> producer = client
                .newProducer(Schema.STRING)
                .topic("topic")
                .enableBatching(true)
                .batchingMaxMessages(10)
                .blockIfQueueFull(false)
                .batchingMaxPublishDelay(100, TimeUnit.SECONDS)
                .create();
    
        CompletableFuture<MessageId> messageIdCompletableFuture = null;
        for (int i = 0; i < 10; i++) {
            messageIdCompletableFuture = producer.newMessage()
                    .value(String.valueOf(i))
                    .sendAsync();
        }
    
        messageIdCompletableFuture.get();
    }

    } }`

  2. Shows generated messages :

|value|topic |messageId |__messageIdToString| |0 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 00]|39792:0: -1:0 | |1 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 01]|39792:0: -1:1 | |2 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 02]|39792:0: -1:2 | |3 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 03]|39792:0: -1:3 | |4 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 04]|39792:0: -1:4 | |5 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 05]|39792:0: -1:5 | |6 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 06]|39792:0: -1:6 | |7 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 07]|39792:0: -1:7 | |8 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 08]|39792:0: -1:8 | |9 |persistent://public/default/topic|[08 F0 B6 02 10 00 20 09]|39792:0: -1:9 |

  1. Retrieve the last message id from pulsar admin and display it : 39792:0: -1

Expected behavior It shoud return 39792:0: -1:9

Desktop (please complete the following information):

congbobo184 commented 4 years ago

@kevin-kosmos broker don't store the batchIndex, client only loop set batchIndex.

eladar2000 commented 4 years ago

@congbobo184 I don't understand ? If the batchIndex is not stored on the broker side, all the messages of the same batch have the same Message id (as described before, the batch id is the only changing part in the id) ? So it is not possible to target a specific message created inside a batch ?

congbobo184 commented 4 years ago

I will fix this, and every time it will return the last of batchIndex.