apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

max_total_receiver_queue_size_across_partitions does not work #190

Open zbentley opened 2 years ago

zbentley commented 2 years ago

Describe the bug

The max_total_receiver_queue_size_across_partitions kwarg to the Python subscribe method is nonfunctional.

To Reproduce

  1. Create a persistent partitioned topic. I used 4 partitions.
  2. Create a Shared subscription on the topic.
  3. Publish 10 messages to the topic using batching_type=BatchingType.KeyBased and a unique partition key for each message (this is not necessary with a Shared subscription, but is necessary to demonstrate that this bug also affects KeyShared subscriptions).
  4. Create a consumer on the topic with the below code, and ensure it prints Got message, sleeping forever.
  5. In a second terminal, start another consumer on the topic with the below code.
  6. Observe that the second consumer does not get a message.
  7. Publish additional messages to the topic.
  8. Observe that only after the second publish step does the consumer get messages.
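
Steps 3 and 7 need a producer, which the report does not include. A minimal sketch, assuming the same broker URL and topic name as the consumer script; the helper names `build_messages` and `publish` and the payload format are illustrative and not part of the original report:

```python
def build_messages(count):
    """Return (partition_key, payload) pairs with a unique key per message."""
    return [(f'key-{i}', f'message-{i}'.encode('utf-8')) for i in range(count)]


def publish(count=10, topic='THETOPIC', service_url='pulsar://localhost:6650'):
    # Imported here so build_messages() works without the client installed.
    from pulsar import BatchingType, Client

    client = Client(service_url=service_url)
    producer = client.create_producer(
        topic,
        batching_enabled=True,
        batching_type=BatchingType.KeyBased,
    )
    try:
        # One unique partition key per message, as described in step 3.
        for key, payload in build_messages(count):
            producer.send(payload, partition_key=key)
    finally:
        client.close()
```

Calling `publish()` once covers step 3; calling it again covers step 7.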

Consumer code:

import time
from pulsar import Client, ConsumerType, Timeout
import os

TOPIC = 'THETOPIC'
SUBSCRIPTION = 'THESUBSCRIPTION'

def main():
    client = Client(service_url='pulsar://localhost:6650')
    sub = client.subscribe(
        topic=TOPIC,
        subscription_name=SUBSCRIPTION,
        consumer_type=ConsumerType.Shared,
        max_total_receiver_queue_size_across_partitions=1,
        consumer_name=f'testconsumer-{os.getpid()}'
    )
    while True:
        try:
            msg = sub.receive(100)
            mid = msg.message_id()
            print("partition:", mid.partition(), "ledger:", mid.ledger_id(), "entry:", mid.entry_id(), "batch:", mid.batch_index())
            break
        except Timeout:
            pass
    print("Got message, sleeping forever")
    while True:
        time.sleep(1)

if __name__ == '__main__':
    main()

Expected behavior

The second consumer should receive messages from the topic immediately upon startup. The first consumer should hold back at most max_total_receiver_queue_size_across_partitions messages from the second consumer.

I'm not sure what setting max_total_receiver_queue_size_across_partitions to 0 should do; that's not documented, and probably should be. These docs indicate that it should behave equivalently to a value of 1 with regard to other consumers' ability to get messages.

I'm not sure what the interaction is between receiver_queue_size and max_total_receiver_queue_size_across_partitions; that should be documented as well, but as part of https://github.com/apache/pulsar/issues/15702.

Additional context

Once there are roughly 320 messages in the backlog (given my message size), the second consumer will get data when it starts. I don't know why that cutoff exists.

Environment:

(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ arch
i386
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ sw_vers
ProductName:    macOS
ProductVersion: 12.3.1
BuildVersion:   21E258
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ brew info apache-pulsar
apache-pulsar: stable 2.10.0 (bottled), HEAD
Cloud-native distributed messaging and streaming platform
https://pulsar.apache.org/
/usr/local/Cellar/apache-pulsar/2.10.0 (1,018 files, 949.7MB) *
  Poured from bottle on 2022-05-13 at 12:10:54
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/apache-pulsar.rb
License: Apache-2.0
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ python --version
Python 3.7.13
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ pip show pulsar-client
Name: pulsar-client
Version: 2.10.0
Summary: Apache Pulsar Python client library
Home-page: https://pulsar.apache.org/
Author: Pulsar Devs
Author-email: dev@pulsar.apache.org
License: Apache License v2.0
Location: /Users/zac.bentley/Desktop/Projects/Klaviyo/chariot/.venv/lib/python3.7/site-packages
Requires: certifi, six

codelipenghui commented 2 years ago

@zbentley This is probably related to producer batching: the broker dispatches a whole batch to a consumer, not individual messages. In your test case, the producer might put all 10 messages into 1 batch, so all 10 messages will be dispatched to one consumer and the other consumer will not receive any.

You can try disabling message batching on the producer side.
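
To make the batching hypothesis above concrete, here is a toy model (not broker code) in which a dispatcher hands whole entries to consumers in round-robin order. The function name and the round-robin policy are assumptions made purely for illustration:

```python
def dispatch_entries(entries, consumer_names):
    """Assign each entry (a list of messages) to one consumer as a unit."""
    received = {name: [] for name in consumer_names}
    for index, entry in enumerate(entries):
        # An entry is never split: every message in it goes to one consumer.
        target = consumer_names[index % len(consumer_names)]
        received[target].extend(entry)
    return received


messages = [f'm{i}' for i in range(10)]

# Batching on: all 10 messages travel in a single entry.
batched = dispatch_entries([messages], ['c1', 'c2'])
# Batching off: each message is its own entry.
unbatched = dispatch_entries([[m] for m in messages], ['c1', 'c2'])

print(len(batched['c1']), len(batched['c2']))      # 10 0
print(len(unbatched['c1']), len(unbatched['c2']))  # 5 5
```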

zbentley commented 2 years ago

@codelipenghui this issue occurs with and without batching_enabled in the producer. Additionally, with batching_enabled=True it occurs regardless of which batching strategy is used.

zbentley commented 2 years ago

I edited the code in the example to print out ledger/entry/partition/batch so that it is evident that the batch index is -1 for messages received.

codelipenghui commented 2 years ago

@zbentley

> In a second terminal, start another consumer on the topic with the below code.

Does the second consumer use the same subscription name as the first consumer?

zbentley commented 2 years ago

@codelipenghui no, see example code:

consumer_name=f'testconsumer-{os.getpid()}'

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

RobertIndie commented 6 months ago

> @codelipenghui no, see example code:
>
> consumer_name=f'testconsumer-{os.getpid()}'

This is for the consumer name. Not the subscription name.

max_total_receiver_queue_size_across_partitions only works for partitioned topics. The root cause is that it only controls the receiver queue size of the sub-consumers, not that of the parent consumer (MultiTopicConsumer). The default receiver queue size of the parent consumer is still 1000. As a workaround, you can set receiver_queue_size for the consumer to match max_total_receiver_queue_size_across_partitions.

Here is an example for your case:

    sub = client.subscribe(
        topic=TOPIC,
        subscription_name=SUBSCRIPTION,
        consumer_type=ConsumerType.Shared,
        max_total_receiver_queue_size_across_partitions=1,
        consumer_name=f'testconsumer-{os.getpid()}',
        receiver_queue_size=1
    )

This behavior is the same as in the Java client.
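
A rough model of why the workaround is needed, based only on the explanation above: the sub-consumers are bounded by max_total_receiver_queue_size_across_partitions, while the parent consumer keeps its own queue sized by receiver_queue_size. The function and key names are hypothetical, and this is a simplification rather than the client's actual logic:

```python
DEFAULT_RECEIVER_QUEUE_SIZE = 1000  # parent-consumer default quoted above


def buffer_limits(max_total_across_partitions,
                  receiver_queue_size=DEFAULT_RECEIVER_QUEUE_SIZE):
    """Return the two separate limits (in messages) described above."""
    return {
        'sub_consumers_total': max_total_across_partitions,
        'parent_consumer': receiver_queue_size,
    }


def parent_exceeds_budget(limits):
    """True when the parent queue can hold more than the configured total."""
    return limits['parent_consumer'] > limits['sub_consumers_total']


# Without the workaround, the parent queue (1000) dwarfs the budget of 1.
print(parent_exceeds_budget(buffer_limits(1)))                         # True
# With receiver_queue_size=1, both limits agree.
print(parent_exceeds_budget(buffer_limits(1, receiver_queue_size=1)))  # False
```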

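For further improvements:

  • There is a lack of testing for max_total_receiver_queue_size_across_partitions. We need to enrich the test cases.
  • max_total_receiver_queue_size_across_partitions only works for the multiple topics consumer. It would be better to log a warning when users use the multiple topics consumer with max_total_receiver_queue_size_across_partitions.
  • When setting max_total_receiver_queue_size_across_partitions, the user also needs to consider the receiver_queue_size for the consumer. That is not user-friendly. We may need to find a way to improve it.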

bobcorsaro commented 5 months ago

> > @codelipenghui no, see example code:
> >
> > consumer_name=f'testconsumer-{os.getpid()}'
>
> This is for the consumer name. Not the subscription name.
>
> max_total_receiver_queue_size_across_partitions only works for partitioned topics. The root cause is that it only controls the receiver queue size of the sub-consumers, not that of the parent consumer (MultiTopicConsumer). The default receiver queue size of the parent consumer is still 1000. As a workaround, you can set receiver_queue_size for the consumer to match max_total_receiver_queue_size_across_partitions.
>
> Here is an example for your case:
>
>     sub = client.subscribe(
>         topic=TOPIC,
>         subscription_name=SUBSCRIPTION,
>         consumer_type=ConsumerType.Shared,
>         max_total_receiver_queue_size_across_partitions=1,
>         consumer_name=f'testconsumer-{os.getpid()}',
>         receiver_queue_size=1
>     )
>
> This behavior is the same as in the Java client.
>
> For further improvements:
>
>   • There is a lack of testing for max_total_receiver_queue_size_across_partitions. We need to enrich the test cases.
>   • max_total_receiver_queue_size_across_partitions only works for the multiple topics consumer. It would be better to log a warning when users use the multiple topics consumer with max_total_receiver_queue_size_across_partitions.
>   • When setting max_total_receiver_queue_size_across_partitions, the user also needs to consider the receiver_queue_size for the consumer. That is not user-friendly. We may need to find a way to improve it.

I've adjusted the test code by adding receiver_queue_size= and reproduced the same issue. With a backlog of 100 messages on a 4-partition topic and a Shared subscription, only the first consumer gets a single message. After it disconnects, the second consumer gets a message.

bobcorsaro commented 5 months ago

Tested on pulsar-client-3.4.0, and it works as expected. This appears to be a bug in the version we are using, pulsar-client-2.10.1.

RobertIndie commented 5 months ago

> Tested on pulsar-client-3.4.0 and it works as expected. This appears to be a bug in the version we are using, pulsar-client-2.10.1

Thanks for the information. I just tested the workaround, and it does not work on pulsar-client-2.10. There may be some bugs in pulsar-client-2.10, but they have been fixed since 3.0. The workaround should only work on versions >= 3.0.

After investigation, the issue is related to the PartitionedConsumerImpl in the Pulsar C++ client, which was refactored by this PR: https://github.com/apache/pulsar/pull/16969.
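
Since the workaround is reported to depend on the client version, a script could check the installed version before relying on it. A small sketch, assuming the package is installed under the name `pulsar-client` (as shown in the `pip show` output in the report) and that Python 3.8+ is available for `importlib.metadata`; both helper names are hypothetical:

```python
def workaround_supported(version):
    """Return True if a pulsar-client version string is 3.0 or newer."""
    major = int(version.split('.')[0])
    return major >= 3


def installed_client_version():
    # importlib.metadata is part of the standard library from Python 3.8.
    from importlib.metadata import version
    return version('pulsar-client')
```

For example, `workaround_supported(installed_client_version())` could gate whether `receiver_queue_size` is set alongside max_total_receiver_queue_size_across_partitions.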