apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

pattern_auto_discovery_period in pulsar.Client is not used. #171

Open mikeawalker opened 7 months ago

mikeawalker commented 7 months ago

This took me a little while to track down, but I'm pretty sure it's an actual bug. It seems simple enough to fix, and I'd be willing to take on the PR. I've prototyped the change locally: it's 1 line in the code and 1-2 lines in the unit test.

The Client class has a subscribe function that takes a parameter called pattern_auto_discovery_period.

This value is never used, so the discovery period always stays at the default of 60 seconds (because that is what is in the cpp files).
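To make the failure mode concrete, here is a minimal sketch of the bug pattern described above. It does not use the real pulsar bindings: FakeConsumerConfig, subscribe_buggy and subscribe_fixed are hypothetical names invented for illustration, and the only detail taken from this report is the 60 second default.

```python
from dataclasses import dataclass

# Default discovery period stated in this report (seconds).
DEFAULT_DISCOVERY_PERIOD_S = 60


@dataclass
class FakeConsumerConfig:
    """Hypothetical stand-in for the native consumer configuration object."""
    pattern_auto_discovery_period: int = DEFAULT_DISCOVERY_PERIOD_S


def subscribe_buggy(pattern_auto_discovery_period=DEFAULT_DISCOVERY_PERIOD_S):
    """Accepts the argument but never copies it into the configuration."""
    conf = FakeConsumerConfig()
    return conf


def subscribe_fixed(pattern_auto_discovery_period=DEFAULT_DISCOVERY_PERIOD_S):
    """Forwards the argument, which is the kind of one-line fix proposed here."""
    conf = FakeConsumerConfig()
    conf.pattern_auto_discovery_period = pattern_auto_discovery_period
    return conf


print(subscribe_buggy(pattern_auto_discovery_period=1).pattern_auto_discovery_period)
print(subscribe_fixed(pattern_auto_discovery_period=1).pattern_auto_discovery_period)
```

In this sketch the buggy variant reports the default no matter what the caller passes, while the fixed variant reports the caller's value. The real wrapper may be structured differently.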

Here is a POC that shows the problem:

import re
import time

import _pulsar
import pulsar

mode = "non-persistent" # ?? nothing received

TOPIC = f"{mode}://public/default/my-topic/whatever"
TOPIC2 = f"{mode}://public/default/my-topic/something"
TOPIC_WILD = f"{mode}://public/default/my-topic/.*"


def callback(consumer, msg):
    # Print every message together with the topic it arrived on.
    print(f"Got topic {msg.topic_name()} {msg.data()}\n", flush=True)


def main():
    sub_mode = _pulsar.RegexSubscriptionMode.NonPersistentOnly

    time.sleep(2)
    client = pulsar.Client('pulsar://localhost:6650')
    # This topic will be found right away because I created it first!
    producer = client.create_producer(TOPIC)

    client.subscribe(
        re.compile(TOPIC_WILD),
        subscription_name="subname111",
        message_listener=callback,
        regex_subscription_mode=sub_mode,
        pattern_auto_discovery_period=100,
    )
    # This topic will not be found until 60 seconds into the run,
    # regardless of pattern_auto_discovery_period.
    producer2 = client.create_producer(TOPIC2)

    # Alternate between the two producers, one message per second.
    for k in range(120):
        time.sleep(1)
        o = f"some data {k}"
        if k % 2 == 0:
            producer.send(o.encode())
        else:
            producer2.send(o.encode())
    client.close()


if __name__ == "__main__":
    main()

Here is a screenshot of the pattern discovery happening at 60 seconds.

[Image: screenshot of the pattern discovery happening at 60 seconds]

There is a unit test that is supposed to be testing for this: test_topics_pattern_consumer.

However, this test has a bug as well. Since all the producers are created before the subscribe call, the topics are all found right away, so the test passes without auto discovery ever having to run.


        producer1 = client.create_producer(topic1)
        producer2 = client.create_producer(topic2)
        producer3 = client.create_producer(topic3)  # MOVE THIS after the subscribe to demonstrate auto-discovery!

        consumer = client.subscribe(
            re.compile(topics_pattern),
            "my-pattern-consumer-sub",
            consumer_type=ConsumerType.Shared,
            receiver_queue_size=10,
            pattern_auto_discovery_period=1,
        )

        # wait enough time to trigger auto discovery
        time.sleep(2)
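The ordering problem in the test can be illustrated with a small simulation. This is only a sketch under stated assumptions: FakeBroker and FakePatternConsumer are hypothetical in-memory stand-ins, not part of the Pulsar API, and time is simulated rather than slept. It assumes a pattern consumer scans once at subscribe time and then once per discovery period.

```python
import re


class FakeBroker:
    """Hypothetical in-memory topic registry (not a Pulsar class)."""

    def __init__(self):
        self.topics = set()

    def create_topic(self, name):
        self.topics.add(name)


class FakePatternConsumer:
    """Scans for matching topics at subscribe time, then once per period."""

    def __init__(self, broker, pattern, period_s, now=0):
        self.broker = broker
        self.pattern = re.compile(pattern)
        self.period_s = period_s
        self.next_discovery = now + period_s
        self.known = self._scan()

    def _scan(self):
        return {t for t in self.broker.topics if self.pattern.fullmatch(t)}

    def advance_to(self, now):
        """Run every discovery pass that is due by simulated time `now`."""
        while now >= self.next_discovery:
            self.known = self._scan()
            self.next_discovery += self.period_s


def third_topic_found(create_third_before_subscribe, period_s):
    broker = FakeBroker()
    broker.create_topic("my-topic/1")
    broker.create_topic("my-topic/2")
    if create_third_before_subscribe:
        broker.create_topic("my-topic/3")
    consumer = FakePatternConsumer(broker, r"my-topic/.*", period_s)
    if not create_third_before_subscribe:
        broker.create_topic("my-topic/3")
    consumer.advance_to(2)  # mirrors the test's time.sleep(2)
    return "my-topic/3" in consumer.known


# Current ordering: passes even if the period is stuck at 60 seconds.
print(third_topic_found(True, 60))
# Proposed ordering: fails at 60 seconds, passes only if 1 second is honored.
print(third_topic_found(False, 60))
print(third_topic_found(False, 1))
```

Under these assumptions, only the ordering with the third producer created after the subscribe call distinguishes an honored 1 second period from the 60 second default.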