apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] [client] transactional consumer do not work though corresponding TP has been recovered. #19148

Open thetumbled opened 1 year ago

thetumbled commented 1 year ago

Search before asking

Version

master branch.

Minimal reproduce step

# Create a partitioned topic with 10 partitions
bin/pulsar-admin topics create-partitioned-topic test/tb5/testTxn10 --partitions 10

# Start the transactional perf producer and consumer
bin/pulsar-perf produce -r 2048000 -bm 10 -txn -nmt 1000 persistent://test/tb5/testTxn10
bin/pulsar-perf consume -r 2048000 -txn -nmt 1500 persistent://test/tb5/testTxn10

# Restart the broker, which triggers topic unload and reload, and recovery of the
# TP (transaction pending ack), TB (transaction buffer), and TC (transaction coordinator).
bin/pulsar-daemon restart broker

What did you expect to see?

The transactional producer and consumer should keep working as they did before the restart.

What did you see instead?

Some partitions of the topic persistent://test/tb5/testTxn10 stop working. Only two partitions work normally.

The client reports the following errors. Each partition without traffic has a corresponding error message.

2023-01-06T14:31:03,907+0800 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0x52f04075, L:/172.24.25.42:49158 - R:cluster2-nn0.bigo.baina/172.24.25.41:6650] Received error from server: Exclusive consumer is already connected
2023-01-06T14:31:03,907+0800 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://test/tb5/testTxn10-partition-2][sub] Failed to subscribe to topic on cluster2-nn0.bigo.baina/172.24.25.41:6650

2023-01-06T14:31:33,177+0800 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0x951d9be7, L:/172.24.25.42:49171 - R:cluster2-nn0.bigo.baina/172.24.25.41:6650] Received error from server: Exclusive consumer is already connected
2023-01-06T14:31:33,177+0800 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://test/tb5/testTxn10-partition-6][sub] Failed to subscribe to topic on cluster2-nn0.bigo.baina/172.24.25.41:6650
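
To list which partitions are affected, the client log can be scanned for the ConsumerImpl warning. The following is a minimal sketch, assuming the log lines keep the format shown above; the function name failed_partitions and the regular expression are illustrative and not part of Pulsar.

```python
import re

# Matches the ConsumerImpl warning from the client log. The two bracketed
# fields are the partition topic and the subscription name.
FAILED_SUBSCRIBE = re.compile(
    r"ConsumerImpl - \[(?P<topic>[^\]]+)\]\[(?P<sub>[^\]]+)\] "
    r"Failed to subscribe to topic"
)


def failed_partitions(log_lines):
    """Return sorted (topic, subscription) pairs that failed to subscribe."""
    found = set()
    for line in log_lines:
        # Drop markdown emphasis markers in case the log was pasted with highlighting.
        match = FAILED_SUBSCRIBE.search(line.replace("**", ""))
        if match:
            found.add((match.group("topic"), match.group("sub")))
    return sorted(found)
```

Feeding it the client log (for example, the lines read from the perf consumer's log file) gives one entry per partition that was rejected.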

However, the TPs for all 10 partitions of the topic have been recovered.

python3 calculateTbpRecoverTime.py TP
TP for topic:persistent://test/tb5/testTxn10-partition-0 sub:sub recover time in milliseconds: 431442
TP for topic:persistent://test/tb5/testTxn10-partition-1 sub:sub recover time in milliseconds: 275965
TP for topic:persistent://test/tb5/testTxn10-partition-2 sub:sub recover time in milliseconds: 302184
TP for topic:persistent://test/tb5/testTxn10-partition-3 sub:sub recover time in milliseconds: 282678
TP for topic:persistent://test/tb5/testTxn10-partition-4 sub:sub recover time in milliseconds: 336789
TP for topic:persistent://test/tb5/testTxn10-partition-5 sub:sub recover time in milliseconds: 341019
TP for topic:persistent://test/tb5/testTxn10-partition-6 sub:sub recover time in milliseconds: 279376
TP for topic:persistent://test/tb5/testTxn10-partition-7 sub:sub recover time in milliseconds: 431452
TP for topic:persistent://test/tb5/testTxn10-partition-8 sub:sub recover time in milliseconds: 282711
TP for topic:persistent://test/tb5/testTxn10-partition-9 sub:sub recover time in milliseconds: 250019

Query the stats of one partition without traffic.

bin/pulsar-admin topics stats persistent://test/tb5/testTxn10-partition-0
{
"msgRateIn" : 894.0452581776605,
"msgThroughputIn" : 931086.1450541553,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 982555920,
"msgInCounter" : 943472,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 1041.4306619689428,
"msgChunkPublished" : false,
"storageSize" : 13418100120,
"backlogSize" : 13417146498,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"ongoingTxnCount" : 0,
"abortedTxnCount" : 0,
"committedTxnCount" : 1254,
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 894.0452581776605,
"msgThroughputIn" : 931086.1450541553,
"averageMsgSize" : 1041.4306619689428,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/172.24.25.42:48102",
"producerName" : "pulsar-cluster-fwz-1-46-10",
"connectedSince" : "2023-01-06T14:18:20.440183522+08:00",
"clientVersion" : "2.9.3"
} ],
"waitingPublishers" : 0,
"subscriptions" : {
"sub" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 1299568,
"backlogSize" : 0,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 1299568,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"activeConsumerName" : "46bbb",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "46bbb",
"availablePermits" : 0,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"metadata" : { },
"address" : "/172.24.25.42:48441",
"connectedSince" : "2023-01-06T14:18:20.999288614+08:00",
"clientVersion" : "2.9.3",
"lastAckedTime" : "1970-01-01T08:00:00+08:00",
"lastConsumedTime" : "1970-01-01T08:00:00+08:00"
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 7161,
"nonContiguousDeletedMessagesRangesSerializedSize" : 139391,
"delayedTrackerMemoryUsage" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 7161,
"nonContiguousDeletedMessagesRangesSerializedSize" : 139391,
"delayedMessageIndexSizeInBytes" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"ownerBroker" : "cluster2-nn0.bigo.baina:8081"
}

The stats show that subscription sub of this partition already has a connected consumer (46bbb), but that consumer receives no traffic. The perf tool keeps trying to reconnect and create a new consumer on subscription sub. Because the subscription is exclusive and the inactive consumer 46bbb is still attached, the new consumer cannot be created. According to its connectedSince time, this non-working consumer was created after the broker restart. The question is why it does not work.
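
The same check can be run against every partition instead of reading the JSON by eye. The following is a rough sketch that flags consumers that are connected but idle, using only fields that appear in the stats output above. The helper name stalled_subscriptions and the heuristic itself (backlog present, zero outbound rate, never consumed) are assumptions made for illustration, not a Pulsar API.

```python
import json


def stalled_subscriptions(stats):
    """Yield (subscription, consumer_name) pairs that look connected but idle.

    `stats` is the dict parsed from `pulsar-admin topics stats` output.
    Heuristic (an assumption): the subscription has backlog, yet the
    attached consumer has no outbound rate and has never consumed.
    """
    for name, sub in stats.get("subscriptions", {}).items():
        has_backlog = sub.get("msgBacklog", 0) > 0
        for consumer in sub.get("consumers", []):
            idle = consumer.get("msgRateOut", 0.0) == 0.0
            never_consumed = consumer.get("lastConsumedTimestamp", 0) == 0
            if has_backlog and idle and never_consumed:
                yield name, consumer.get("consumerName")


if __name__ == "__main__":
    import sys

    # Usage: bin/pulsar-admin topics stats <partition> | python3 this_script.py
    for sub_name, consumer_name in stalled_subscriptions(json.load(sys.stdin)):
        print(f"stalled: subscription={sub_name} consumer={consumer_name}")
```

For the stats of partition-0 above it reports subscription sub with consumer 46bbb. Once the consumer has started receiving messages, the msgRateOut and lastConsumedTimestamp fields become non-zero and nothing is reported.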

After some time, the partitions that were not working start to receive consuming traffic again. Query the partition stats again.

bin/pulsar-admin topics stats persistent://test/tb5/testTxn10-partition-0
{
"msgRateIn" : 899.7183507594756,
"msgThroughputIn" : 938068.0225423912,
"msgRateOut" : 2017.087077147803,
"msgThroughputOut" : 2100638.74889055,
"bytesInCounter" : 2524424057,
"msgInCounter" : 2423093,
"bytesOutCounter" : 340584925,
"msgOutCounter" : 327056,
"averageMsgSize" : 1042.624085360206,
"msgChunkPublished" : false,
"storageSize" : 14960070221,
"backlogSize" : 14959116599,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"ongoingTxnCount" : 0,
"abortedTxnCount" : 0,
"committedTxnCount" : 3012,
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 899.7183507594756,
"msgThroughputIn" : 938068.0225423912,
"averageMsgSize" : 1042.624085360206,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/172.24.25.42:48102",
"producerName" : "pulsar-cluster-fwz-1-46-10",
"connectedSince" : "2023-01-06T14:18:20.440183522+08:00",
"clientVersion" : "2.9.3"
} ],
"waitingPublishers" : 0,
"subscriptions" : {
"sub" : {
"msgRateOut" : 2017.087077147803,
"msgThroughputOut" : 2100638.74889055,
"bytesOutCounter" : 340584925,
"msgOutCounter" : 327056,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 2017.1037374578968,
"chunkedMessageRate" : 0,
"msgBacklog" : 1449916,
"backlogSize" : 0,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 1449916,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"activeConsumerName" : "46bbb",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1672988583828,
"lastConsumedTimestamp" : 1672988585368,
"lastAckedTimestamp" : 1672988583932,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 2017.087077147803,
"msgThroughputOut" : 2100638.74889055,
"bytesOutCounter" : 340584925,
"msgOutCounter" : 327056,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 2017.1037374578968,
"chunkedMessageRate" : 0.0,
"consumerName" : "46bbb",
"availablePermits" : -56,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 10,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1672988583932,
"lastConsumedTimestamp" : 1672988585368,
"lastConsumedFlowTimestamp" : 1672988583828,
"metadata" : { },
"address" : "/172.24.25.42:48441",
"connectedSince" : "2023-01-06T14:18:20.999288614+08:00",
"clientVersion" : "2.9.3",
"lastAckedTime" : "2023-01-06T15:03:03.932+08:00",
"lastConsumedTime" : "2023-01-06T15:03:05.368+08:00"
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 7161,
"nonContiguousDeletedMessagesRangesSerializedSize" : 139391,
"delayedTrackerMemoryUsage" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 7161,
"nonContiguousDeletedMessagesRangesSerializedSize" : 139391,
"delayedMessageIndexSizeInBytes" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"ownerBroker" : "cluster2-nn0.bigo.baina:8081"
}

The consumer that is working now is the same consumer that was previously not working!
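
This conclusion rests on the consumerName, address, and connectedSince fields being identical in both stats outputs. A small sketch of that comparison follows, assuming both outputs have been parsed into dicts; the helper names consumer_identity and same_consumers are hypothetical.

```python
def consumer_identity(stats, subscription):
    """Return (name, address, connectedSince) for each consumer of a subscription."""
    consumers = stats["subscriptions"][subscription]["consumers"]
    return [
        (c.get("consumerName"), c.get("address"), c.get("connectedSince"))
        for c in consumers
    ]


def same_consumers(before, after, subscription):
    """True if both stats snapshots list exactly the same consumer connections."""
    return consumer_identity(before, subscription) == consumer_identity(
        after, subscription
    )
```

If a new consumer had replaced the stalled one, the address or connectedSince value would differ between the two snapshots and same_consumers would return False.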

Anything else?

No response

Are you willing to submit a PR?

github-actions[bot] commented 1 year ago

The issue had no activity for 30 days, mark with Stale label.