apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

pulsar-timer thread blocked at redeliverUnacknowledgedMessages #9109

Closed Shawyeok closed 3 years ago

Shawyeok commented 3 years ago

Describe the bug After a broker crash and restart, consumers become blocked: the consumer rateOut drops to 0 and does not recover until the consumer process is restarted.

To Reproduce Steps to reproduce the behavior:

  1. Run reproduce code below

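    import java.util.concurrent.TimeUnit;

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.DeadLetterPolicy;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.SubscriptionType;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PulsarConsumerTest {

        private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerTest.class);

        private PulsarClient pulsarClient;
        private Consumer<byte[]> consumer;

        @Before
        public void setUp() throws Exception {
            pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://<broker>:6650")
                .build();
            String topic = "persistent://sample/ns1/topic1";
            String subscriptionName = "test";
            // After 3 redeliveries a message is sent to the dead letter topic.
            DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
                .deadLetterTopic(String.format("%s-%s-DLQ", topic, subscriptionName))
                .maxRedeliverCount(3)
                .build();
            // Short ack timeout so that unacknowledged messages are redelivered quickly.
            consumer = pulsarClient.newConsumer()
                .topic(topic)
                .deadLetterPolicy(deadLetterPolicy)
                .ackTimeout(5, TimeUnit.SECONDS)
                .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
        }

        @After
        public void tearDown() throws Exception {
            pulsarClient.close();
        }

        @Test
        public void test() throws PulsarClientException {
            // Messages are never acknowledged, so every message hits the ack timeout.
            while (true) {
                Message<byte[]> message = consumer.receive();
                MessageId messageId = message.getMessageId();
                LOG.info("received message with messageId: {}", messageId);
                try {
                    consume(message);
                } catch (Exception e) {
                    LOG.error("consume message exception with messageId: {}", messageId, e);
                }
            }
        }

        // Always fails, to simulate a consumer that cannot process any message.
        private void consume(Message<byte[]> message) {
            throw new IllegalStateException("mock consume fails");
        }
    }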
  2. Send 1000 messages to the topic persistent://sample/ns1/topic1
  3. Wait about 10-15s (time for redelivery to start), then kill and restart the broker process
  4. Inspect the stack of thread pulsar-timer-4-1 and check whether it is blocked
  5. If the problem doesn't appear, repeat steps 2-4 a few more times
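Step 4 can be done with `jstack <pid>`, or from inside the consumer process. The following is a minimal sketch using the standard `java.lang.management` API. The class name `TimerThreadCheck` and the method `findStuckThreads` are hypothetical; the thread-name prefix `pulsar-timer` is taken from the thread name in this report.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class TimerThreadCheck {

    /**
     * Returns info for live threads whose name starts with namePrefix
     * and whose state is WAITING or BLOCKED at the time of the snapshot.
     */
    public static List<ThreadInfo> findStuckThreads(String namePrefix) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Ask for ownable-synchronizer info only if this JVM supports it.
        boolean withSynchronizers = mx.isSynchronizerUsageSupported();
        List<ThreadInfo> stuck = new ArrayList<>();
        for (ThreadInfo info : mx.dumpAllThreads(false, withSynchronizers)) {
            if (info == null || !info.getThreadName().startsWith(namePrefix)) {
                continue;
            }
            Thread.State state = info.getThreadState();
            if (state == Thread.State.WAITING || state == Thread.State.BLOCKED) {
                stuck.add(info);
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        // Prints a short stack excerpt for each matching thread.
        for (ThreadInfo info : findStuckThreads("pulsar-timer")) {
            System.out.println(info);
        }
    }
}
```

A single snapshot only shows the state at one instant, so it is worth taking several a few seconds apart: a thread that stays WAITING in the same frames across snapshots is the case described in this report.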

Expected behavior The consumer should recover on its own after the broker restarts. Instead, the thread pulsar-timer-4-1 blocks at producer.send forever, and consumer.receive may block in UnAckedMessageTracker#add because that method has to acquire a writeLock inside UnAckedMessageTracker.

"pulsar-timer-4-1" #37 prio=5 os_prio=0 tid=0x00007efe584c0000 nid=0x62 waiting on condition [0x00007efe30aec000]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0x00000000af9bbee8> (a java.util.concurrent.CompletableFuture$Signaller)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
 at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
 at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
 at org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:115)
 at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89)
 at org.apache.pulsar.client.impl.ConsumerImpl.processPossibleToDLQ(ConsumerImpl.java:1452)
 at org.apache.pulsar.client.impl.ConsumerImpl.lambda$null$12(ConsumerImpl.java:1390)
 at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$811/717118161.test(Unknown Source)
 at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
 at java.util.Iterator.forEachRemaining(Iterator.java:116)
 at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
 at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 at org.apache.pulsar.client.impl.ConsumerImpl.lambda$redeliverUnacknowledgedMessages$14(ConsumerImpl.java:1396)
 at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$810/1511392410.accept(Unknown Source)
 at java.lang.Iterable.forEach(Iterable.java:75)
 at org.apache.pulsar.client.impl.ConsumerImpl.redeliverUnacknowledgedMessages(ConsumerImpl.java:1388)
 at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$redeliverUnacknowledgedMessages$20(MultiTopicsConsumerImpl.java:621)
 at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl$$Lambda$807/636669140.accept(Unknown Source)
 at java.util.HashMap.forEach(HashMap.java:1289)
 at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.redeliverUnacknowledgedMessages(MultiTopicsConsumerImpl.java:619)
 at org.apache.pulsar.client.impl.UnAckedMessageTracker$2.run(UnAckedMessageTracker.java:144)
 at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
 at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
 at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
 at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 at java.lang.Thread.run(Thread.java:748)
   Locked ownable synchronizers:
 - <0x00000000a9dab610> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
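
The trace shows the timer thread parked in `CompletableFuture.get` while it still owns a `ReentrantReadWriteLock`. The pattern can be sketched in isolation with plain `java.util.concurrent`. This is not Pulsar code: `LockedBlockingSendDemo`, `pendingSend` and `receiverCanProceed` are hypothetical names, and `pendingSend` stands in for a DLQ send that never completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockedBlockingSendDemo {

    /**
     * Returns true if a second thread can take the write lock within waitMillis
     * while the "timer" thread holds it and waits on pendingSend.
     */
    public static boolean receiverCanProceed(CompletableFuture<Void> pendingSend, long waitMillis)
            throws InterruptedException {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch lockHeld = new CountDownLatch(1);

        Thread timer = new Thread(() -> {
            lock.writeLock().lock();
            try {
                lockHeld.countDown();
                pendingSend.get(); // blocking wait while holding the write lock
            } catch (Exception e) {
                // ignored in this sketch
            } finally {
                lock.writeLock().unlock();
            }
        }, "demo-timer");
        timer.setDaemon(true); // a stuck demo thread must not keep the JVM alive
        timer.start();
        lockHeld.await();

        // Plays the role of the receive path, which needs the same write lock.
        boolean acquired = lock.writeLock().tryLock(waitMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            lock.writeLock().unlock();
        }
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        // A send that never completes: the lock stays held.
        System.out.println(receiverCanProceed(new CompletableFuture<>(), 200));
        // A send that has already completed: the lock is released right away.
        System.out.println(receiverCanProceed(CompletableFuture.completedFuture(null), 200));
    }
}
```

With a future that never completes, the second thread cannot get the lock, which matches a consumer whose rateOut drops to 0. In this sketch the stall goes away as soon as the wait under the lock is bounded or the future completes.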

Screenshots The consumer thread got blocked in our production environment. (screenshot not included)

Additional context Broker version: 2.4.0; pulsar-client version: 2.5.2

codelipenghui commented 3 years ago

https://github.com/apache/pulsar/pull/9552 fixed this problem