apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0
49 stars 38 forks source link

[Bug] receiver_queue_size is not honoured with partitioned topic #21593 #166

Open ujjain7194 opened 7 months ago

ujjain7194 commented 7 months ago

Versions: Pulsar version 2.10.4; OS: Ubuntu (version 22.04); Client (Python): pulsar-client (version 3.3.0)

Minimal steps to reproduce: Create a partitioned topic with one partition: bin/pulsar-admin topics create-partitioned-topic persistent://public/default/test-consumer -p 1

Publish some messages

Create one consumer as follows: consumer = client.subscribe( topic='persistent://public/default/test-consumer', subscription_name='test', consumer_type=ConsumerType.Shared, initial_position=InitialPosition.Earliest, receiver_queue_size=1, max_total_receiver_queue_size_across_partitions=1, consumer_name=None, negative_ack_redelivery_delay_ms=60000, unacked_messages_timeout_ms=3600000)

Receive and acknowledge one message: consumer.receive(timeout_millis=30000) consumer.acknowledge(message=message_id)

Check the stats. (They show 2 unacknowledged messages even though those messages have not been received by the application yet. Also, if we create another consumer and try to consume messages, the messages are not delivered to it.)

What did you expect to see? The stats should not show 2 unacknowledged messages; they should show at most 1, since receiver_queue_size is 1.

What did you see instead? I can see 2 unacknowledged messages in the stats, as shown below. Command: bin/pulsar-admin topics partitioned-stats persistent://public/default/test-consumer

{ "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.07158759338136238, "msgThroughputOut" : 4.271393071754622, "bytesInCounter" : 179, "msgInCounter" : 3, "bytesOutCounter" : 179, "msgOutCounter" : 3, "averageMsgSize" : 0.0, "msgChunkPublished" : false, "storageSize" : 179, "backlogSize" : 120, "publishRateLimitedTimes" : 0, "earliestMsgPublishTimeInBacklogs" : 0, "offloadedStorageSize" : 0, "lastOffloadLedgerId" : 0, "lastOffloadSuccessTimeStamp" : 0, "lastOffloadFailureTimeStamp" : 0, "publishers" : [ ], "waitingPublishers" : 0, "subscriptions" : { "test" : { "msgRateOut" : 0.07158759338136238, "msgThroughputOut" : 4.271393071754622, "bytesOutCounter" : 179, "msgOutCounter" : 3, "msgRateRedeliver" : 0.0, "messageAckRate" : 0.0, "chunkedMessageRate" : 0, "msgBacklog" : 2, "backlogSize" : 0, "earliestMsgPublishTimeInBacklog" : 0, "msgBacklogNoDelayed" : 2, "blockedSubscriptionOnUnackedMsgs" : false, "msgDelayed" : 0, "unackedMessages" : 2, "msgRateExpired" : 0.0, "totalMsgExpired" : 0, "lastExpireTimestamp" : 0, "lastConsumedFlowTimestamp" : 0, "lastConsumedTimestamp" : 0, "lastAckedTimestamp" : 0, "lastMarkDeleteAdvancedTimestamp" : 0, "consumers" : [ { "msgRateOut" : 0.07158759338136238, "msgThroughputOut" : 4.271393071754622, "bytesOutCounter" : 179, "msgOutCounter" : 3, "msgRateRedeliver" : 0.0, "messageAckRate" : 0.023862530971099603, "chunkedMessageRate" : 0.0, "availablePermits" : 0, "unackedMessages" : 2, "avgMessagesPerEntry" : 0, "blockedConsumerOnUnackedMsgs" : false, "lastAckedTimestamp" : 0, "lastConsumedTimestamp" : 0 }, { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesOutCounter" : 0, "msgOutCounter" : 0, "msgRateRedeliver" : 0.0, "messageAckRate" : 0.0, "chunkedMessageRate" : 0.0, "availablePermits" : 1, "unackedMessages" : 0, "avgMessagesPerEntry" : 0, "blockedConsumerOnUnackedMsgs" : false, "lastAckedTimestamp" : 0, "lastConsumedTimestamp" : 0 } ], "isDurable" : true, "isReplicated" : false, "allowOutOfOrderDelivery" : false, 
"consumersAfterMarkDeletePosition" : { }, "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "subscriptionProperties" : { }, "durable" : true, "replicated" : false } }, "replication" : { }, "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "compaction" : { "lastCompactionRemovedEventCount" : 0, "lastCompactionSucceedTimestamp" : 0, "lastCompactionFailedTimestamp" : 0, "lastCompactionDurationTimeInMills" : 0 }, "metadata" : { "partitions" : 1 }, "partitions" : { } }