apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.1k stars 3.56k forks source link

[Bug][broker] Occur so many ERROR log in broker, which is confusing #23022

Closed TakaHiR07 closed 2 weeks ago

TakaHiR07 commented 1 month ago

Search before asking

Read release policy

Version

master

Minimal reproduce step

can implement a simple unittest in ReaderTest.class to reproduce ERROR log

@Test
    private void testRead() throws Exception {
        cleanup();
        conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
        setup();

        String topic = "persistent://my-property/my-ns/my-reader-topic";
        boolean enableBatch = false;

        int numKeys = 10;

        Set<String> keys = publishMessages(topic, numKeys, enableBatch);
        Reader<byte[]> reader = pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .readerName(subscription)
                .create();

        while (reader.hasMessageAvailable()) {
            Message<byte[]> message = reader.readNext();
            Assert.assertTrue(keys.remove(message.getKey()));
        }
        reader.close();

        Thread.sleep(2000);
    }

What did you expect to see?

should not log this confusing error, since everything is OK.

What did you see instead?

In order to fix NonDurableImpl OOM problem, this pr https://github.com/apache/pulsar/pull/22454, and several previous pr make some modification.

This is the following modification and the relevant code. https://github.com/apache/pulsar/blob/7c0e82739215fbae9e21270d4c70c9a52dd3e403/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L992-L1002

https://github.com/apache/pulsar/blob/88ebe785dbdab239104981453a9bd0e4a7e896d3/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L945-L991

The error log Root reason is:

If reader readNext and then close, if hasMoreEntries() is false, it would enter checkForNewEntries() with default 10ms delay. During this time, nonDurableCursor's state change to closed. So after 10ms delay, it would throw CursorAlreadyClosedException in checkForNewEntries().

The execute order is : asyncReadEntriesWithSkipOrWait -> cursor become closed -> after 10ms delay -> checkForNewEntries

And PersistentDispatcherSingleActiveConsumer would log Error reading entries at 5:1 : Cursor was already closed - Retrying to read in 1.0 seconds and then log Skipping read retry: Current Consumer null, havePendingRead false

It seems once nonDurableCursor is closed. There is no need to log the error, and no need to schedule readEntry again. Besides, we should also consider that if cursor is durable, should it schedule readEntry again?

Now the code in readEntry looks still have some unreasonable area.

This is the log when reproducing, including a confusing ERROR log.

2024-07-11T16:44:31,915+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:61363] Created subscription on topic persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d
2024-07-11T16:44:31,915+0800 [pulsar-client-io-102-4] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Subscribed to topic on localhost/127.0.0.1:61358 -- consumer: 0
2024-07-11T16:44:31,916+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Trigger new read after receiving flow control message
2024-07-11T16:44:31,916+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Schedule read of 100 messages
2024-07-11T16:44:31,916+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Read entries immediately
2024-07-11T16:44:31,919+0800 [PulsarTestContext-executor-OrderedExecutor-0-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Filtering entries [5:0..5:0] - alreadyDeleted: []
2024-07-11T16:44:31,920+0800 [PulsarTestContext-executor-OrderedExecutor-0-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] No filtering needed for entries [5:0..5:0]
2024-07-11T16:44:31,920+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Got messages: 1
2024-07-11T16:44:31,926+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Schedule read of 100 messages
2024-07-11T16:44:31,926+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Deferring retry of read at position 5:1
2024-07-11T16:44:31,929+0800 [pulsar-io-75-4] DEBUG org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Cumulative ack on 5:0
2024-07-11T16:44:31,929+0800 [pulsar-io-75-4] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] Mark delete cursor reader-2b31818c8d up to position: 5:0
2024-07-11T16:44:31,929+0800 [pulsar-io-75-4] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] Moved ack position from: 5:-1 to: 5:0 -- skipped: 1
2024-07-11T16:44:31,932+0800 [pulsar-io-75-4] DEBUG org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Mark deleted messages to position 5:0 from position 5:-1
2024-07-11T16:44:31,932+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:61363] Closing consumer: consumerId=0
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Cancel pending read request
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Successfully closed subscription [NonDurableCursorImpl{ledger=my-property/my-ns/persistent/my-reader-topic, cursor=reader-2b31818c8d, ackPos=5:0, readPos=5:1}]
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Successfully closed dispatcher for reader
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] DEBUG org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic] [reader-2b31818c8d] [reader-sub] Removed consumer -- count: 0
2024-07-11T16:44:31,934+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:61363] Closed consumer, consumerId=0
2024-07-11T16:44:31,934+0800 [pulsar-client-io-102-4] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/my-reader-topic] [reader-2b31818c8d] Closed consumer
2024-07-11T16:44:31,935+0800 [pulsar-74-1] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] Consumer reader-2b31818c8d cursor ml-entries: 1 -- deleted-counter: 1 other counters: mdPos 5:0 rdPos 5:1
2024-07-11T16:44:31,938+0800 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Re-trying the read at position null
2024-07-11T16:44:31,938+0800 [broker-topic-workers-OrderedExecutor-7-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Error reading entries at 5:1 : Cursor was already closed - Retrying to read in 1.0 seconds
2024-07-11T16:44:32,945+0800 [broker-topic-workers-OrderedExecutor-7-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Skipping read retry: Current Consumer null, havePendingRead false

Anything else?

No response

Are you willing to submit a PR?

TakaHiR07 commented 1 month ago

@Technoboy- @codelipenghui @lhotari Can you take a look of this issue

dao-jun commented 1 month ago

https://github.com/apache/pulsar/pull/22751

dao-jun commented 2 weeks ago

close the issue because of https://github.com/apache/pulsar/pull/22751 merged