@yabinmeng Did consumerA acknowledge the received messages before it was killed? You can try getting the topic internal stats (bin/pulsar-admin topics stats-internal) before running consumerB to make sure the mark-delete position is moving forward in the local cluster.
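For reference, the same internal stats can also be fetched programmatically with the Java admin client. This is only a minimal sketch; the admin URL and topic name are placeholders and need to be adjusted to the actual environment:

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;

public class CheckMarkDeletePosition {
    public static void main(String[] args) throws Exception {
        // placeholder admin service URL and topic name; replace with the real ones
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            PersistentTopicInternalStats stats =
                    admin.topics().getInternalStats("persistent://georep/default/t1");
            // print the mark-delete position of every cursor (subscriptions and replicators)
            stats.cursors.forEach((name, cursor) ->
                    System.out.println(name + " markDeletePosition=" + cursor.markDeletePosition));
        }
    }
}

Running this against the topic in each cluster before starting consumerB shows whether the local subscription cursor has actually advanced.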
@codelipenghui yes, consumerA acknowledged each message after receiving it.
I did some investigation on this issue and I found a pattern. The problem occurs if there are pending messages in the subscription.
As described above, I first produced 6 messages to the topic in cluster A. If I consume all 6 messages with the consumer running in cluster A, starting the consumer in cluster B won't receive any messages, as expected. However, if only 3 messages are consumed and 3 remain pending in the subscription, it doesn't work.
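For clarity, the partial-consume step looks roughly like this. It is a minimal sketch; the service URL, topic and subscription names are assumptions based on this report:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class PartialConsume {
    public static void main(String[] args) throws Exception {
        int n = 3; // consume only 3 of the 6 messages, leaving 3 pending in the subscription
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://cluster-a-broker:6650") // placeholder broker URL
                .build();
             Consumer<byte[]> consumer = client.newConsumer()
                     .topic("persistent://georep/default/t1")
                     .subscriptionName("mysub")
                     .replicateSubscriptionState(true) // replicated subscription (PIP-33)
                     .subscribe()) {
            for (int i = 0; i < n; i++) {
                Message<byte[]> msg = consumer.receive();
                consumer.acknowledge(msg); // each received message is acknowledged
            }
        }
    }
}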
@codelipenghui Thanks for the hint about using stats-internal.
After producing 6 messages:
from cluster A:
{
"entriesAddedCounter" : 30934,
"numberOfEntries" : 30934,
"totalSize" : 3074239,
"currentLedgerEntries" : 30934,
"currentLedgerSize" : 3074239,
"lastLedgerCreatedTimestamp" : "2021-03-31T07:34:57.665Z",
"waitingCursorsCount" : 1,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "4:30933",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 4,
"entries" : 0,
"size" : 0,
"offloaded" : false
} ],
"cursors" : {
"mysub" : {
"markDeletePosition" : "4:30732",
"readPosition" : "4:30733",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 30733,
"cursorLedger" : 6,
"cursorLedgerLastEntry" : 75,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:57.676Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : {
"pulsar.replicated.subscription" : 1
}
},
"pulsar.repl.cluster-b" : {
"markDeletePosition" : "4:30933",
"readPosition" : "4:30934",
"waitingReadOp" : true,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 30934,
"cursorLedger" : 5,
"cursorLedgerLastEntry" : 5233,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:57.674Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : { }
}
}
}
from cluster B:
{
"entriesAddedCounter" : 31144,
"numberOfEntries" : 31144,
"totalSize" : 3150803,
"currentLedgerEntries" : 31144,
"currentLedgerSize" : 3150803,
"lastLedgerCreatedTimestamp" : "2021-03-31T07:34:58.097Z",
"waitingCursorsCount" : 1,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "0:31143",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 0,
"entries" : 0,
"size" : 0,
"offloaded" : false
} ],
"cursors" : {
"mysub" : {
"markDeletePosition" : "0:30727",
"readPosition" : "0:30728",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 30728,
"cursorLedger" : 2,
"cursorLedgerLastEntry" : 81,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.833Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : {
"pulsar.replicated.subscription" : 1
}
},
"pulsar.repl.cluster-a" : {
"markDeletePosition" : "0:31143",
"readPosition" : "0:31144",
"waitingReadOp" : true,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 31144,
"cursorLedger" : 1,
"cursorLedgerLastEntry" : 5274,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.219Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : { }
}
}
}
After consuming 3 messages in cluster A:
from cluster A:
{
"entriesAddedCounter" : 31510,
"numberOfEntries" : 31510,
"totalSize" : 3134935,
"currentLedgerEntries" : 31510,
"currentLedgerSize" : 3134935,
"lastLedgerCreatedTimestamp" : "2021-03-31T07:34:57.665Z",
"waitingCursorsCount" : 1,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "4:31509",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 4,
"entries" : 0,
"size" : 0,
"offloaded" : false
} ],
"cursors" : {
"mysub" : {
"markDeletePosition" : "4:30765",
"readPosition" : "4:31410",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 31407,
"cursorLedger" : 6,
"cursorLedgerLastEntry" : 77,
"individuallyDeletedMessages" : "[(4:30768..4:31409]]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:57.676Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 645,
"totalNonContiguousDeletedMessagesRange" : 1,
"properties" : {
"pulsar.replicated.subscription" : 1
}
},
"pulsar.repl.cluster-b" : {
"markDeletePosition" : "4:31509",
"readPosition" : "4:31510",
"waitingReadOp" : true,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 31510,
"cursorLedger" : 5,
"cursorLedgerLastEntry" : 5347,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:57.674Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : { }
}
}
}
from cluster B:
{
"entriesAddedCounter" : 31615,
"numberOfEntries" : 31615,
"totalSize" : 3200474,
"currentLedgerEntries" : 31615,
"currentLedgerSize" : 3200474,
"lastLedgerCreatedTimestamp" : "2021-03-31T07:34:58.097Z",
"waitingCursorsCount" : 1,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "0:31614",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 0,
"entries" : 0,
"size" : 0,
"offloaded" : false
} ],
"cursors" : {
"mysub" : {
"markDeletePosition" : "0:30758",
"readPosition" : "0:30759",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 30759,
"cursorLedger" : 2,
"cursorLedgerLastEntry" : 83,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.833Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : {
"pulsar.replicated.subscription" : 1
}
},
"pulsar.repl.cluster-a" : {
"markDeletePosition" : "0:31614",
"readPosition" : "0:31615",
"waitingReadOp" : true,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 31615,
"cursorLedger" : 1,
"cursorLedgerLastEntry" : 5367,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.219Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : { }
}
}
}
It seems that the problem might be related to the individuallyDeletedMessages state.
PIP-33 mentions "we're only targeting to sync the "mark-delete" position (eg: offset), without considering the messages deleted out of order after that point."
@codelipenghui Is it expected that individuallyDeletedMessages is used here? The messages are consumed in order, so why doesn't the markDeletePosition move forward instead? It seems to move only when all pending messages are consumed.
This is another example where there are multiple individually deleted message ranges:
{
"entriesAddedCounter" : 38551,
"numberOfEntries" : 38551,
"totalSize" : 3907922,
"currentLedgerEntries" : 38551,
"currentLedgerSize" : 3907922,
"lastLedgerCreatedTimestamp" : "2021-03-31T07:34:58.097Z",
"waitingCursorsCount" : 1,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "0:38550",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 0,
"entries" : 0,
"size" : 0,
"offloaded" : false
} ],
"cursors" : {
"mysub" : {
"markDeletePosition" : "0:38447",
"readPosition" : "0:38506",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 38503,
"cursorLedger" : 2,
"cursorLedgerLastEntry" : 97,
"individuallyDeletedMessages" : "[(0:38448..0:38451],(0:38452..0:38454],(0:38455..0:38505]]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.833Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 59,
"totalNonContiguousDeletedMessagesRange" : 3,
"properties" : {
"pulsar.replicated.subscription" : 1
}
},
"pulsar.repl.cluster-a" : {
"markDeletePosition" : "0:38550",
"readPosition" : "0:38551",
"waitingReadOp" : true,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 38551,
"cursorLedger" : 1,
"cursorLedgerLastEntry" : 6556,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.219Z",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : { }
}
}
}
In this case, the first consumer was run on cluster-B (to consume 3 messages). The other steps performed were the same as described in the previous comment.
I also wonder about the messagesConsumedCounter. I've only produced and consumed fewer than 50 messages in this cluster since its creation, yet the readPosition and markDeletePosition seem to increase a lot too. I wonder if this is because of the subscription replication marker messages that are part of the messages in the topic (as described in PIP-33).
It seems that the readPosition, markDeletePosition and individuallyDeletedMessages don't take into account that the counter is also increased by the replicated subscription marker messages. IIRC, some of the logic makes assumptions based on gaps in the counter value. This is just a hunch about what the problem could be.
Another observation: after leaving it running, there are negative values in the messagesConsumedCounter:
{
"entriesAddedCounter" : 0,
"numberOfEntries" : 48221,
"totalSize" : 4931008,
"currentLedgerEntries" : 0,
"currentLedgerSize" : 0,
"lastLedgerCreatedTimestamp" : "2021-03-31T10:42:19.984Z",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "0:48220",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 0,
"entries" : 48221,
"size" : 4931008,
"offloaded" : false
}, {
"ledgerId" : 3,
"entries" : 0,
"size" : 0,
"offloaded" : false
} ],
"cursors" : {
"mysub" : {
"markDeletePosition" : "0:38396",
"readPosition" : "0:38397",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : -9824,
"cursorLedger" : 2,
"cursorLedgerLastEntry" : 97,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T10:42:19.999Z",
"state" : "NoLedger",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : {
"pulsar.replicated.subscription" : 1
}
},
"pulsar.repl.cluster-a" : {
"markDeletePosition" : "0:48216",
"readPosition" : "0:48217",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : -4,
"cursorLedger" : 1,
"cursorLedgerLastEntry" : 8490,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-03-31T10:42:20.002Z",
"state" : "NoLedger",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : { }
}
}
}
It seems odd to have negative values. I wonder if this causes any issues.
It seems that org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl#getPreviousPosition doesn't take the "marker messages" for replicated subscriptions into account, and that is part of the problem.
In PIP-33 there is this documented limitation:
For this proposal, we're only targeting to sync the "mark-delete" position (eg: offset), without considering the messages deleted out of order after that point. These will appear as duplicates after a cluster failover. In future, it might be possible to combine different techniques to track individually deleted messages as well.
It seems that because the "marker messages" are acknowledged automatically, messages get deleted out of order in all cases where the consumer doesn't keep up with the producer. This explains the individuallyDeletedMessages ranges seen when the problem occurs. When there are individuallyDeletedMessages in the subscription, the markDeletePosition doesn't get updated, and therefore the subscription state doesn't get replicated.
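To illustrate the mechanism described above, here is a toy sketch (not the actual ManagedCursorImpl code): the mark-delete position can only advance over a contiguous run of acknowledged entries, so entries acknowledged out of order stay in individuallyDeletedMessages and the position stays put while earlier entries remain unacknowledged.

import java.util.TreeSet;

public class MarkDeleteIllustration {
    public static void main(String[] args) {
        long markDelete = 0; // last contiguously acknowledged entry id
        TreeSet<Long> individuallyDeleted = new TreeSet<>();

        // entries 2, 3 and 5 are acknowledged out of order (e.g. marker entries acked
        // automatically), while entries 1 and 4 (user messages) are still unacknowledged
        long[] acks = {2, 3, 5};
        for (long ack : acks) {
            individuallyDeleted.add(ack);
            // the mark-delete position only advances while the next entry is acknowledged
            while (individuallyDeleted.contains(markDelete + 1)) {
                markDelete++;
                individuallyDeleted.remove(markDelete);
            }
        }
        System.out.println("markDeletePosition = " + markDelete);                   // stays at 0
        System.out.println("individuallyDeletedMessages = " + individuallyDeleted); // [2, 3, 5]
    }
}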
@merlimat Is this also a known limitation in the design? Do you have plans for addressing this issue where the "marker message" acknowledgements prevent subscription replication by causing out-of-order deletions? I might have misunderstood how the solution works and what is causing the behavior reported in the description of this issue.
This test https://github.com/apache/pulsar/pull/10098/commits/28faa8efd98a88004222d810407875c585cd8d67 reproduces the issue when allowDuplicates is set to false. The test is part of PR #10098.
@merlimat, do you have a chance to take a look at this issue and answer the question above? Thanks.
The test case to reproduce the issue has been merged to the master branch. You can reproduce it by setting allowDuplicates = false in the org.apache.pulsar.broker.service.ReplicatorSubscriptionTest#testReplicatedSubscriptionAcrossTwoRegions test case.
Is the expectation that the test case makes (with allowDuplicates = false) a valid one?
(Btw, it seems that replicated subscriptions are broken in the master branch. The fix #10247 is required, but it doesn't fix this issue.)
I took a look at the test, and the issue is that by the time the consumer has acknowledged all the messages, there is still no valid snapshot, therefore we cannot advance the position of the replicated subscription.
After the topic is created, the brokers will start sending markers and creating snapshots of the cursors. This happens with a default cadence of 1 second.
If the consumer acks everything before that time, no snapshot will have been created yet. If traffic continues, the subscription will be replicated correctly after a few seconds.
To "fix" the test I just put a Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis())
before the producer gets started.
One other minor thing in the test is that we don't need to create the subscription in R2 (https://github.com/apache/pulsar/blob/05f1f7e58fc54648e7843335dfb64d413d9b6a1e/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java#L94). Creating the replicated subscription in R1 will automatically trigger the creation of the subscription in all the clusters (where the topic is being replicated).
To "fix" the test I just put a
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis())
before the producer gets started.One other minor thing in the test is that we don't need to create the subscription in R2 (
Thanks for checking this, @merlimat.
I tried these changes in this experiment: https://github.com/lhotari/pulsar/commit/2892e25d1654ea0e2c71891ffc89502a637612e3
However, if the subscription isn't created before the producer is created, no messages are received on R2. I wonder if the test case is missing some necessary configuration.
@yabinmeng Has the problem of not being able to continue consumption on another cluster been resolved?
Thank you @merlimat. The cause of the problem is that continuously produced messages are required; a one-off batch of messages produced over too short a period indeed cannot trigger replication of the subscription state.
To sum up, there are two conditions for subscription replication between multiple sites in Pulsar 2.8:
1. Continuous traffic between the two sites.
2. Message deduplication disabled in broker.conf.
I had the same problem with version 2.8.1 of Pulsar. What is the final solution to this issue?
To sum up, there are two conditions for subscription replication between multiple sites in Pulsar 2.8:
1. Continuous traffic between the two sites.
2. Message deduplication disabled in broker.conf.
@liguangcheng Got it, thanks.
@liangjianwen @liguangcheng There's a possible mitigation for the "continuous traffic between the two sites" requirement in #16651. It doesn't remove the requirement completely, since subscription replication can only happen for positions that have a valid subscription snapshot. Please share more details about your observations or use cases.
@liangjianwen I still get the same problem with 2.8.1 even though I met the two conditions mentioned by @liguangcheng. Does it work for you?
For anyone dealing with subscription replication issues, one thing to check is whether replication snapshots are timing out. Increasing the timeout threshold (replicatedSubscriptionsSnapshotTimeoutSeconds=30 -> replicatedSubscriptionsSnapshotTimeoutSeconds=60) could help solve issues in that case.
See https://github.com/apache/pulsar/discussions/21612#discussioncomment-7649210 for more details.
There's also a pending PR #16651 that mitigates some issues in snapshot creation and reduces unnecessary snapshots.
Hey, was there any update on this? I'm using Pulsar 2.10.x and facing the same issue. I read through the related issues in this repo, but even with continuous message production (100 messages with a 1-second sleep between each, as well as 1000 messages with a 0.1-second sleep between each) and dedup enabled, I'm not getting the expected behavior of consuming x messages on one cluster, closing the client, and starting from the (x+1)th message on the second cluster (or anywhere near x for that matter; the second consumer always starts from the 0th message). I set the snapshot timeout to 60, but I'm not sure where to see if the snapshot is failing; there are no obvious logs related to replication failure in the broker.
@YJDoc2 there are discussions https://github.com/apache/pulsar/discussions/22315 and https://github.com/apache/pulsar/discussions/21612 which might contain some useful information.
Since replicated subscriptions only replicate the mark-delete position, it is worth checking pulsar-admin topics stats-internal for the topic in each cluster to verify the subscription state.
Hey @lhotari, thanks for the resources, they helped a lot. So to be clear, from what I have understood, is the following correct: for two clusters A and B, where cluster B is replicating cluster A (so cluster A is the "main" one)
Is my understanding correct? Thanks!
Describe the bug
I'm testing geo-replication with subscription replication enabled. In my testing environment, there are 2 clusters (ClusterA and ClusterB) with a global ZooKeeper on a dedicated server host.
Each cluster also has its own producer and consumer client application, based on the Pulsar (Java) producer API and the consumer sync API.
Below is the summary of my testing steps:
I'm expecting ConsumerB to receive the second half of the messages (e.g. msg 5 ~ 9) if subscription replication is working properly. But the results show that most of the time ConsumerB receives all 10 messages (e.g. msg 0 ~ 9). There are also several cases where ConsumerB receives 6 or 7 messages (e.g. msg 3 ~ 9 or 4 ~ 9).
My testing is based on Pulsar 2.6.3 with default configuration, with key geo-replication/subscription replication settings below:
broker.conf
The consumer application also has the following code added:
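The snippet itself isn't included here; for subscription replication to take effect the consumer has to enable replicateSubscriptionState, so presumably it was along these lines (a sketch with placeholder service URL, topic and subscription names):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

public class ReplicatedSubscriptionConsumer {
    public static void main(String[] args) throws Exception {
        // placeholder service URL, topic and subscription names
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://georep/default/t1")
                .subscriptionName("mysub")
                // enables replicated subscription state for this subscription (PIP-33)
                .replicateSubscriptionState(true)
                .subscribe();
        // ... receive() / acknowledge() loop as usual ...
        consumer.close();
        client.close();
    }
}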
To Reproduce
See the "Description" section.
Expected behavior
See the "Description" section.
Screenshots
Below is one example of the testing results:
ProducerA publishes 6 messages to a geo-replication enabled topic "georep/default/t1"
ConsumerA receives 3 messages from this topic
Wait for a short while, then start ConsumerB in ClusterB and try to receive all remaining messages. But it actually receives all 6 messages, the first 3 of which have already been consumed and acknowledged in ClusterA