apache / pulsar


`receiver_queue_size` of 1 does not work; Shared consumers with receiver_queue_size=1 can "steal" messages that other consumers should receive #15703

Open zbentley opened 2 years ago

zbentley commented 2 years ago

Describe the bug

When the Python client calls subscribe with receiver_queue_size set to 1, the effective receiver queue holds more than one message; Shared-type consumers with receiver_queue_size=1 can "steal" messages that other, idle consumers could otherwise process.

To Reproduce

  1. Create a persistent non partitioned topic.
  2. Create a Shared subscription on the topic.
  3. Publish 10 messages to the topic with sequential numeric payloads, e.g. message-0, message-1, and so on (see the producer sketch after this list).
  4. Create a consumer on the topic with the below code, and ensure it prints Press a key to acknowledge messages.
  5. In a second terminal, start another consumer on the topic with the below code, and ensure it prints Press a key to acknowledge messages.
  6. Press "enter" in the first consumer's terminal.
  7. Observe that the first consumer processes fewer than 9 messages (in my tests it was usually 8, though I saw 6 occasionally; that might have been a defect on my side though).
  8. Press "enter" in the second consumer's terminal.
  9. Observe that the second consumer processes more than 1 message.
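
For step 3, a minimal producer sketch (hypothetical, not part of the original report; the service URL and topic name are assumed to match the consumer code below, and batching is left at its default):

from pulsar import Client

TOPIC = 'THETOPIC'

client = Client(service_url='pulsar://localhost:6650')
producer = client.create_producer(TOPIC)
for i in range(10):
    # Sequential numeric payloads: message-0, message-1, ..., message-9
    producer.send(f'message-{i}'.encode('utf-8'))
producer.flush()
client.close()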

Consumer code:

from pulsar import Client, ConsumerType, Timeout
import os

TOPIC = 'THETOPIC'
SUBSCRIPTION = 'THESUBSCRIPTION'

def recv(sub):
    while True:
        try:
            msg = sub.receive(100)  # 100 ms receive timeout
            print("Got message", msg.data())
            return msg
        except Timeout:
            pass

def main():
    client = Client(service_url='pulsar://localhost:6650')
    sub = client.subscribe(
        topic=TOPIC,
        subscription_name=SUBSCRIPTION,
        consumer_type=ConsumerType.Shared,
        receiver_queue_size=1,  # the setting under test
        consumer_name=f'testconsumer-{os.getpid()}'
    )
    msg = recv(sub)
    mid = msg.message_id()
    print("partition:", mid.partition(), "ledger:", mid.ledger_id(), "entry:", mid.entry_id(), "batch:", mid.batch_index())
    input("Press a key to acknowledge messages")
    while True:
        sub.acknowledge(msg)
        msg = recv(sub)

if __name__ == '__main__':
    main()

Expected behavior

The first consumer should process exactly 9 messages, every time. The second consumer should process exactly 1 message, every time. Any extra messages that currently go to consumer 2 should instead go to consumer 1; otherwise consumer 2 "falsely steals" messages that consumer 1 could have handled.

Environment: Same environment as https://github.com/apache/pulsar-client-python/issues/190

zbentley commented 2 years ago

This is especially severe in our use case because many of our messages take a long time to process, so we parallelize by running many consumers and setting each one's receiver queue size to 1. However, this bug results in 50% or more of messages being artificially delayed even when idle consumers are available to handle them.

codelipenghui commented 2 years ago

@zbentley It is likely the same root cause as https://github.com/apache/pulsar-client-python/issues/190; please try disabling batching on the producer side (see the sketch below).
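
A minimal sketch of disabling producer-side batching in the Python client (hypothetical, reusing the client and TOPIC names from the producer sketch above):

producer = client.create_producer(
    TOPIC,
    batching_enabled=False  # send each message individually instead of batching
)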

zbentley commented 2 years ago

@codelipenghui I have reproduced with batching both enabled and disabled.

Updated the example code to print the batch index, so it is clear that it is -1.

merlimat commented 2 years ago

@zbentley The pre-fetching is tied to the receive() calls and not the acknowledgements.

As long as you keep calling receive, you free up space in the internal receiver queue and the client library will ask the broker for more messages. In this case it is expected that consumer-2 can win the race for the 2nd prefetched message.

To get the precise behavior of a single message at a time, you need to disable pre-fetching completely by setting receiver_queue_size=0, as in the sketch below.
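
A minimal sketch of that zero-prefetch configuration (hypothetical; the other parameters mirror the consumer code from the issue description):

sub = client.subscribe(
    topic=TOPIC,
    subscription_name=SUBSCRIPTION,
    consumer_type=ConsumerType.Shared,
    receiver_queue_size=0,  # disable pre-fetching: a message is requested only when receive() is called
    consumer_name=f'testconsumer-{os.getpid()}'
)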

zbentley commented 2 years ago

@merlimat that makes great sense, thanks. So "receiver queue size" could be better thought of as "extra messages to fetch (if available) whenever receive is called"?

If so I can totally work with that; it would be great if that were documented more clearly. The documentation indicates that Pulsar uses a pull-based protocol, but doesn't go into detail about when pulls happen and how they work. It'd be great if the documentation for each client linked to a wiki article that explained those details.

I can take a stab at writing such an article if you'd like.
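
To make that reframing concrete, a hypothetical annotated sketch of the behavior described above (assuming the sub from the consumer code in the issue description, with receiver_queue_size=1 on a Shared subscription):

msg1 = sub.receive()   # frees the single queue slot; the client asks the broker for another message
# While msg1 is being processed, the prefetched next message sits in this
# consumer's local queue and is not available to other, idle consumers.
sub.acknowledge(msg1)
msg2 = sub.receive()   # typically returns the already-prefetched message immediately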

codelipenghui commented 2 years ago

@zbentley Here is some context for this part: https://pulsar.apache.org/docs/next/developing-binary-protocol#flow-control

github-actions[bot] commented 2 years ago

This issue had no activity for 30 days and has been marked with the Stale label.

wallacepeng commented 1 year ago

Any update on this? I believe the multi-topics consumer code forces the receiver queue size to a minimum of 2:

MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic,
        ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
        CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
        ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
    super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()),
            listenerExecutor, subscribeFuture, schema, interceptors);

I wonder if it could be allowed to be set to 1? I also really need this: I have long-running jobs, and a receiver queue size of 1 would let clients run all of the jobs concurrently.