apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] Hash range collision causes out-of-order cases between existing consumers in Key_Shared #23315

Closed equanz closed 1 month ago

equanz commented 2 months ago

Version

Tested with https://github.com/apache/pulsar/tree/4f96146f13b136644a4eb0cf4ec36699e0431929 .

Minimal reproduce step

Apply the following patch and run the test.

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 48311c5733..685baeef9d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -29,11 +29,13 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
 import org.apache.pulsar.client.api.Range;
 import org.testng.Assert;
 import org.testng.annotations.Test;

+@Slf4j
 @Test(groups = "broker")
 public class ConsistentHashingStickyKeyConsumerSelectorTest {

@@ -216,4 +218,35 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
         // then there should be no mapping remaining
         Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 0);
     }
+
+    @Test
+    public void testModifyMappingBetweenExistingConsumers() {
+        final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+        final String consumerName = "c1";
+        final int numOfInitialConsumers = 3;
+        for (int i = 0; i < numOfInitialConsumers; i++) {
+            final Consumer consumer = mock(Consumer.class);
+            when(consumer.consumerName()).thenReturn(consumerName);
+            when(consumer.consumerId()).thenReturn((long) i);
+            selector.addConsumer(consumer);
+        }
+
+        final int keyHash = numOfInitialConsumers + 1;
+        // get expected consumer
+        final Consumer expectedConsumer = selector.select(keyHash);
+
+        // add new consumer
+        final Consumer newConsumer = mock(Consumer.class);
+        when(newConsumer.consumerName()).thenReturn(consumerName);
+        when(newConsumer.consumerId()).thenReturn((long) numOfInitialConsumers);
+        selector.addConsumer(newConsumer);
+
+        final Consumer actualConsumer = selector.select(keyHash);
+        try {
+            Assert.assertEquals(actualConsumer.consumerId(), expectedConsumer.consumerId());
+        } catch (AssertionError e) {
+            // if it changes (that is normal behavior), expect it to be the new consumer
+            Assert.assertEquals(actualConsumer.consumerId(), newConsumer.consumerId());
+        }
+    }
 }

What did you expect to see?

In auto-split hash mode, a newly added consumer is expected to take over part of the hash range from existing consumers. (The dispatcher handles that case with recentlyJoinedConsumers.)

A hash range should therefore never move from one existing consumer to another.
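
The expectation above can be illustrated with a minimal hash ring sketch. It is not Pulsar's selector: the class name HashRingDemo, the consumer names, and the hand-picked ring points are all assumptions made for illustration, and no ring points collide. Under those assumptions, adding a consumer only moves hashes to the newcomer.

```java
import java.util.Map;
import java.util.TreeMap;

class HashRingDemo {
    // Ring point -> owning consumer. Points are hand-picked here (hypothetical);
    // a real selector would derive them by hashing the consumer name.
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    void add(String consumer, int... points) {
        for (int point : points) {
            ring.put(point, consumer);
        }
    }

    // A key hash is served by the first ring point at or after it, wrapping around.
    String select(int keyHash) {
        Map.Entry<Integer, String> entry = ring.ceilingEntry(keyHash);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        HashRingDemo demo = new HashRingDemo();
        demo.add("a", 10, 60);
        demo.add("b", 30, 80);

        String[] before = new String[100];
        for (int h = 0; h < 100; h++) {
            before[h] = demo.select(h);
        }

        demo.add("c", 20, 70); // the new consumer joins

        for (int h = 0; h < 100; h++) {
            String now = demo.select(h);
            // Every hash either keeps its owner or moves to the newcomer.
            if (!now.equals(before[h]) && !now.equals("c")) {
                throw new IllegalStateException("hash " + h + " moved between existing consumers");
            }
        }
        System.out.println("no hash moved between existing consumers");
    }
}
```

The property only holds here because each ring point has exactly one owner; the collision handling discussed in this issue is what breaks it.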

What did you see instead?

[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.289 s <<< FAILURE! - in org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelectorTest
[ERROR] org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelectorTest.testModifyMappingBetweenExistingConsumers  Time elapsed: 0.212 s  <<< FAILURE!
java.lang.AssertionError: expected [3] but found [0]
        at org.testng.Assert.fail(Assert.java:110)
        at org.testng.Assert.failNotEquals(Assert.java:1577)
        at org.testng.Assert.assertEqualsImpl(Assert.java:149)
        at org.testng.Assert.assertEquals(Assert.java:131)
        at org.testng.Assert.assertEquals(Assert.java:979)
        at org.testng.Assert.assertEquals(Assert.java:955)
        at org.testng.Assert.assertEquals(Assert.java:989)
        at org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelectorTest.testModifyMappingBetweenExistingConsumers(ConsistentHashingStickyKeyConsumerSelectorTest.java:251)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
        at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
        at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
        at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

When the hash range collides, the selector stores the consumer in the list of collisions. https://github.com/apache/pulsar/blob/4f96146f13b136644a4eb0cf4ec36699e0431929/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java#L67-L73

The consumer is then picked from that list by the following calculation: https://github.com/apache/pulsar/blob/4f96146f13b136644a4eb0cf4ec36699e0431929/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java#L127

With the three initial consumers, 4 % 3 = 1, so the selector returns the consumer with consumerId 1.

After the new consumer with consumerId 3 is added, 4 % 4 = 0, so the selector returns the consumer with consumerId 0.

The consumers with consumerId 0 and 1 are both existing consumers. So, the range moves between existing consumers.

The above case leads to out-of-order redelivery. Shouldn't we care about this?
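
The modulo arithmetic described above can be reproduced in isolation. This is a minimal sketch, assuming the colliding consumers sit in a plain list in order of addition; the class name CollisionModuloDemo and the use of bare consumer IDs instead of Consumer objects are illustrative assumptions, not Pulsar code.

```java
import java.util.ArrayList;
import java.util.List;

class CollisionModuloDemo {
    // Mirrors the selection expression discussed in this issue:
    // consumerList.get(hash % consumerList.size())
    static long select(List<Long> collidingConsumerIds, int keyHash) {
        return collidingConsumerIds.get(keyHash % collidingConsumerIds.size());
    }

    public static void main(String[] args) {
        // Three consumers whose ring points collide, listed in order of addition.
        List<Long> consumerIds = new ArrayList<>(List.of(0L, 1L, 2L));
        int keyHash = 4;

        long before = select(consumerIds, keyHash); // 4 % 3 = 1
        consumerIds.add(3L);                        // the new consumer joins
        long after = select(consumerIds, keyHash);  // 4 % 4 = 0

        // prints "before=1, after=0": the key moved from consumer 1 to consumer 0,
        // and neither of them is the new consumer.
        System.out.println("before=" + before + ", after=" + after);
    }
}
```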

Anything else?

For simplicity, all consumers in this example share the same name. However, the issue is not limited to consumers with the same name; it can also be triggered by coincidental hash collisions.

(This issue was originally reported by @hrsakai .)

lhotari commented 2 months ago

@equanz @hrsakai Thanks for reporting. Coincidentally I was investigating this today and had this concern but didn't validate it.

lhotari commented 2 months ago

Just guessing, but perhaps the intention of this code is to select a different client for each partition when there are multiple consumers with the same name: https://github.com/apache/pulsar/blob/4f96146f13b136644a4eb0cf4ec36699e0431929/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java#L127

lhotari commented 2 months ago

Slightly related issue about the incorrect results of getConsumerKeyHashRanges: #23321

codelipenghui commented 2 months ago

The above case leads to out-of-order redelivery.

I don't think it will cause out-of-order issue.

Getting a different consumer for the same hash is expected, since the set of consumers has changed.

lhotari commented 2 months ago

I don't think it will cause out-of-order issue.

  • The ordering is not guaranteed by the hash selector; it is guaranteed by the PersistentStickyKeyDispatcherMultipleConsumers
  • The newly joined consumer will only get messages after the old consumers have acknowledged all the outstanding messages

Getting a different consumer for the same hash is expected, since the set of consumers has changed.

The result of this bug is that the target consumer will switch also for existing consumers in certain cases. @codelipenghui Did you consider that case?

lhotari commented 2 months ago

I'm pretty sure that consumerList.get(hash % consumerList.size()) is wrong. The fix is explained in https://github.com/apache/pulsar/issues/23321#issuecomment-2362826648 . I'll submit a PR.
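
To make the problem with consumerList.get(hash % consumerList.size()) concrete, the sketch below counts how many key hashes move between existing consumers when a fourth colliding consumer is appended. It compares the modulo expression with a hypothetical alternative that always returns the first entry of the collision list. That alternative is an assumption for illustration only; it is not necessarily what the linked comment or the eventual PR does, and it assumes the new consumer is appended at the end of the list. The class and method names are also made up.

```java
import java.util.List;

class StableCollisionSelectDemo {
    // Selection expression quoted in the comment above.
    static long selectByModulo(List<Long> ids, int keyHash) {
        return ids.get(keyHash % ids.size());
    }

    // Hypothetical alternative: ignore the key hash within a collision list.
    static long selectFirst(List<Long> ids) {
        return ids.get(0);
    }

    // Counts key hashes in [0, 12) that move from one existing consumer to
    // another when consumer 3 is appended. Returns {modulo count, first-entry count}.
    static int[] countMovesBetweenExisting() {
        List<Long> before = List.of(0L, 1L, 2L);
        List<Long> after = List.of(0L, 1L, 2L, 3L);
        int modulo = 0;
        int first = 0;
        for (int h = 0; h < 12; h++) {
            long m0 = selectByModulo(before, h);
            long m1 = selectByModulo(after, h);
            if (m0 != m1 && m1 != 3L) {
                modulo++; // moved, and not to the newcomer
            }
            if (selectFirst(before) != selectFirst(after)) {
                first++;
            }
        }
        return new int[] {modulo, first};
    }

    public static void main(String[] args) {
        int[] moves = countMovesBetweenExisting();
        // prints "modulo=6, first=0"
        System.out.println("modulo=" + moves[0] + ", first=" + moves[1]);
    }
}
```

With the modulo expression, half of the sampled hashes switch between consumers that were already present; the first-entry variant keeps them in place, at the cost of sending everything for that ring point to a single consumer.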

codelipenghui commented 2 months ago

The result of this bug is that the target consumer will switch also for existing consumers in certain cases. @codelipenghui Did you consider that case?

@lhotari Oh, I get your point now. One consumer joining can change the key assignment for many other consumers. Thanks for the explanation.

lhotari commented 2 months ago

It looks like #8396 wasn't a correct solution at the time it was made. @codelipenghui I think that we need to address this for all maintenance branches.

lhotari commented 2 months ago

It looks like #8396 wasn't a correct solution at the time it was made. @codelipenghui I think that we need to address this for all maintenance branches.

I have created #23327 to fix the issue. Please review

lhotari commented 1 month ago

#23327 is now ready for review after multiple iterations.

equanz commented 1 month ago

Just guessing, but perhaps the intention of this code is to select a different client for each partition when there are multiple consumers with the same name: https://github.com/apache/pulsar/issues/23315#issuecomment-2360761086

The list is sorted by consumerName. https://github.com/apache/pulsar/blob/4f96146f13b136644a4eb0cf4ec36699e0431929/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java#L67-L73

The behavior when the keys to be compared are the same seems to be undefined. (I haven't rechecked it, but I think, in my environment, it was in order of addition.) https://docs.oracle.com/javase/8/docs/api/java/util/ArrayList.html#sort-java.util.Comparator-

So, as you say, each partition's selector could pick a different consumer. However, if the producer sets a partition key in RoundRobin or SinglePartition routing mode, messages with a given key are always sent to the same partition. If this is correct, the key ordering guarantee would not be affected. cf. https://pulsar.apache.org/docs/3.3.x/concepts-messaging/#ordering-guarantee
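
The insertion-order observation about sorting same-name consumers is consistent with OpenJDK, where ArrayList.sort delegates to Arrays.sort, and the object variant of Arrays.sort is documented as stable. A small check, assuming such a runtime; Entry is a hypothetical stand-in for Consumer, and only the name takes part in the comparison.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class EqualNameSortDemo {
    // Hypothetical stand-in for a consumer: a name plus an id.
    static final class Entry {
        final String name;
        final long id;

        Entry(String name, long id) {
            this.name = name;
            this.id = id;
        }
    }

    // Sorts a copy by name only and returns the ids in the resulting order.
    static List<Long> idsAfterSortByName(List<Entry> entries) {
        List<Entry> sorted = new ArrayList<>(entries);
        sorted.sort(Comparator.comparing((Entry e) -> e.name));
        List<Long> ids = new ArrayList<>();
        for (Entry e : sorted) {
            ids.add(e.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Entry> entries = List.of(
                new Entry("c1", 2), new Entry("c1", 0), new Entry("c1", 1));
        // With a stable sort, equal names keep their order of addition: [2, 0, 1]
        System.out.println(idsAfterSortByName(entries));
    }
}
```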

equanz commented 1 month ago

I don't think it will cause out-of-order issue. https://github.com/apache/pulsar/issues/23315#issuecomment-2361736557

The result of this bug is that the target consumer will switch also for existing consumers in certain cases. https://github.com/apache/pulsar/issues/23315#issuecomment-2361810469

One consumer joining can change the key assignment for many other consumers. https://github.com/apache/pulsar/issues/23315#issuecomment-2362842362

(Thank you @lhotari !) That is correct. My concern is "the range moves between existing consumers". In my understanding, this case is not currently addressed by the dispatcher.

lhotari commented 1 month ago

Possibly the same issue in this Slack message: https://www.linen.dev/s/apache-pulsar/t/23079402/hi-we-re-using-key-shared-mode-the-key-is-a-code-between-100