apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.28k stars 3.59k forks source link

[Bug] Infinite message loop after restart #21224

Open marekczajkowski opened 1 year ago

marekczajkowski commented 1 year ago

Search before asking

Version

Client - 3.1.0 Pulsar - 3.1.0 (and later builds)

Minimal reproduce step

Publishing 1 000 000 messages. Each message 30Kb. Servers restarts in the meantime.

What did you expect to see?

All messages read and backlog is empty

What did you see instead?

On one of the partitions the backlog has messages. Consumer keeps reading (indefinitely) the messages and sends ack successfully. Backlog remains the same

Anything else?

After restart there is a rewind

Zrzut ekranu 2023-09-21 o 14 05 53

Partition stats show readPosition on different ledger

kubectl exec --namespace pulsar pulsar-toolset-0 -- bin/pulsar-admin topics stats persistent://websight/dxp/search_experiences-partition-3 { “msgRateIn” : 0.0, “msgThroughputIn” : 0.0, “msgRateOut” : 1009.3990130769382, “msgThroughputOut” : 1.5503941574612858E7, “bytesInCounter” : 0, “msgInCounter” : 0, “bytesOutCounter” : 4276153214, “msgOutCounter” : 278397, “averageMsgSize” : 0.0, “msgChunkPublished” : false, “storageSize” : 4842312348, “backlogSize” : 4842296220, “publishRateLimitedTimes” : 0, “earliestMsgPublishTimeInBacklogs” : 0, “offloadedStorageSize” : 0, “lastOffloadLedgerId” : 0, “lastOffloadSuccessTimeStamp” : 0, “lastOffloadFailureTimeStamp” : 0, “ongoingTxnCount” : 0, “abortedTxnCount” : 0, “committedTxnCount” : 0, “publishers” : [ { “accessMode” : “Shared”, “msgRateIn” : 0.0, “msgThroughputIn” : 0.0, “averageMsgSize” : 0.0, “chunkedMessageRate” : 0.0, “producerId” : 3, “supportsPartialProducer” : false, “producerName” : “pulsar-0-106", “address” : “/10.0.4.4:48576", “connectedSince” : “2023-09-21T11:28:28.642147948Z”, “clientVersion” : “2.10.4", “metadata” : { } } ], “waitingPublishers” : 0, “subscriptions” : { “__compaction” : { “msgRateOut” : 0.0, “msgThroughputOut” : 0.0, “bytesOutCounter” : 0, “msgOutCounter” : 0, “msgRateRedeliver” : 0.0, “messageAckRate” : 0.0, “chunkedMessageRate” : 0, “msgBacklog” : 184244, “backlogSize” : 4842296220, “earliestMsgPublishTimeInBacklog” : 0, “msgBacklogNoDelayed” : 184244, “blockedSubscriptionOnUnackedMsgs” : false, “msgDelayed” : 0, “unackedMessages” : 0, “type” : “None”, “msgRateExpired” : 0.0, “totalMsgExpired” : 0, “lastExpireTimestamp” : 0, “lastConsumedFlowTimestamp” : 0, “lastConsumedTimestamp” : 0, “lastAckedTimestamp” : 0, “lastMarkDeleteAdvancedTimestamp” : 0, “consumers” : [ ], “isDurable” : true, “isReplicated” : false, “allowOutOfOrderDelivery” : false, “consumersAfterMarkDeletePosition” : { }, “nonContiguousDeletedMessagesRanges” : 0, “nonContiguousDeletedMessagesRangesSerializedSize” : 0, “delayedMessageIndexSizeInBytes” : 0, “subscriptionProperties” : { }, “filterProcessedMsgCount” : 0, “filterAcceptedMsgCount” : 0, “filterRejectedMsgCount” : 0, “filterRescheduledMsgCount” : 0, “durable” : true, “replicated” : false }, “websight-dxp-experiences-search-index-547cdf9c5d-dxv7v” : { “msgRateOut” : 1009.3990130769382, “msgThroughputOut” : 1.5503941574612858E7, “bytesOutCounter” : 4276153214, “msgOutCounter” : 278397, “msgRateRedeliver” : 0.0, “messageAckRate” : 168.03316794388724, “chunkedMessageRate” : 0, “msgBacklog” : 17337, “backlogSize” : 355410316, “earliestMsgPublishTimeInBacklog” : 0, “msgBacklogNoDelayed” : 17337, “blockedSubscriptionOnUnackedMsgs” : false, “msgDelayed” : 0, “unackedMessages” : 0, “type” : “Exclusive”, “activeConsumerName” : “b3250”, “msgRateExpired” : 0.0, “totalMsgExpired” : 0, “lastExpireTimestamp” : 0, “lastConsumedFlowTimestamp” : 1695295988291, “lastConsumedTimestamp” : 1695295988263, “lastAckedTimestamp” : 1695295988270, “lastMarkDeleteAdvancedTimestamp” : 0, “consumers” : [ { “msgRateOut” : 1009.3990130769382, “msgThroughputOut” : 1.5503941574612858E7, “bytesOutCounter” : 4276153214, “msgOutCounter” : 278397, “msgRateRedeliver” : 0.0, “messageAckRate” : 168.03316794388724, “chunkedMessageRate” : 0.0, “consumerName” : “b3250", “availablePermits” : 21, “unackedMessages” : 0, “avgMessagesPerEntry” : 1, “blockedConsumerOnUnackedMsgs” : false, “address” : “/10.0.6.4:43380", “connectedSince” : “2023-09-21T11:28:28.616981177Z”, “clientVersion” : “Pulsar-Java-v3.1.0", “lastAckedTimestamp” : 1695295988270, “lastConsumedTimestamp” : 1695295988263, “lastConsumedFlowTimestamp” : 1695295988291, “metadata” : { }, “lastAckedTime” : “2023-09-21T11:33:08.27Z”, “lastConsumedTime” : “2023-09-21T11:33:08.263Z” } ], “isDurable” : true, “isReplicated” : false, “allowOutOfOrderDelivery” : false, “consumersAfterMarkDeletePosition” : { }, “nonContiguousDeletedMessagesRanges” : 0, “nonContiguousDeletedMessagesRangesSerializedSize” : 0, “delayedMessageIndexSizeInBytes” : 0, “subscriptionProperties” : { }, “filterProcessedMsgCount” : 0, “filterAcceptedMsgCount” : 0, “filterRejectedMsgCount” : 0, “filterRescheduledMsgCount” : 0, “durable” : true, “replicated” : false } }, “replication” : { }, “deduplicationStatus” : “Disabled”, “nonContiguousDeletedMessagesRanges” : 0, “nonContiguousDeletedMessagesRangesSerializedSize” : 0, “delayedMessageIndexSizeInBytes” : 0, “compaction” : { “lastCompactionRemovedEventCount” : 0, “lastCompactionSucceedTimestamp” : 0, “lastCompactionFailedTimestamp” : 0, “lastCompactionDurationTimeInMills” : 0 }, “ownerBroker” : “pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:8080” } ➜ ~ kubectl exec --namespace pulsar pulsar-toolset-0 -- bin/pulsar-admin topics stats-internal persistent://websight/dxp/search_experiences-partition-3 { “entriesAddedCounter” : 0, “numberOfEntries” : 184245, “totalSize” : 4842312348, “currentLedgerEntries” : 0, “currentLedgerSize” : 0, “lastLedgerCreatedTimestamp” : “2023-09-21T11:28:28.26Z”, “waitingCursorsCount” : 0, “pendingAddEntriesCount” : 0, “lastConfirmedEntry” : “141:16664", “state” : “LedgerOpened”, “ledgers” : [ { “ledgerId” : 35, “entries” : 50000, “size” : 806438303, “offloaded” : false, “underReplicated” : false }, { “ledgerId” : 83, “entries” : 50000, “size” : 806539134, “offloaded” : false, “underReplicated” : false }, { “ledgerId” : 89, “entries” : 50000, “size” : 806515803, “offloaded” : false, “underReplicated” : false }, { “ledgerId” : 126, “entries” : 17580, “size” : 2149590472, “offloaded” : false, “underReplicated” : false }, { “ledgerId” : 141, “entries” : 16665, “size” : 273228636, “offloaded” : false, “underReplicated” : false }, { “ledgerId” : 165, “entries” : 0, “size” : 0, “offloaded” : false, “underReplicated” : false } ], “cursors” : { “__compaction” : { “markDeletePosition” : “35:0”, “readPosition” : “35:1”, “waitingReadOp” : false, “pendingReadOps” : 0, “messagesConsumedCounter” : -184244, “cursorLedger” : -1, “cursorLedgerLastEntry” : -1, “individuallyDeletedMessages” : “[]“, “lastLedgerSwitchTimestamp” : “2023-09-21T11:28:28.353Z”, “state” : “NoLedger”, “active” : false, “numberOfEntriesSinceFirstNotAckedMessage” : 1, “totalNonContiguousDeletedMessagesRange” : 0, “subscriptionHavePendingRead” : false, “subscriptionHavePendingReplayRead” : false, “properties” : { “CompactedTopicLedger” : 44 } }, “websight-dxp-experiences-search-index-547cdf9c5d-dxv7v” : { “markDeletePosition” : “126:16907", “readPosition” : “35:1", “waitingReadOp” : false, “pendingReadOps” : 0, “messagesConsumedCounter” : -17337, “cursorLedger” : -1, “cursorLedgerLastEntry” : -1, “individuallyDeletedMessages” : “[]“, “lastLedgerSwitchTimestamp” : “2023-09-21T11:28:28.353Z”, “state” : “NoLedger”, “active” : false, “numberOfEntriesSinceFirstNotAckedMessage” : 0, “totalNonContiguousDeletedMessagesRange” : 0, “subscriptionHavePendingRead” : false, “subscriptionHavePendingReplayRead” : false, “properties” : { } } }, “schemaLedgers” : [ ], “compactedLedger” : { “ledgerId” : 44, “entries” : 1, “size” : 186, “offloaded” : false, “underReplicated” : false } }

