apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] Client with shared subscription is blocked #21104

Open · michalcukierman opened this issue 1 year ago

michalcukierman commented 1 year ago

Search before asking

Version

Client: 3.1.0, Pulsar: 3.1.0 (and later builds)

Also reported on 3.0.1

Minimal reproduce step

My steps to reproduce:

  1. Create a persistent topic with 3 partitions
  2. Publish 1 million messages (30 KB each)
  3. Run the client and consumer (a sketch of the surrounding receive/ack loop follows the snippet):
    PulsarClient client = PulsarClient.builder()
        .serviceUrl(this.pulsarBrokerUrl)
        .build();

    Consumer consumer = client.newConsumer()
        .topic(sourceTopic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscriptionType(SubscriptionType.Shared)
        .receiverQueueSize(8)
        .ackTimeout(5, TimeUnit.SECONDS)
        .subscribe();
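
A minimal sketch of the receive/ack loop around the consumer above (the reproducer itself acks only after the produced copy is confirmed, as shown later in the thread; the processing step here is just a placeholder):

    while (!Thread.currentThread().isInterrupted()) {
      Message<?> msg = consumer.receive();
      try {
        // placeholder for the actual processing of msg
        consumer.acknowledge(msg);
      } catch (Exception e) {
        // trigger redelivery (otherwise the configured ackTimeout will redeliver)
        consumer.negativeAcknowledge(msg);
      }
    }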

What did you expect to see?

All messages are received

What did you see instead?

The client stops receiving messages. Restarting the client helps, but it gets stuck again after some time.

Anything else?

The issue was originally described here: #21082. @MichalKoziorowski-TomTom also faces the issue.

I've created a new issue because in #21082 the author says that a broker restart helps. In this case it looks like it's client related, some race condition observed in 3.x.x after introducing ackTimeout.

Are you willing to submit a PR?

codelipenghui commented 1 year ago

@michalcukierman Could you please share the topic stats and internal-stats when the issue is reproduced?

michalcukierman commented 1 year ago

Two files attached: partitioned.stats.internal.txt and partitioned.stats.txt
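
For reference, the same counters can also be pulled with the Java admin client, roughly like this (a sketch; the admin URL and topic name are assumptions):

    PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080") // assumption: the cluster's admin endpoint
        .build();
    admin.topics()
        .getPartitionedStats("persistent://public/default/source", true)
        .getPartitions()
        .forEach((partition, stats) -> stats.getSubscriptions()
            .forEach((sub, subStats) -> subStats.getConsumers()
                .forEach(c -> System.out.printf(
                    "%s %s availablePermits=%d unackedMessages=%d blocked=%b%n",
                    partition, sub, c.getAvailablePermits(), c.getUnackedMessages(),
                    c.isBlockedConsumerOnUnackedMsgs()))));
    admin.close();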

I see that in stats we have:

        "availablePermits" : 0,
        "unackedMessages" : 25,

        "availablePermits" : 0,
        "unackedMessages" : 32,

but I do ack all of the messages:

    Consumer consumer = client.newConsumer()
        .topic(sourceTopic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscriptionType(SubscriptionType.Shared)
        .receiverQueueSize(8)
        .ackTimeout(ackTimeout, TimeUnit.SECONDS)
        .subscribe();

    Producer<byte[]> producer = client.newProducer(Schema.BYTES)
        .topic(destinationTopic)
        .compressionType(CompressionType.LZ4)
        .maxPendingMessages(8)  // Support 5 MB files
        .blockIfQueueFull(true) // Support 5 MB files
        .batchingMaxBytes(5242880)
        .create();

    Multi<Message> messages = Multi.createBy().repeating()
        .completionStage(consumer::receiveAsync)
        .until(m -> closed.get());

    messages.subscribe().with(msg -> {
      receivedDistribution.record(getKiloBytesSize(msg.getData()));
      Uni.createFrom().completionStage(producer.newMessage(Schema.BYTES)
              .key(msg.getKey())
              .value(msg.getValue().getContent().getPayload().toByteArray())
              .eventTime(msg.getEventTime()).sendAsync())
          .subscribe().with(msgId -> {
            sentCounter.increment();
            try {
              consumer.acknowledge(msg.getMessageId());
            } catch (PulsarClientException e) {
              throw new RuntimeException("Unable to send or ACK the message", e);
            }
          });
    });
  }

ackTimeout is set to 120 seconds.

It can be confirmed in the log files, where sentCounter is just one message behind:

[Screenshot: 2023-09-01 at 13:11:49]

michalcukierman commented 1 year ago

Here are the alternative stats: https://github.com/apache/pulsar/issues/21082#issuecomment-1698836870

michalcukierman commented 1 year ago

I think this is what happens:

There may be a race condition in the 3.1.0 client, as the situation was not observed with 2.10.4 (we've downgraded, and @MichalKoziorowski-TomTom also reported the downgrade as a fix).

MichalKoziorowski-TomTom commented 1 year ago

We don't have ackTimeout and it still happens. In my case, my topic has only one partition.

redoc123 commented 1 year ago

You said 1 million messages, right? Could you post your sample payload? I want to reproduce and debug.

michalcukierman commented 1 year ago

It's 1 million x 30 KB of a sample HTML file. Any text file would do. What we are doing:

topic:source (1 mln x 30 KB) -> app:router -> topic:destination

source has 3 partitions, destination has 6 partitions. Router has 2 replicas, uses a shared subscription to read from source, writes to destination, and acks the incoming message once it is stored and acked on destination.
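
For reference, the two partitioned topics in this topology could be created with the Java admin client roughly like this (a sketch; tenant/namespace, topic names and the admin URL are assumptions):

    PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080") // assumption
        .build();
    admin.topics().createPartitionedTopic("persistent://public/default/source", 3);
    admin.topics().createPartitionedTopic("persistent://public/default/destination", 6);
    admin.close();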

michalcukierman commented 1 year ago

Here is the exact payload: 30k.html.zip

lvdelu commented 1 year ago

+1, it seems to also occur in other consumer modes (Shared, Key_Shared).

michalcukierman commented 1 year ago

Happened again today on a different module.

Topic: not partitioned, without retention. Messages: 50k, around 5 KB each. Subscription: shared, with two clients.

The workaround for now is to use the 2.10.4 client.

mattisonchao commented 1 year ago

I'll try to reproduce it and get back to you. :)

mattisonchao commented 1 year ago

Hi @michalcukierman,

I created a repo to reproduce this issue, but no luck. Could you please help me refine this test?

michalcukierman commented 1 year ago

@mattisonchao I'll have time next week to get back to it.

lvdelu commented 1 year ago

Is there a timeline for a fix?

mattisonchao commented 1 year ago

@lvdelu Could you help refine this test so we can reproduce it? repo

michalcukierman commented 1 year ago

I've tried to create an isolated environment to reproduce the issue on a local machine using test containers, but I was not able to today. The environment where I observe the issue is a GCP cluster with:

It happens randomly with 1 million messages. It sometimes gets stuck around 300k-500k messages, but it is not deterministic. I'll try to reproduce the issue on GCP, but I'll need more time. It may be that more than one partition on more than one broker is required. Unfortunately, with the test containers I am not able to recreate the same load.

michalcukierman commented 1 year ago

It looks like I was able to reproduce the issue in two runs today (failed 2/2).

The code is here: https://github.com/michalcukierman/pulsar-21104

In general it's very much like the bug description. Produce 1 million messages of 30 KB each:

  @Outgoing("requests-out")
  public Multi<String> produce() {
    return Multi.createBy().repeating()
        .uni(() -> Uni
            .createFrom()
            .item(() -> RandomStringUtils.randomAlphabetic(30_000))
            .onItem()
            .invoke(() -> System.out.println("+ Produced: " + outCount.incrementAndGet()))
        )
        .atMost(1_000_000);
  }

Read it using a client with a shared subscription and write it to another topic:

@ApplicationScoped
public class Processor {

  private final AtomicLong inCount = new AtomicLong(0);
  @Incoming("requests-in")
  @Outgoing("dump-out")
  @Blocking
  PulsarOutgoingMessage<String> process(PulsarIncomingMessage<String> in) {
    System.out.println(" - Processed: " + inCount.incrementAndGet());
    return PulsarOutgoingMessage.from(in);
  }
}

The settings of the client are:

pulsar.client.serviceUrl=pulsar://brokers-broker:6650

mp.messaging.incoming.requests-in.subscriptionType=Shared
mp.messaging.incoming.requests-in.numIoThreads=4
mp.messaging.incoming.requests-in.subscriptionName=request-shared-subscription
mp.messaging.incoming.requests-in.ackTimeoutMillis=5000
mp.messaging.incoming.requests-in.subscriptionInitialPosition=Earliest
mp.messaging.incoming.requests-in.receiverQueueSize=8
mp.messaging.incoming.requests-in.topic=persistent://public/default/requests_4
mp.messaging.incoming.requests-in.connector=smallrye-pulsar

mp.messaging.outgoing.dump-out.topic=persistent://public/default/dump
mp.messaging.outgoing.dump-out.connector=smallrye-pulsar
mp.messaging.outgoing.dump-out.blockIfQueueFull=true
mp.messaging.outgoing.dump-out.maxPendingMessages=8
mp.messaging.outgoing.dump-out.maxPendingMessagesAcrossPartitions=12
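
These connector properties correspond roughly to the following plain Pulsar consumer configuration (a sketch of the equivalent builder calls, not the connector's actual internals):

    Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("persistent://public/default/requests_4")
        .subscriptionName("request-shared-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .receiverQueueSize(8)
        .ackTimeout(5000, TimeUnit.MILLISECONDS)
        .subscribe();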

The retention of the requests topic is set to -1 -1 (infinite) using the Pulsar Admin Java client.
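
A sketch of how that kind of retention can be applied with the Java admin client (shown at the namespace level for simplicity; the namespace is an assumption):

    // -1 retention time and -1 retention size means infinite retention
    admin.namespaces().setRetention("public/default", new RetentionPolicies(-1, -1));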

During both runs the consumer got stuck:

[Screenshot: 2023-10-02 at 21:12:42]

poorbarcode commented 1 year ago

@michalcukierman

I left a comment here; you can answer it under the current issue. Thanks.

michalcukierman commented 1 year ago

The two issues may not be related. In both cases the subscriptions are blocked, but in this case restarting the broker didn't help; it looks like a deadlock in the client.

@poorbarcode have you tried the repository I've linked? https://github.com/michalcukierman/pulsar-21104

It's possible to reproduce it on a GCP cluster. It should work on other clusters as well.

MichalKoziorowski-TomTom commented 7 months ago

It might be fixed with https://github.com/apache/pulsar/issues/22352. Need to check...

MichalKoziorowski-TomTom commented 6 months ago

@michalcukierman Could you recheck your case? I've checked mine and I can't reproduce it with the 3.0.4 client, while it was easily reproducible with 3.0.1. Test with a version that includes the https://github.com/apache/pulsar/issues/22352 fix.

michalcukierman commented 6 months ago

I could not reproduce the issue with Client 3.0.4, but I can still reproduce it with Client 3.2.2. @codelipenghui the hprof is here: https://github.com/michalcukierman/pulsar-21104/blob/main/heap.hprof. It was taken during execution of the code that is currently in the main branch (Pulsar Cluster 3.1.0 / Pulsar Client 3.2.2).

michalcukierman commented 6 months ago

I've noticed that with Client 3.2.2 the behavior may be a bit different. The consumers get blocked, but occasionally resume. After receiving a couple of messages they get stuck again. Stats are uploaded: https://github.com/michalcukierman/pulsar-21104/tree/main/stats

I've confirmed once again that the issue does not occur with Pulsar Client 3.0.4 (or at least I was not able to reproduce it after processing 1 million messages; with Client 3.2.2 the clients are usually blocked after 50k messages).

michalcukierman commented 6 months ago

Comparison of Client 3.2.2 and Client 3.0.4:
https://github.com/apache/pulsar/assets/4356553/a8a9ef21-f12d-4d4d-97b5-9c1b028f4d8f
https://github.com/apache/pulsar/assets/4356553/a104c3fe-ad46-4b48-82c2-e045e15b7ac4

michalcukierman commented 5 months ago

@poorbarcode have you had a chance to look at the last comment?

dao-jun commented 5 months ago

I've noticed that with Client 3.2.2 the behavior may be a bit different. The consumers get blocked, but occasionally resume. After receiving a couple of messages they get stuck again. Stats are uploaded: https://github.com/michalcukierman/pulsar-21104/tree/main/stats

I've confirmed once again that the issue does not occur with Pulsar Client 3.0.4 (or at least I was not able to reproduce it after processing 1 million messages; with Client 3.2.2 the clients are usually blocked after 50k messages).

I checked the stats and the internal-stats. As far as I can see, there are about 30k messages in the topic, and there is a subscription named request-shared-subscription consuming the topic, but it seems none of the messages are acknowledged: the markDeletePosition is 0:-1 and individuallyDeletedMessages is empty.

This means all the messages are unacked, i.e. in the backlog. I also checked the stats for the consumer of request-shared-subscription: msgOutCounter=10002 is the number of messages dispatched to the consumer, and unackedMessages=10001 means 10001 of them are unacked. So it looks like you only receive messages but don't ack them.

I guess maxUnackedMessagesPerConsumer in your broker.conf is 10000. Since unackedMessages=10001 exceeds maxUnackedMessagesPerConsumer=10000, the blockedConsumerOnUnackedMsgs field in the stats is true.

This means there are too many unacked messages on the consumer, so the broker stopped dispatching messages to it.

Please ack each message after it is processed successfully.
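
For reference, this per-consumer limit is configured in broker.conf; the value below only illustrates the guess above and is not a confirmed setting from this cluster (the shipped default is 50000, and setting it to 0 disables the limit):

maxUnackedMessagesPerConsumer=10000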

michalcukierman commented 5 months ago

@dao-jun,

The whole source code, with instructions on how to run it, is available in the linked repository. Note that the same example works when downgrading the client to 3.0.4 (see the videos): https://github.com/apache/pulsar/issues/21104#issuecomment-2098254335

The message acknowledgment should be done after getting the write confirmation from the producer: https://github.com/apache/pulsar/issues/21104#issuecomment-1743609860

This is the original code that was failing. We are no longer using it because we've decommissioned the module: https://github.com/apache/pulsar/issues/21104#issuecomment-1702584503

https://github.com/apache/pulsar/issues/21104 was created as a reproducible example.

michalcukierman commented 5 months ago

This code creates a source -> processor -> sink pipeline. The sink ACK triggers the source ACK.

@ApplicationScoped
public class Processor {

  private final AtomicLong inCount = new AtomicLong(0);
  @Incoming("requests-in")
  @Outgoing("dump-out")
  @Blocking
  PulsarOutgoingMessage<String> process(PulsarIncomingMessage<String> in) {
    System.out.println(" - Processed: " + inCount.incrementAndGet());
    return PulsarOutgoingMessage.from(in);
  }
}

dao-jun commented 5 months ago

@michalcukierman I'm not familiar with this kind of development framework; https://github.com/apache/pulsar/issues/21104#issuecomment-2140860378 is based on the stats and internal-stats you provided. The point is that none of the messages dispatched to clients are acked.

You can debug your code to confirm whether the messages are acked or not.
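
One way to verify this on the client side (a sketch, not from the thread): acknowledge asynchronously and log the completion, so failed or missing acks become visible. The counter below is hypothetical.

    consumer.acknowledgeAsync(msg.getMessageId())
        .whenComplete((ignored, error) -> {
          if (error != null) {
            System.err.println("Ack failed for " + msg.getMessageId() + ": " + error);
          } else {
            ackedCounter.incrementAndGet(); // hypothetical counter, to compare with the received count
          }
        });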