apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
acknowledgmentAtBatchIndexLevelEnabled will still receive duplicate messages when unload topic #14982

Open xuesongxs opened 2 years ago

xuesongxs commented 2 years ago

Describe the bug: On Pulsar v2.8.3, with acknowledgmentAtBatchIndexLevelEnabled set, the consumer still receives duplicate (already acknowledged) messages after a topic unload.

To Reproduce Steps to reproduce the behavior:

  1. Set acknowledgmentAtBatchIndexLevelEnabled=true in broker.conf
  2. Create consumer:

    public static void main(String[] args) throws Exception{
        PulsarClient client = PulsarClient
                .builder()
                .serviceUrl("pulsar://127.0.0.1:6650")
                .build();
    
        ConsumerBuilder<NlMessage> consumerBuilder = client.newConsumer(Schema.JSON(NlMessage.class)).topic("batchtest")
                .subscriptionName("batchTest")
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .enableBatchIndexAcknowledgment(true)
                .receiverQueueSize(1000)
                .negativeAckRedeliveryDelay(20, TimeUnit.SECONDS)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
        // DLQ
        DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
                .deadLetterTopic("persistent://public/default/batchtest-dlq")
                .retryLetterTopic("persistent://public/default/batchtest-retry")
                .maxRedeliverCount(3)
                .build();
        consumerBuilder.deadLetterPolicy(deadLetterPolicy);
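        // Note: this overrides the 20-second negative-ack delay set on the builder above.
        consumerBuilder.negativeAckRedeliveryDelay(10, TimeUnit.MICROSECONDS);
        // An ack timeout of 0 disables the ack timeout.
        consumerBuilder.ackTimeout(0, TimeUnit.SECONDS);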
    
        Consumer<NlMessage> consumer = consumerBuilder.subscribe();
        long i = 0;
        while (true) {
            Message<NlMessage> message = consumer.receive();
            System.out.println("message:" + message.getMessageId());
            if (i <= 99) {
                consumer.acknowledge(message);
                System.out.println("acked message:" + i);
            } else {
                // hold the msg
            }
            i++;
        }
    }
  3. Create producer and send 100 msgs:
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
        Producer<NlMessage> producer = pulsarClient.newProducer(Schema.JSON(NlMessage.class))
                .topic("persistent://public/default/batchtest")
                .blockIfQueueFull(true)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
        long i = 1;
        while(i <= 100L){
            producer.sendAsync(new NlMessage());
            i++;
        }
    }
  4. The 100 msgs appear as 1 backlog entry (screenshot omitted).
  5. After 2 minutes, execute unload cmd: bin/pulsar-admin topics unload persistent://public/default/batchtest
  6. Check the consumer log: several or more messages that have already been acked are pushed again.

Expected behavior: Only messages that have not been acked are pushed.
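To make step 6 easier to verify than reading the log by eye, the consumer loop could record which message IDs it has acked and flag any of them that arrive again. The following is only a minimal sketch: `RedeliveryTracker` is a hypothetical helper that is not part of the Pulsar client, and the ID strings in `main` are illustrative placeholders for whatever `message.getMessageId().toString()` returns in the real consumer.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: remembers acked message IDs and flags any that arrive again. */
class RedeliveryTracker {
    private final Set<String> acked = new HashSet<>();
    private final List<String> duplicates = new ArrayList<>();

    /** Record an ID once the acknowledge call for it has returned. */
    void markAcked(String messageId) {
        acked.add(messageId);
    }

    /** Call for every received message; returns true if the ID was already acked. */
    boolean onReceive(String messageId) {
        if (acked.contains(messageId)) {
            duplicates.add(messageId);
            return true;
        }
        return false;
    }

    /** IDs that were delivered again after being acked, in arrival order. */
    List<String> duplicates() {
        return duplicates;
    }

    public static void main(String[] args) {
        RedeliveryTracker tracker = new RedeliveryTracker();
        // Placeholder IDs; a real consumer would pass message.getMessageId().toString().
        tracker.onReceive("id-0");
        tracker.markAcked("id-0");
        tracker.onReceive("id-1"); // received but never acked
        // Simulate both messages being delivered again after the unload.
        for (String id : new String[] {"id-0", "id-1"}) {
            if (tracker.onReceive(id)) {
                System.out.println("duplicate of acked message: " + id);
            }
        }
    }
}
```

In the reproduction above, `markAcked` would be called right after `consumer.acknowledge(message)` and `onReceive` right after `consumer.receive()`; with the expected behavior, `duplicates()` stays empty after the unload.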

Jason918 commented 2 years ago

Is it reproduced on master branch? Do you plan to work on a fix? @xuesongxs

xuesongxs commented 2 years ago

> Is it reproduced on master branch? Do you plan to work on a fix? @xuesongxs

I tested it on the 2.8.3 branch; the same should be true for the master branch. I don't plan to work on a fix.

Technoboy- commented 2 years ago

In this case, it may receive the duplicate message.

Jason918 commented 2 years ago

> In this case, it may receive the duplicate message.

Because mark delete info is persisted periodically and it's not triggered when the topic unloads?

xuesongxs commented 2 years ago

> > In this case, it may receive the duplicate message.
>
> Because mark delete info is persisted periodically and it's not triggered when the topic unloads?

I unloaded the topic or stopped the broker 5 minutes after the ack of the first 99 messages had completed. Even if the index is written asynchronously, 5 minutes should be enough, but the problem still occurs.
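The exchange above can be illustrated with a toy model. It assumes, purely for illustration, that all 100 messages sit in one batched entry, that per-index acks are kept as a bit set, and that the entry can only be dropped once every index is acked. `BatchEntryModel` and its methods are hypothetical names, not Pulsar broker code, and the sketch does not claim to identify the actual cause of this bug; it only shows why losing the per-index ack state across an unload would produce the duplicates described.

```java
import java.util.BitSet;

/** Toy model of one batched entry; a sketch of the idea, not Pulsar broker code. */
class BatchEntryModel {
    private final int batchSize;
    private BitSet ackedIndexes; // in-memory per-index ack state

    BatchEntryModel(int batchSize) {
        this.batchSize = batchSize;
        this.ackedIndexes = new BitSet(batchSize);
    }

    /** Acknowledge a single message inside the batch. */
    void ack(int batchIndex) {
        ackedIndexes.set(batchIndex);
    }

    /** Copy of the ack state, standing in for whatever gets persisted. */
    BitSet snapshot() {
        return (BitSet) ackedIndexes.clone();
    }

    /** Simulate an unload and reload; null models ack state that was never persisted. */
    void reload(BitSet persisted) {
        ackedIndexes = (persisted == null) ? new BitSet(batchSize) : (BitSet) persisted.clone();
    }

    /** Number of messages from this entry that would be dispatched again. */
    int pendingCount() {
        return batchSize - ackedIndexes.cardinality();
    }

    public static void main(String[] args) {
        BatchEntryModel entry = new BatchEntryModel(100);
        for (int i = 0; i < 99; i++) {
            entry.ack(i); // ack the first 99 messages, hold the last one
        }
        BitSet persisted = entry.snapshot();

        entry.reload(null); // per-index ack state lost on unload
        System.out.println("state lost:     " + entry.pendingCount() + " redelivered");

        entry.reload(persisted); // per-index ack state restored on load
        System.out.println("state restored: " + entry.pendingCount() + " redelivered");
    }
}
```

Under these assumptions the first case redelivers all 100 messages, 99 of them duplicates, while the second redelivers only the one unacked message, which matches the expected behavior stated in the report.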

lhotari commented 2 years ago

#15031 and #15067 fix issues in this area.

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

xuesongxs commented 2 years ago

> #15031 and #15067 fix issues in this area.

I have verified these 2 PRs in the latest Pulsar version, and they have not solved this bug.

jdfrozen commented 1 year ago

On 2.8.4, delayed messages are also duplicated when the topic is unloaded.