streamnative / pulsar-archived

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org
Apache License 2.0

ISSUE-7726: Consumer can't receive message from topic #1331

Open sijie opened 4 years ago

sijie commented 4 years ago

Original Issue: apache/pulsar#7726


Describe the bug

I created a topic with 4 partitions. When I consume messages in Key_Shared mode, sometimes I can't receive any messages from specific partitions. In this scenario, I create two consumers on the same subscription: C1 subscribes to the topic, and C2 subscribes to two partitions of the same topic. I start C1 first, then C2. At first, C1 receives all messages and C2 receives none. When I close C1, C2 receives messages from its partitions. But when I start C1 again, neither consumer receives messages from the partitions that C2 subscribed to.
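To make the overlap between the two subscriptions explicit, here is a minimal sketch in plain Python (no broker needed). It assumes the `<topic>-partition-<n>` naming that appears in the logs of this report; `partition_topics` is a hypothetical helper written for illustration, not part of the Pulsar client.

```python
def partition_topics(topic, num_partitions):
    """Hypothetical helper: list the per-partition topic names of a partitioned topic."""
    return [f"{topic}-partition-{i}" for i in range(num_partitions)]


# C1 subscribes to the parent topic, so it gets one consumer per partition.
c1_topics = set(partition_topics("test10", 4))
# C2 subscribes to two partitions directly.
c2_topics = {"test10-partition-2", "test10-partition-3"}

# Partitions where both C1 and C2 are attached to 'my-subscription'.
contended = sorted(c1_topics & c2_topics)
# Partitions where only C1 is attached.
c1_only = sorted(c1_topics - c2_topics)

print("both consumers:", contended)
print("C1 only:", c1_only)
```

The partitions listed under "both consumers" are the ones where the missing deliveries are reported.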

The producer code:

import pulsar
import time
from multiprocessing import Process

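def produce_test(topic, key, message):
    # Each process uses its own client and producer on the partitioned topic.
    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer(topic)
    print(client.get_topic_partitions(topic))
    # Send one keyed message every 2 seconds, 240 messages in total.
    for i in range(240):
        time.sleep(2)
        message_mix = str(i) + message
        print(message_mix)
        producer.send(message_mix.encode('utf-8'), partition_key=key)
    # Keep the process alive for a while after the last send, then close.
    time.sleep(1000)
    client.close()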

if __name__ == "__main__":
    p1 = Process(target=produce_test, args=("test10", "10000", "aaaa",))
    p2 = Process(target=produce_test, args=("test10", "20000", "bbbb",))
    p3 = Process(target=produce_test, args=("test10", "30000", "cccc",))
    p4 = Process(target=produce_test, args=("test10", "40000", "dddd",))
    p1.start()
    p2.start()
    p3.start()
    p4.start()

The code of C1:

import pulsar
from multiprocessing import Process

def consumer_data(topic, process_index):
    client = pulsar.Client('pulsar://localhost:6650')
    # Debug output: number of consumers registered on this client so far.
    print(len(client._consumers))
    # C1 subscribes to the whole partitioned topic in Key_Shared mode.
    consumer = client.subscribe(topic, 'my-subscription',
                                consumer_type=pulsar.ConsumerType.KeyShared)
    # Receive, print, and acknowledge messages until the process is stopped.
    while True:
        msg = consumer.receive()
        print("Process {} Received message '{}' id='{}' partition={}".format(
            process_index, msg.data(), msg.message_id(), msg.topic_name()))
        consumer.acknowledge(msg)

if __name__ == "__main__":
    p1 = Process(target=consumer_data, args=("test10", "p1", ))
    p1.start()

The code of C2:

import pulsar
from multiprocessing import Process

def consumer_data(topic, process_index):
    client = pulsar.Client('pulsar://localhost:6650')
    # Debug output: number of consumers registered on this client so far.
    print(len(client._consumers))
    # C2 passes a list of partition topics, using the same subscription name as C1.
    consumer = client.subscribe(topic, 'my-subscription',
                                consumer_type=pulsar.ConsumerType.KeyShared)
    # Receive, print, and acknowledge messages until the process is stopped.
    while True:
        msg = consumer.receive()
        print("Process {} Received message '{}' id='{}' partition={}".format(
            process_index, msg.data(), msg.message_id(), msg.topic_name()))
        consumer.acknowledge(msg)

if __name__ == "__main__":
    p2 = Process(target=consumer_data, args=(["test10-partition-2", "test10-partition-3"], "2", ))
    p2.start()

The output of C1:

Process p1 Received message 'b'236dddd'' id='(15,236,0,-1)' partition=persistent://public/default/test10-partition-0
Process p1 Received message 'b'236cccc'' id='(17,472,1,-1)' partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'236aaaa'' id='(17,473,1,-1)' partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'237dddd'' id='(15,237,0,-1)' partition=persistent://public/default/test10-partition-0
Process p1 Received message 'b'237cccc'' id='(17,474,1,-1)' partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'237aaaa'' id='(17,475,1,-1)' partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'238dddd'' id='(15,238,0,-1)' partition=persistent://public/default/test10-partition-0
Process p1 Received message 'b'238cccc'' id='(17,476,1,-1)' partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'238aaaa'' id='(17,477,1,-1)' partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'239dddd'' id='(15,239,0,-1)' partition=persistent://public/default/test10-partition-0
Process p1 Received message 'b'239cccc'' id='(17,478,1,-1)' partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'239aaaa'' id='(17,479,1,-1)' partition=persistent://public/default/test10-partition-1
2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-0, my-subscription, 0] , ConsumerStatsImpl (numBytesRecieved_ = 1535, totalNumBytesRecieved_ = 1535, receivedMsgMap_ = {[Key: Ok, Value: 233], }, ackedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 233], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 233], }, totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 233], })
2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-1, my-subscription, 1] , ConsumerStatsImpl (numBytesRecieved_ = 3070, totalNumBytesRecieved_ = 3070, receivedMsgMap_ = {[Key: Ok, Value: 466], }, ackedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 466], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 466], }, totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 466], })
2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-2, my-subscription, 2] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})
2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-3, my-subscription, 3] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})
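When checking a reproduction, it can help to pick out the starved partitions from the client log automatically. The following is a rough sketch that parses `ConsumerStatsImpl` lines shaped like the ones above and reports topics whose most recent `totalNumBytesRecieved_` value is 0. The function name `starved_topics` and the regular expression are assumptions made for this illustration; the log format may differ between client versions.

```python
import re

# Matches the topic, subscription, and total byte counter of a stats line.
# The field spelling "Recieved" is copied from the client log output.
STATS_LINE = re.compile(
    r"Consumer \[(?P<topic>[^,\]]+), (?P<subscription>[^,\]]+), \d+\]"
    r".*?totalNumBytesRecieved_ = (?P<total_bytes>\d+)"
)


def starved_topics(log_lines):
    """Return the topics whose latest stats line reports zero bytes received."""
    latest = {}
    for line in log_lines:
        match = STATS_LINE.search(line)
        if match:
            # Later lines overwrite earlier ones, so only the last report counts.
            latest[match.group("topic")] = int(match.group("total_bytes"))
    return sorted(topic for topic, total in latest.items() if total == 0)
```

For example, `starved_topics(open("c1.log"))` would scan a saved copy of the C1 output, where `c1.log` is a hypothetical file name.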

The output of C2:

2020-08-03 15:49:46.326 INFO  [140382617937728] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2020-08-03 15:49:46.326 INFO  [140382567016192] ClientConnection:335 | [127.0.0.1:50638 -> 127.0.0.1:6650] Connected to broker
2020-08-03 15:49:46.329 INFO  [140382567016192] AckGroupingTrackerEnabled:53 | ACK grouping is enabled, grouping time 100ms, grouping max size 1000
2020-08-03 15:49:46.329 INFO  [140382567016192] HandlerBase:53 | [persistent://public/default/test10-partition-2, my-subscription, 0] Getting connection from pool
2020-08-03 15:49:46.329 INFO  [140382567016192] AckGroupingTrackerEnabled:53 | ACK grouping is enabled, grouping time 100ms, grouping max size 1000
2020-08-03 15:49:46.329 INFO  [140382567016192] HandlerBase:53 | [persistent://public/default/test10-partition-3, my-subscription, 1] Getting connection from pool
2020-08-03 15:49:46.331 INFO  [140382567016192] ConsumerImpl:199 | [persistent://public/default/test10-partition-2, my-subscription, 0] Created consumer on broker [127.0.0.1:50638 -> 127.0.0.1:6650] 
2020-08-03 15:49:46.331 INFO  [140382567016192] ConsumerImpl:199 | [persistent://public/default/test10-partition-3, my-subscription, 1] Created consumer on broker [127.0.0.1:50638 -> 127.0.0.1:6650] 
2020-08-03 15:49:46.331 INFO  [140382567016192] MultiTopicsConsumerImpl:99 | Successfully Subscribed to Topics
2020-08-03 15:59:46.330 INFO  [140382567016192] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-2, my-subscription, 0] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})
2020-08-03 15:59:46.330 INFO  [140382567016192] ConsumerStatsImpl:65 | Consumer [persistent://public/default/test10-partition-3, my-subscription, 1] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})
sijie commented 4 years ago

We need to reproduce this behavior and add a test to SN-tests.