Are you willing to submit a PR?

coderzc commented 1 year ago

@marekczajkowski Whether readCompacted is enabled on the subscription? It seems to be same issue with #21074

michalcukierman commented 1 year ago

@coderzc we both work on the same product. So let me answer your question. readCompacted is enabled, but the issue is different.

We stated that in #21074 one of the following can cause the loop:

We ensured that none of the above is happening and we still face the problem. The client remained in a loop for hours, only on one partition (partition-3).

Read position never changes during that time and it is equal to: “readPosition” : “35:1”

So it's not reconnecting and reading from the beginning, but the read cursor getting stucked.

Bundles unloading didn't help. Restart of the re-creation of the subscription fixed the problem.

michalcukierman commented 1 year ago

Isn't “waitingReadOp” = false, with the cursor set to the first message a problem? // May be not, I am still learning ;)

coderzc commented 1 year ago

@michalcukierman Thanks for the context, I'll try to reproduce it.

coderzc commented 1 year ago

Read position never changes during that time and it is equal to: “readPosition” : “35:1” So it's not reconnecting and reading from the beginning, but the read cursor getting stucked.

@michalcukierman So this is the same issue with #21104?

michalcukierman commented 1 year ago

No, this on relates to Exclusive subscription. In this issue, we fall into infinite loop, and the cursor position is not changing (we constantly receiving 1k messages/s). in #21104 the client stops to receive any messages, even if the backlog is not empty.

github-actions[bot] commented 1 year ago

The issue had no activity for 30 days, mark with Stale label.

michalcukierman commented 1 year ago

We still encounter the issue.