apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Failover subscription sends first message to multiple consumers #15189

Open gvolpe opened 2 years ago

gvolpe commented 2 years ago

Describe the bug

I have two consumers connected using a Failover subscription to the same topic. I first run one consumer, and after a few minutes, I run the second consumer (no messages are being produced yet).

Once both consumers are up, I start publishing messages to the topic. When this happens, both consumers get the first message, and from there, only the selected active consumer gets the subsequent messages.

It looks like a bug to me, but if this is expected, I would appreciate it if you could point me to any documentation on this behavior.

To Reproduce

Steps to reproduce the behavior:

  1. Create a persistent non-partitioned topic.
  2. Create two consumers in Failover mode and run them.
  3. Create a producer and start publishing messages to the topic.
  4. See first message being received by both consumers.

Expected behavior

Only the active consumer should get all messages.
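
The expected behavior can be stated as a small in-memory model. This is only a sketch of the semantics described here, not broker code: `FailoverModel` and its methods are hypothetical names, and treating the first subscriber as the active consumer is an assumption made for illustration (the broker's actual selection rule may differ).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FailoverModel {
    // Connected consumers in subscribe order; index 0 is treated as active (assumption).
    private final List<String> connected = new ArrayList<>();
    // Everything each consumer has received so far.
    private final Map<String, List<String>> inboxes = new HashMap<>();

    void subscribe(String name) {
        if (!connected.contains(name)) {
            connected.add(name);
        }
        inboxes.putIfAbsent(name, new ArrayList<>());
    }

    void disconnect(String name) {
        connected.remove(name); // the next consumer in line becomes active
    }

    /** Delivers to the active consumer only; returns its name, or null if nobody is connected. */
    String publish(String message) {
        if (connected.isEmpty()) {
            return null;
        }
        String active = connected.get(0);
        inboxes.get(active).add(message);
        return active;
    }

    List<String> received(String name) {
        return inboxes.getOrDefault(name, List.of());
    }

    public static void main(String[] args) {
        FailoverModel sub = new FailoverModel();
        sub.subscribe("consumer-1");
        sub.subscribe("consumer-2");
        sub.publish("m1");
        sub.publish("m2");
        sub.disconnect("consumer-1");
        sub.publish("m3");
        System.out.println(sub.received("consumer-1")); // [m1, m2]
        System.out.println(sub.received("consumer-2")); // [m3]
    }
}
```

In this model the standby consumer sees nothing until the active one disconnects, which is the behavior the report expects for the first message as well.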

Additional context

Here are a few things that might be relevant.

Pulsar version: 2.10.0
Pulsar Java client: 2.10.0

I have deduplication enabled on my broker configuration:

brokerDeduplicationEnabled=true

Messages published to the topic have an orderingKey, as I have another service running in KeyShared subscription mode reading the very same messages.

Technoboy- commented 2 years ago

Hi, does the topic have partitions? Below is the explanation of failover:

/**
 * Multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
 * If that consumer disconnects, one of the other connected consumers will start receiving messages.
 *
 * <p>In failover mode, the consumption ordering is guaranteed.
 *
 * <p>In case of partitioned topics, the ordering is guaranteed on a per-partition basis.
 * The partitions assignments will be split across the available consumers. On each partition,
 * at most one consumer will be active at a given point in time.
 */

gvolpe commented 2 years ago

@Technoboy- no, it's not a partitioned topic. I have updated the issue details to clarify that.

Technoboy- commented 2 years ago

> @Technoboy- no, it's not a partitioned topic. I have updated the issue details to clarify that.

Could the test below match the reproduction steps?

@Test
public void testFailoverConsumer() throws Exception {
    // Restart the broker with deduplication enabled, as in the report.
    stopBroker();
    conf.setBrokerDeduplicationEnabled(true);
    setup();
    TopicName topicName = TopicName.get("persistent://my-property/my-ns/testFailoverConsumer");

    // First consumer on the Failover subscription.
    CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer()
            .topic(topicName.toString())
            .consumerName("aaa")
            .subscriptionName("sub-1")
            .subscriptionType(SubscriptionType.Failover)
            .subscribeAsync();

    // Give the first consumer time to connect before the second one joins.
    Thread.sleep(10_000);

    // Second consumer on the same subscription.
    CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer()
            .topic(topicName.toString())
            .consumerName("bbb")
            .subscriptionName("sub-1")
            .subscriptionType(SubscriptionType.Failover)
            .subscribeAsync();

    @Cleanup
    Producer<byte[]> producer = pulsarClient
            .newProducer()
            .topic(topicName.toString())
            .create();

    // Publish a single message with an ordering key.
    MessageId messageId = producer.newMessage()
            .orderingKey("aaa".getBytes(UTF_8))
            .value("hello-failover".getBytes(UTF_8))
            .send();

    // receive(timeout) returns null on timeout, so only one consumer should log a message.
    log.info("consumer1 : {}", consumer1.get().receive(5, TimeUnit.SECONDS));
    log.info("consumer2 : {}", consumer2.get().receive(5, TimeUnit.SECONDS));
}

gvolpe commented 2 years ago

@Technoboy- yes, just need to use orderingKey instead of key when producing the message, and potentially enable deduplication in the broker if the issue can't be reproduced.

Furthermore, I use subscribeAsync / sendAsync in case that makes any difference.

Technoboy- commented 2 years ago

> @Technoboy- yes, just need to use orderingKey instead of key when producing the message, and potentially enable deduplication in the broker if the issue can't be reproduced.
>
> Furthermore, I use subscribeAsync / sendAsync in case that makes any difference.

Hi, I have updated the test case. You can put it in SimpleProducerConsumerTest to run it locally. I could not reproduce the issue with it. If the test needs changes, please post them in a comment to help us reproduce it.

gvolpe commented 2 years ago

@Technoboy- thank you. I will try to adapt the test to see if it can be reproduced when I get back from holidays. Otherwise, I will create a minimal application where this is reproducible.

I can reproduce this issue consistently in my application, but it is written in functional Scala code via the Neutron library, so I'll have to see if there's something different that is not reflected in this Java code.

gvolpe commented 2 years ago

@Technoboy- one more thing. I don't set the consumer name of my consumers, not sure if that changes anything.

gvolpe commented 2 years ago

Another thing that's different in that test is that Neutron consumes messages by repeatedly invoking receiveAsync (see here). In your Java test, there is a single call to receive from each consumer running sequentially.

In most applications, consumers run concurrently, which is not reflected in this test.

Could that be the meaningful difference?

Technoboy- commented 2 years ago

> Another thing that's different in that test is that Neutron consumes messages by repeatedly invoking receiveAsync (see here). In your Java test, there is a single call to receive from each consumer running sequentially.
>
> In most applications, consumers run concurrently, which is not reflected in this test.
>
> Could that be the meaningful difference?

I think there is no difference.

Technoboy- commented 2 years ago

> @Technoboy- one more thing. I don't set the consumer name of my consumers, not sure if that changes anything.

Ah, that does not change anything.

gvolpe commented 2 years ago

Another key difference: in that test, a message is produced before the calls to receive.

In my application, both consumers are concurrently calling receiveAsync in a loop. Only after this is the message published.
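
A rough sketch of that pattern, kept free of the Pulsar client so it runs on its own: each consumer always has one receive outstanding before anything is published, and a recorder notes which consumers saw which message id. `DeliveryCheck`, `DeliveryRecorder`, `receiveLoop` and `newPending` are hypothetical names introduced here, and the two consumers are simulated with plain `CompletableFuture`s, so this only shows how the symptom could be detected, not why it happens.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

final class DeliveryCheck {

    /** Thread-safe record of which consumers saw each message id. */
    static final class DeliveryRecorder {
        private final Map<String, Set<String>> seenBy = new ConcurrentHashMap<>();

        void record(String consumerName, String messageId) {
            seenBy.computeIfAbsent(messageId, id -> ConcurrentHashMap.newKeySet()).add(consumerName);
        }

        /** Message ids that reached more than one consumer, sorted for stable output. */
        List<String> duplicates() {
            return seenBy.entrySet().stream()
                    .filter(e -> e.getValue().size() > 1)
                    .map(Map.Entry::getKey)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    /** Keeps one receive outstanding: each completed receive is recorded, then the next one is issued. */
    static void receiveLoop(String consumerName,
                            Supplier<CompletableFuture<String>> receiveAsync,
                            DeliveryRecorder recorder) {
        receiveAsync.get().thenAccept(messageId -> {
            recorder.record(consumerName, messageId);
            receiveLoop(consumerName, receiveAsync, recorder);
        });
    }

    /** Simulated receiveAsync: creates a pending future and stores it so the caller can complete it. */
    static CompletableFuture<String> newPending(AtomicReference<CompletableFuture<String>> slot) {
        CompletableFuture<String> future = new CompletableFuture<>();
        slot.set(future);
        return future;
    }

    public static void main(String[] args) {
        DeliveryRecorder recorder = new DeliveryRecorder();
        AtomicReference<CompletableFuture<String>> first = new AtomicReference<>();
        AtomicReference<CompletableFuture<String>> second = new AtomicReference<>();

        // Both loops are waiting before any message exists.
        receiveLoop("consumer-1", () -> newPending(first), recorder);
        receiveLoop("consumer-2", () -> newPending(second), recorder);

        // Simulate the reported symptom: the first message reaches both consumers.
        first.get().complete("msg-0");
        second.get().complete("msg-0");
        // Later messages reach the active consumer only.
        first.get().complete("msg-1");

        System.out.println("delivered to more than one consumer: " + recorder.duplicates());
        // prints: delivered to more than one consumer: [msg-0]
    }
}
```

Against a real broker, the supplier could presumably be something like `() -> consumer.receiveAsync().thenApply(m -> m.getMessageId().toString())`, with one loop per consumer started before the producer sends anything.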

gvolpe commented 2 years ago

@Technoboy- I came back from my holidays and tried to reproduce this but it no longer happens in my application 🤷🏽. There was probably some bad state either in my Pulsar broker or in my services, can't know for sure.

I also created an isolated application to try to reproduce this issue with the steps I had documented, but it turned out unsuccessful, which is good news! There is no such bug :)

https://github.com/gvolpe/pulsar-failover-bug

Thanks a lot for bearing with me, appreciate the effort.

lhotari commented 5 days ago

This might be a real issue. There's also #15781 which is possibly related.

In Key_Shared subscriptions, there's a slightly related challenge when a consumer gets switched from one to another. In PIP-379 (currently in discussion phase) a "draining hashes" solution is proposed for handling this for Key_Shared subscriptions.

With failover subscriptions, there should be a mechanism so that the previously active consumer isn't just forcefully terminated when a higher-priority consumer takes over. Any in-flight acknowledgements should be handled with some configurable timeout so that duplicates are prevented in this case. I haven't checked the details of how this is currently handled.

lhotari commented 5 days ago

It looks like there has been a consideration to solve the issue with "PIP-260: Client consumer filter received messages", #19864.
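
As a rough illustration of what filtering on the client side could look like (this is not the design proposed in PIP-260, whose details are not covered here), the sketch below drops a message id that was already seen within a bounded window. `SeenMessageFilter` and `firstDelivery` are hypothetical names, and message ids are modelled as plain strings. It only suppresses repeats inside a single consumer process, so it would not by itself stop two separate consumers from each receiving the same message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SeenMessageFilter {
    // Insertion-ordered map used as a bounded FIFO set of recently seen ids.
    private final Map<String, Boolean> seen;

    SeenMessageFilter(final int capacity) {
        this.seen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity; // forget the oldest id once the window is full
            }
        };
    }

    /** Returns true the first time an id is offered, false for a repeat still inside the window. */
    synchronized boolean firstDelivery(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        SeenMessageFilter filter = new SeenMessageFilter(2);
        System.out.println(filter.firstDelivery("1:0")); // true
        System.out.println(filter.firstDelivery("1:0")); // false, duplicate
        filter.firstDelivery("1:1");
        filter.firstDelivery("1:2");                     // window is full, "1:0" is forgotten
        System.out.println(filter.firstDelivery("1:0")); // true again
    }
}
```

The window size is the trade-off: a larger window catches duplicates that arrive later, at the cost of more memory per consumer.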