kadlmt opened 1 year ago
It can only be fixed by restarting the broker. I think it's a broker bug.
I've faced a similar problem today, with a shared subscription, ackTimeout, and a receiver queue size of 8.
I was able to read some of the messages (around 100) before the consumer got stuck. I was using the 3.1.0 Java client. Restarting the client didn't help. I saw in the log files that one message was marked for later delivery (ack message tracker).
I was able to read the whole topic using pulsar-perf with the default settings.
I see that the consumer has a similar configuration, so it may be related.
I can now reproduce the issue on my system, using a build of master from Monday.
I have a topic with 3 partitions and 1100013 messages.
The client and the consumer:
client = PulsarClient.builder()
        .serviceUrl(this.pulsarBrokerUrl)
        .build();
Consumer<PublicationEvent> consumer = client.newConsumer(Schema.PROTOBUF(PublicationEvent.class))
        .topic(sourceTopic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscriptionType(SubscriptionType.Shared)
        .receiverQueueSize(8)
        .ackTimeout(ackTimeout, TimeUnit.SECONDS)
        .subscribe();
The consumer was able to retrieve 53527 messages and then stopped. I see the following entry in the log files:
[ConsumerBase{subscription='publications-router', consumerName='8c650', topic='persistent://websight/dxp/monolog'}] 1 messages will be re-delivered
I can read the messages using pulsar-perf with an exclusive consumer.
stats:
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 12559297597,
"msgInCounter" : 733015,
"bytesOutCounter" : 126060797203,
"msgOutCounter" : 7360029,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 18847011097,
"backlogSize" : 18012733217,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"ongoingTxnCount" : 0,
"abortedTxnCount" : 0,
"committedTxnCount" : 0,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : {
"publications-router" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 467088550,
"msgOutCounter" : 29832,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 1046486,
"backlogSize" : 18012733217,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 1046486,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 48,
"type" : "Shared",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 374080,
"msgOutCounter" : 24,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"availablePermits" : 0,
"unackedMessages" : 24,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastAckedTime" : "1970-01-01T00:00:00Z",
"lastConsumedTime" : "1970-01-01T00:00:00Z"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 374076,
"msgOutCounter" : 24,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"availablePermits" : 0,
"unackedMessages" : 24,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastAckedTime" : "1970-01-01T00:00:00Z",
"lastConsumedTime" : "1970-01-01T00:00:00Z"
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 4,
"nonContiguousDeletedMessagesRangesSerializedSize" : 48,
"delayedMessageIndexSizeInBytes" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"nonContiguousDeletedMessagesRanges" : 4,
"nonContiguousDeletedMessagesRangesSerializedSize" : 48,
"delayedMessageIndexSizeInBytes" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"metadata" : {
"partitions" : 3,
"deleted" : false
},
"partitions" : { }
}
internal stats:
{
"metadata" : {
"partitions" : 3,
"deleted" : false
},
"partitions" : {
"persistent://websight/dxp/monolog-partition-0" : {
"entriesAddedCounter" : 366843,
"numberOfEntries" : 366843,
"totalSize" : 6284981267,
"currentLedgerEntries" : 33336,
"currentLedgerSize" : 1085346225,
"lastLedgerCreatedTimestamp" : "2023-08-30T06:28:50.352Z",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "47:33335",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 12,
"entries" : 50000,
"size" : 780397885,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 33,
"entries" : 283507,
"size" : 4419237157,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 47,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"publications-router" : {
"markDeletePosition" : "12:18559",
"readPosition" : "12:18612",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 18596,
"cursorLedger" : 17,
"cursorLedgerLastEntry" : 68,
"individuallyDeletedMessages" : "[(12:18563..12:18567],(12:18571..12:18575],(12:18579..12:18607]]",
"lastLedgerSwitchTimestamp" : "2023-08-30T04:57:07.179Z",
"state" : "Open",
"active" : false,
"numberOfEntriesSinceFirstNotAckedMessage" : 53,
"totalNonContiguousDeletedMessagesRange" : 3,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ {
"ledgerId" : 15,
"entries" : 1,
"size" : 1553,
"offloaded" : false,
"underReplicated" : false
} ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
},
"persistent://websight/dxp/monolog-partition-1" : {
"entriesAddedCounter" : 0,
"numberOfEntries" : 366998,
"totalSize" : 6287713500,
"currentLedgerEntries" : 0,
"currentLedgerSize" : 0,
"lastLedgerCreatedTimestamp" : "2023-08-30T09:48:57.917Z",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "48:33339",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 1,
"entries" : 50000,
"size" : 780704389,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 34,
"entries" : 283658,
"size" : 4421532623,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 48,
"entries" : 33340,
"size" : 1085476488,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 84,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"publications-router" : {
"markDeletePosition" : "1:23747",
"readPosition" : "1:23811",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : -343203,
"cursorLedger" : 25,
"cursorLedgerLastEntry" : 70,
"individuallyDeletedMessages" : "[(1:23755..1:23802]]",
"lastLedgerSwitchTimestamp" : "2023-08-30T09:48:57.93Z",
"state" : "NoLedger",
"active" : false,
"numberOfEntriesSinceFirstNotAckedMessage" : 64,
"totalNonContiguousDeletedMessagesRange" : 1,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ {
"ledgerId" : 15,
"entries" : 1,
"size" : 1553,
"offloaded" : false,
"underReplicated" : false
} ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
},
"persistent://websight/dxp/monolog-partition-2" : {
"entriesAddedCounter" : 366172,
"numberOfEntries" : 366172,
"totalSize" : 6274316330,
"currentLedgerEntries" : 33321,
"currentLedgerSize" : 1084857885,
"lastLedgerCreatedTimestamp" : "2023-08-30T06:28:50.586Z",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "49:33320",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 13,
"entries" : 50000,
"size" : 780439463,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 32,
"entries" : 282851,
"size" : 4409018982,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 49,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"publications-router" : {
"markDeletePosition" : "13:11135",
"readPosition" : "13:11152",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 11136,
"cursorLedger" : 22,
"cursorLedgerLastEntry" : 60,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2023-08-30T04:57:06.846Z",
"state" : "Open",
"active" : false,
"numberOfEntriesSinceFirstNotAckedMessage" : 17,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ {
"ledgerId" : 15,
"entries" : 1,
"size" : 1553,
"offloaded" : false,
"underReplicated" : false
} ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
}
}
}
I've removed:
.ackTimeout(ackTimeout, TimeUnit.SECONDS)
from my code, and the consumer is able to read the messages now. It looks like using ackTimeout may cause the client to hang; it is more likely to happen when the receiver queue is relatively small.
[EDITED] Restart of the container (consumer) helped. It will probably happen again, as I faced it yesterday.
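For reference, a redelivery pattern that avoids ackTimeout entirely is to negatively acknowledge messages whose processing fails. A minimal sketch, assuming the same schema, topic, and subscription variables as in the snippet above, with process() as a hypothetical handler and the loop running inside a method that declares throws PulsarClientException:
Consumer<PublicationEvent> consumer = client.newConsumer(Schema.PROTOBUF(PublicationEvent.class))
        .topic(sourceTopic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscriptionType(SubscriptionType.Shared)
        .receiverQueueSize(8)
        // no ackTimeout: redelivery is requested explicitly below
        .negativeAckRedeliveryDelay(30, TimeUnit.SECONDS)
        .subscribe();

while (true) {
    Message<PublicationEvent> msg = consumer.receive();
    try {
        process(msg.getValue());            // hypothetical processing step
        consumer.acknowledge(msg);
    } catch (Exception e) {
        consumer.negativeAcknowledge(msg);  // schedule redelivery instead of relying on ackTimeout
    }
}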
I've removed ackTimeout(ackTimeout, TimeUnit.SECONDS). The consumer still can't receive the messages; the broker must be restarted.
Could you try with pulsar client 2.11.2 instead of 3.x.x? I'm not sure if my issue is exactly the same as yours, but it goes away with the 2.11.2 client (with the server on 3.0.1).
Hi.
I will describe my case here because it might be related: I previously had server & client version 2.8.3, and everything worked fine. After upgrading the server and Java client to version 3.0.1 or 3.1.0, problems appeared.
My server has almost OOTB settings, with the following properties changed:
managedLedgerDefaultEnsembleSize: "3"
managedLedgerDefaultWriteQuorum: "3"
managedLedgerDefaultAckQuorum: "2"
brokerDeduplicationEnabled: "true"
# bookkeeperClientTimeoutInSeconds changed from the default of 30.
# It allows catching bookkeeper problems earlier and, in case of problematic bookies,
# retrying sendMessage within the standard 30 seconds. After the change, when a bookie does not ack a message within 5 seconds,
# a new ensemble is created, and sendMessage is retried in about 15 seconds.
bookkeeperClientTimeoutInSeconds: "5"
# bookkeeperClientHealthCheckErrorThresholdPerInterval changed from the default of 5.
# It allows reacting to bookie timeouts faster.
# By default, the health check interval is 60 seconds, and we are not changing that.
# bookkeeperClientHealthCheckErrorThresholdPerInterval=3 means that if there are >= 3 timeouts within 60 seconds,
# the bookkeeper will be quarantined, and the ensemble will be recreated on different bookkeepers.
bookkeeperClientHealthCheckErrorThresholdPerInterval: "3"
# bookkeeperClientHealthCheckQuarantineTimeInSeconds changed from the default of 1800 seconds.
# A bookkeeper is quarantined when the broker detects addEntry timeouts.
# We are lowering this value because we lowered bookkeeperClientTimeoutInSeconds, and in case of transient issues
# we don't want all bookies quarantined in a short time.
bookkeeperClientHealthCheckQuarantineTimeInSeconds: "600"
# Needed to set custom policies per topic (https://jira.tomtomgroup.com/browse/NAV-103543)
systemTopicEnabled: "true"
topicLevelPoliciesEnabled: "true"
Our tenant and namespaces are created with:
bin/pulsar-admin --admin-url "${ADMIN_URL}" tenants create "${TENANT}" --allowed-clusters pulsar
bin/pulsar-admin --admin-url "${ADMIN_URL}" namespaces create "${TENANT}/batch"
bin/pulsar-admin --admin-url "${ADMIN_URL}" namespaces set-max-unacked-messages-per-consumer -c 10 "${TENANT}/batch"
bin/pulsar-admin --admin-url "${ADMIN_URL}" namespaces set-max-unacked-messages-per-subscription -c 20 "${TENANT}/batch"
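The same limits can also be set from the Java admin client. A rough sketch, where the admin URL and tenant are placeholders for the ${ADMIN_URL} and ${TENANT} values used above:
import org.apache.pulsar.client.admin.PulsarAdmin;

try (PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")        // placeholder for ${ADMIN_URL}
        .build()) {
    String namespace = "my-tenant/batch";               // placeholder for ${TENANT}/batch
    admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 10);
    admin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, 20);
}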
We are using MultiTopicsConsumer to fetch messages from all queues in the batch namespace. Below is how the client and consumer are configured:
**PULSAR CLIENT:**
PulsarClient.builder()
        .ioThreads(1)
        .listenerThreads(1)
        .enableTlsHostnameVerification(false)
        .serviceUrl(<URL>)
        .keepAliveInterval(10_000, TimeUnit.MILLISECONDS)
        .connectionTimeout(10_111, TimeUnit.MILLISECONDS)
        .operationTimeout(30_000, TimeUnit.MILLISECONDS)
        .startingBackoffInterval(100, TimeUnit.MILLISECONDS)
        .maxBackoffInterval(10_000, TimeUnit.MILLISECONDS)
        .build();
**CONSUMER:**
return pulsarClient
        .newConsumer(<SCHEMA>)
        .subscriptionName(<SUBSCRIPTION_NAME_SAME_FOR_ALL_CONSUMER_INSTANCES>)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionType(SubscriptionType.Shared)
        .topicsPattern(<PATTERN_CATCHING_ALL_BATCH_NAMESPACE_QUEUES>)
        .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
        .patternAutoDiscoveryPeriod(60, TimeUnit.SECONDS)
        .receiverQueueSize(1);
Messages are always acked after 100-1000 ms.
Our publisher tries to have a constant number of messages in the queue (about 100) and adds more after the previous message is processed.
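To make the pattern concrete, the receive/ack loop looks roughly like the following. This is only a sketch: consumerBuilder refers to the CONSUMER builder above, MyEvent stands in for the <SCHEMA> type, and handle() is a hypothetical processing step taking 100-1000 ms:
Consumer<MyEvent> consumer = consumerBuilder.subscribe();   // builder from the CONSUMER snippet above

while (true) {
    Message<MyEvent> msg = consumer.receive();              // receiverQueueSize(1) keeps at most one message prefetched
    handle(msg.getValue());                                  // hypothetical processing, ~100-1000 ms
    consumer.acknowledge(msg);                               // every message is acked after processing
}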
With the Pulsar server & client at version 3.0.1 or 3.1.0, we see the following values in Prometheus (the issue shows up on multiple of our clusters using the same server & client versions):
pulsar_subscription_back_log > 0
pulsar_subscription_blocked_on_unacked_messages > 0
It seems that the queues are blocked because of unacked messages, even though we always ack messages.
After changing the java client to 2.11.2 version, the issue is not reproducible.
Just did some diff, this may be related: https://github.com/apache/pulsar/pull/18478/files
ackTimeout = 30 minutes; could this be related?
I've created #21104, as it looks like the original issue is broker-related.
Hi, does the consumer always receive messages without acking them, or does it just do nothing? I consumed using the master broker and client and can receive messages even without acking. I need more context on this.
It looks like there are two different issues. I do ack all the messages. I will work on creating an isolated, reproducible setup as part of the issue I've created (client side).
The issue had no activity for 30 days, mark with Stale label.
Just started testing with Pulsar and ran into something that sounds like this.
In my case, using inactive_topic_policies.inactiveTopicDeleteMode: delete_when_subscriptions_caught_up with partitioned topics, if my topic stops receiving new messages and my subscriptions reach 0 backlog, the topic partitions (foo-partition-0, foo-partition-1, etc.) get deleted after the inactive time, along with the subscriptions to them, but the topic foo itself remains.
When new messages are produced again, the topic partitions (foo-partition-0, foo-partition-1) are auto-created, but the multi-topic consumer does not discover the newly recreated partitions.
The fix was to add autoUpdatePartitions to my consumer config; see the sketch below.
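A sketch of the relevant builder options (the topic, subscription name, and interval value are only examples, not the exact config used here):
Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/foo")             // example partitioned topic that gets re-created
        .subscriptionName("my-subscription")
        .autoUpdatePartitions(true)                            // re-discover partitions that reappear
        .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)    // example discovery interval
        .subscribe();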
@kadlmt
I've removed ackTimeout(ackTimeout, TimeUnit.SECONDS). The consumer still can't receive the messages; the broker must be restarted.
Could it be solved by restarting the client service? Also, version 2.9.x is too old to be supported now; the oldest supported version is now 2.10.x.
ackTimeout = 30 minutes; could this be related?
As expected, the messages will be redelivered to the client if the consumer is stuck on the client side. Could you provide a heap dump of the client service that owns the stuck consumer?
@michalcukierman
I will try this case: https://github.com/apache/pulsar/issues/21082#issuecomment-1698836870. If I cannot reproduce it, I will ask you for heap dumps of both the broker and the client. Update: I could not reproduce this issue.
@MichalKoziorowski-TomTom : Could you try with pulsar client 2.11.2 instead of 3.x.x? I'm not sure if my issue is exactly the same as yours, but it goes away with the 2.11.2 client (with the server on 3.0.1).
BTW, have you tried this case?
"individuallyDeletedMessages" : "[(12:18563..12:18567],(12:18571..12:18575],(12:18579..12:18607]]",
The stats-internal above means there are at least 8 messages that have been received but have not been acknowledged yet, so I think there is something wrong with the client service. Could you reproduce this issue (without the quarkus library, because we need to filter out other influences) and provide the information below:
- the heap dump of the client service (important)
- pulsar-admin topics stats persistent://{tenant}/{namespace}/{topic}-partition-0 (do not call partitioned-stats, because topics stats responds with more details, such as lastAckedTime, which is always zero in the partitioned-stats API)
- pulsar-admin topics stats persistent://{tenant}/{namespace}/{topic}-partition-1 (do not call partitioned-stats)
- pulsar-admin topics stats persistent://{tenant}/{namespace}/{topic}-partition-2 (do not call partitioned-stats)
- pulsar-admin topics stats-internal persistent://{tenant}/{namespace}/{topic}-partition-0 (do not call partitioned-stats-internal)
- pulsar-admin topics stats-internal persistent://{tenant}/{namespace}/{topic}-partition-1 (do not call partitioned-stats-internal)
- pulsar-admin topics stats-internal persistent://{tenant}/{namespace}/{topic}-partition-2 (do not call partitioned-stats-internal)
- the heap dump of the brokers
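If it is easier to collect from code, roughly the same data can be pulled with the Java admin client. A sketch only; the admin URL is a placeholder and the topic name reuses the one from the stats above:
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;

public class CollectStats {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")   // placeholder admin URL
                .build()) {
            for (int p = 0; p < 3; p++) {
                String topic = "persistent://websight/dxp/monolog-partition-" + p;
                TopicStats stats = admin.topics().getStats(topic);                               // topics stats
                PersistentTopicInternalStats internal = admin.topics().getInternalStats(topic);  // topics stats-internal
                System.out.println(topic + " subscriptions: " + stats.getSubscriptions());
                System.out.println(topic + " cursors: " + internal.cursors);
            }
        }
    }
}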
@MichalKoziorowski-TomTom
Thanks for explaining the details of the issue; I will try to reproduce it.
Update: I cannot reproduce this issue.
It might be fixed with https://github.com/apache/pulsar/issues/22352. Need to check...
@poorbarcode I've checked with the 3.0.4 client containing the https://github.com/apache/pulsar/issues/22352 fix and I can't reproduce it now. I don't know whether that fix or some other one resolved it, but I'm almost sure it would have appeared again in my testing, because it was very easily reproducible with version 3.0.1. About the original issue, I don't know. I had no problems with the 2.9.2 client, only with >3.0 ones, so maybe that fixed only mine or the https://github.com/apache/pulsar/issues/21104 issue.
Unfortunately, it's not fixed, or some other bug has appeared. It seems we're going to be stuck with 2.11.x forever :(
I am facing this issue randomly as well.
Symptoms: the delivery rate to one of the partitions randomly and occasionally drops to 0 and a backlog builds up.
Server version: 3.0.3
Java client SDK version: 3.0.4 (chosen due to the batch ack bug fix)
What we do: I am able to reproduce this by publishing a huge number of messages into a partitioned topic using the pulsar-perf tool and consuming with a Java client consumer.
Today, we are handling this by unloading the topic (see the sketch below).
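For reference, the unload is equivalent to something like this via the Java admin client (a sketch; the admin URL and topic name are placeholders):
import org.apache.pulsar.client.admin.PulsarAdmin;

try (PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")   // placeholder admin URL
        .build()) {
    // Unloading closes the topic on its current broker; delivery usually resumes
    // once the partition is re-loaded.
    admin.topics().unload("persistent://tenant/namespace/topic-partition-3");  // placeholder partition
}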
The following are the stats:
{
"msgRateIn" : 162.02194280364785,
"msgThroughputIn" : 96448.48058300688,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 64639785639,
"msgInCounter" : 110172884,
"bytesOutCounter" : 64282820474,
"msgOutCounter" : 109563883,
"averageMsgSize" : 595.2803608823001,
"msgChunkPublished" : false,
"storageSize" : 523822779,
"backlogSize" : 506085747,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"ongoingTxnCount" : 0,
"abortedTxnCount" : 0,
"committedTxnCount" : 0,
"publishers" : [
{
"accessMode" : "Shared",
"msgRateIn" : 0.7333462313290705,
"msgThroughputIn" : 436.9243512338996,
"averageMsgSize" : 595.7954545454545,
"chunkedMessageRate" : 0.0,
"producerId" : 1,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/10.0.102.132:34006",
"producerName" : "pulsarv3-60-52435",
"connectedSince" : "2024-06-08T02:46:07.584459326Z",
"clientVersion" : "Pulsar-Java-v3.0.4"
}, {
"accessMode" : "Shared",
"msgRateIn" : 0.9494403702612061,
"msgThroughputIn" : 595.2991121537763,
"averageMsgSize" : 627.0,
"chunkedMessageRate" : 0.0,
"producerId" : 1,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/10.0.108.53:46610",
"producerName" : "pulsarv3-60-52441",
"connectedSince" : "2024-06-08T03:12:03.764684205Z",
"clientVersion" : "Pulsar-Java-v3.0.4"
}, {
"accessMode" : "Shared",
"msgRateIn" : 0.6666783947174283,
"msgThroughputIn" : 433.3242896064604,
"averageMsgSize" : 649.975,
"chunkedMessageRate" : 0.0,
"producerId" : 1,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/10.0.100.13:42184",
"producerName" : "pulsarv3-60-52443",
"connectedSince" : "2024-06-08T03:10:14.740170844Z",
"clientVersion" : "Pulsar-Java-v3.0.4"
}, {
"accessMode" : "Shared",
"msgRateIn" : 0.6833453002698462,
"msgThroughputIn" : 403.7904046253064,
"averageMsgSize" : 590.9024390243902,
"chunkedMessageRate" : 0.0,
"producerId" : 1,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/10.0.100.13:51930",
"producerName" : "pulsarv3-60-52459",
"connectedSince" : "2024-06-08T03:01:03.355132469Z",
"clientVersion" : "Pulsar-Java-v3.0.4"
}
// more entries removed for brevity
],
"waitingPublishers" : 0,
"subscriptions" : {
"subscription-name-for-app" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 64282820474,
"msgOutCounter" : 109563883,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 860487,
"backlogSize" : 506085747,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 860487,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Shared",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 1717816240629,
"lastConsumedFlowTimestamp" : 1717811557813,
"lastConsumedTimestamp" : 1717810973704,
"lastAckedTimestamp" : 1717811020957,
"lastMarkDeleteAdvancedTimestamp" : 1717811020957,
"consumers" : [
{
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 103841680,
"msgOutCounter" : 177401,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "0fbbf",
"availablePermits" : 99,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 1,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1717811003860,
"lastConsumedTimestamp" : 1717810973534,
"lastConsumedFlowTimestamp" : 1717810999528,
"metadata" : { },
"address" : "/10.0.108.53:38444",
"connectedSince" : "2024-06-07T02:53:13.281036715Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "2024-06-08T01:43:23.86Z",
"lastConsumedTime" : "2024-06-08T01:42:53.534Z"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 171230263,
"msgOutCounter" : 292480,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "99b5a",
"availablePermits" : 70,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 1,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1717810987123,
"lastConsumedTimestamp" : 1717810973544,
"lastConsumedFlowTimestamp" : 1717810980425,
"metadata" : { },
"address" : "/10.0.100.77:52448",
"connectedSince" : "2024-06-07T02:53:29.689095678Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "2024-06-08T01:43:07.123Z",
"lastConsumedTime" : "2024-06-08T01:42:53.544Z"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 80800510,
"msgOutCounter" : 139363,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "86dce",
"availablePermits" : 87,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 1,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1717811011668,
"lastConsumedTimestamp" : 1717810973474,
"lastConsumedFlowTimestamp" : 1717810984002,
"metadata" : { },
"address" : "/10.0.102.132:39542",
"connectedSince" : "2024-06-07T06:36:41.643941671Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "2024-06-08T01:43:31.668Z",
"lastConsumedTime" : "2024-06-08T01:42:53.474Z"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 14310204,
"msgOutCounter" : 24448,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "10fe7",
"availablePermits" : 52,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 1,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1717810973647,
"lastConsumedTimestamp" : 1717810973524,
"lastConsumedFlowTimestamp" : 1717810966053,
"metadata" : { },
"address" : "/10.0.100.13:51702",
"connectedSince" : "2024-06-08T00:05:38.937196415Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "2024-06-08T01:42:53.647Z",
"lastConsumedTime" : "2024-06-08T01:42:53.524Z"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "f6f84",
"availablePermits" : 100,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1717811556782,
"metadata" : { },
"address" : "/10.0.100.77:55216",
"connectedSince" : "2024-06-08T01:52:36.774854563Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "1970-01-01T00:00:00Z",
"lastConsumedTime" : "1970-01-01T00:00:00Z"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "0ff78",
"availablePermits" : 100,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1717811557813,
"metadata" : { },
"address" : "/10.0.108.53:35150",
"connectedSince" : "2024-06-08T01:52:37.79321547Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "1970-01-01T00:00:00Z",
"lastConsumedTime" : "1970-01-01T00:00:00Z"
}
// more entries removed for brevity
],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 8680,
"delayedMessageIndexSizeInBytes" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 8680,
"delayedMessageIndexSizeInBytes" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"ownerBroker" : "pulsarv3-broker-1.pulsarv3-broker.pulsarv3.svc.cluster.local:8080"
}
internal stats:
{
"entriesAddedCounter" : 110181604,
"numberOfEntries" : 899690,
"totalSize" : 528916523,
"currentLedgerEntries" : 45619,
"currentLedgerSize" : 26944969,
"lastLedgerCreatedTimestamp" : "2024-06-08T03:09:43.351Z",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 8,
"lastConfirmedEntry" : "132720:45610",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 132441,
"entries" : 94393,
"size" : 54997572,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 132467,
"entries" : 95872,
"size" : 56340982,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 132486,
"entries" : 98349,
"size" : 57980221,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 132524,
"entries" : 95218,
"size" : 55773569,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 132590,
"entries" : 93881,
"size" : 55305919,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 132627,
"entries" : 93708,
"size" : 55288974,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 132653,
"entries" : 93651,
"size" : 55466363,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 132676,
"entries" : 94671,
"size" : 55726661,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 132696,
"entries" : 94336,
"size" : 55095894,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 132720,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"subscription-name-for-app" : {
"markDeletePosition" : "132441:30475",
"readPosition" : "132441:30476",
"waitingReadOp" : false,
"pendingReadOps" : 1,
"messagesConsumedCounter" : 109312390,
"cursorLedger" : 131937,
"cursorLedgerLastEntry" : 12833,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2024-06-07T22:09:00.234Z",
"state" : "Open",
"active" : false,
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : true,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ {
"ledgerId" : 118705,
"entries" : 1,
"size" : 25669,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 184,
"entries" : 1,
"size" : 24904,
"offloaded" : false,
"underReplicated" : false
} ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
}
Dashboard:
@danielnesaraj Does this mean the consumer count?
Yes. At that time there were around 100 consumers (blue line) for that topic (actually, 1 out of 10 partitions of a partitioned topic) with 1 subscription (yellow line).
@danielnesaraj It looks like the topic started to build a backlog after the consumer count increased?
I think the reverse is true; we scale the consumers based on the backlog size. So as the backlog started increasing, there would have been more consumers created in an attempt to consume more and control the backlog.
Search before asking
Version
2.9.2
Minimal reproduce step
Consumer<byte[]> consumer = client.newConsumer()
        .topic("prod/collect/78_detail_common_urgent")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .ackTimeout(5, TimeUnit.SECONDS)
        .enableRetry(false)
        .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
        .receiverQueueSize(10)
        .subscribe();
while (true) {
    try {
        Message<byte[]> receive = consumer.receive();
        byte[] data = receive.getData();
        MessageId messageId = receive.getMessageId();
        System.out.println(" data: " + new String(data));
        System.out.println(" messageId: " + messageId);
        consumer.acknowledge(messageId);
    } finally {
stats:
{
  "msgRateIn" : 0,
  "msgThroughputIn" : 0,
  "msgRateOut" : 0,
  "msgThroughputOut" : 0,
  "bytesInCounter" : 673463031,
  "msgInCounter" : 970021,
  "bytesOutCounter" : 0,
  "msgOutCounter" : 0,
  "averageMsgSize" : 0,
  "msgChunkPublished" : false,
  "storageSize" : 8771320,
  "backlogSize" : 5662456,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "publishers" : Array[8],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "my-subscription" : {
      "msgRateOut" : 0,
      "msgThroughputOut" : 0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 8029,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 8029,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Shared",
      "msgRateExpired" : 0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 1693277232780,
      "lastConsumedFlowTimestamp" : 1693275780540,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0,
        "msgThroughputOut" : 0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0,
        "chunkedMessageRate" : 0,
        "consumerName" : "333f1e44bf",
        "availablePermits" : 4,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 1000,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "metadata" : {
}
What did you expect to see?
data: {"creatorName":"管理员","goodsPlatformName":"拼多多","requestId":"c96a17e1-9ea8-4dc0-82f6-06707e6053da","sortTypeName":"综合排序","extra":"0","id":30584515,"goodsPlatformId":78,"collectTime":1692758916000,"brandName":"测试11","creator":1,"updateTime":1692758918000,"stopType":1,"priority":"1","isMatch":"1","parentId":30584509,"keyWord":"http://mobile.pinduoduo.com/goods.html?goods_id=263506367446","errorMsg":"failed","collectType":"3","sortType":"0","createTime":1692758916000,"updatorName":"管理员","discountFlag":"0","brandId":11130,"updator":1,"strategy":"1","status":"5"} messageId: 3663279:0:0:0
What did you see instead?
empty
Anything else?
No response
Are you willing to submit a PR?