apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Consumer able to receive message which is not matching the regex pattern #22529

Open ragaur-tibco opened 5 months ago

ragaur-tibco commented 5 months ago

Version

3.2.2

Minimal reproduce step

Steps to reproduce:

What did you expect to see?

What did you see instead?

package Pulsar;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class AllTopicsConsumerExample {
    private static PulsarAdmin adm;
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String NAMESPACE = "my-tenant/new-name";
    private static final String SUBSCRIPTION_NAME = "your-subscription";

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://my-tenant/new-name/topic-non-1")
                .enableBatching(false).create();
        producer.send("=========from topic non-persistent://my-tenant/new-name/topic-non-1 ");
        System.out.println("new producer");
        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/new-name/topic-pers-1")
                .enableBatching(false).create();
        producer1.send("=======from topic persistent://my-tenant/new-name/topic-pers-1 ");

        Pattern allTopicsPattern = Pattern.compile("non-persistent://my-tenant/new-name/.*");

        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsPattern)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();

        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}

Anything else?

No response

Are you willing to submit a PR?

visortelle commented 5 months ago

According to https://github.com/apache/pulsar/issues/19798, there is no need to specify a persistent:// or non-persistent:// prefix in the topicsPattern:

After discussion, we don't need this PIP; we just need to:

  • Add a warning log when the user-configured pattern contains a domain ('persistent://public/default/topic.*').
  • Enhance the documentation: topicsPattern cannot contain domains.
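
To illustrate why a pattern with a domain prefix does not restrict the consumer to that domain, here is a minimal sketch. It assumes that matching is domain-agnostic, i.e. that the domain prefix is stripped from each topic name before the regex is applied, as the linked issue suggests. It is a simplified model, not the Pulsar client's code; `stripDomain` and `filter` are hypothetical helper names.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Simplified model only: this is NOT the Pulsar client's implementation.
// Assumption: the domain prefix is removed before the regex is applied.
final class DomainAgnosticMatchSketch {

    // Hypothetical helper: drops a leading "persistent://" or "non-persistent://".
    static String stripDomain(String topic) {
        return topic.replaceFirst("^(non-)?persistent://", "");
    }

    // Keeps the topics whose domain-less name fully matches the pattern.
    static List<String> filter(List<String> topics, Pattern pattern) {
        return topics.stream()
                .filter(t -> pattern.matcher(stripDomain(t)).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "non-persistent://my-tenant/new-name/topic-non-1",
                "persistent://my-tenant/new-name/topic-pers-1",
                "persistent://my-tenant/other/topic-x");
        Pattern pattern = Pattern.compile("my-tenant/new-name/.*");
        // Topics in my-tenant/new-name are selected regardless of their domain.
        filter(topics, pattern).forEach(System.out::println);
    }
}
```

Under this model, the domain is selected separately from the pattern. In the client API that is the role of subscriptionTopicsMode, for example RegexSubscriptionMode.NonPersistentOnly instead of AllTopics.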
ragaur-tibco commented 5 months ago

OK, thank you @visortelle. But for non-persistent topics I was not able to get the multi-topic subscription working, and as you said: "Once in about ~5 runs I see some messages from non-persistent topic".

That means the pattern subscription is not working properly for non-persistent topics.

visortelle commented 5 months ago

@ragaur-tibco I completely agree and don't argue with that.

visortelle commented 5 months ago

@ragaur-tibco the current behavior is correct. See my comment here: https://github.com/apache/pulsar/issues/22527#issuecomment-2067133567

visortelle commented 5 months ago

@ragaur-tibco I fixed your code.

When using non-persistent delivery, killing a Pulsar broker or disconnecting a subscriber to a topic means that all in-transit messages are lost on that (non-persistent) topic, meaning that clients may see message loss.

Source: https://pulsar.apache.org/docs/next/cookbooks-non-persistent/#overview

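In your code, the subscriber was created after the messages were sent, so no consumer was connected to receive the non-persistent message. The version below subscribes first and then sends.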

Code:

package b;

import org.apache.pulsar.client.api.*;

import java.util.regex.Pattern;

public class App {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String SUBSCRIPTION_NAME = "your-subscription";

    public static void main(String[] args) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();

        Producer<String> producerA = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://my-tenant/new-name/topic-non-1")
                .enableBatching(false).create();

        Producer<String> producerB = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/new-name/topic-pers-1")
                .enableBatching(false).create();

        Pattern allTopicsPattern = Pattern.compile("my-tenant/new-name/.*");

        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsPattern)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();

        producerA.send("=========from topic non-persistent://my-tenant/new-name/topic-non-1 ");
        producerB.send("=========from topic persistent://my-tenant/new-name/topic-pers-1 ");

        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}

Logs:

$ mvn exec:java
[INFO] Scanning for projects...
[INFO] 
[INFO] --------------------------------< c:a >---------------------------------
[INFO] Building a 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.2.0:java (default-cli) @ a ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received message from topic non-persistent://my-tenant/new-name/topic-non-1: =========from topic non-persistent://my-tenant/new-name/topic-non-1 
Received message from topic persistent://my-tenant/new-name/topic-pers-1: =========from topic persistent://my-tenant/new-name/topic-pers-1 

visortelle commented 5 months ago

@ragaur-tibco please check and let me know if it resolves the issue.

ragaur-tibco commented 5 months ago

Hi @visortelle

I tried creating the subscriber before sending the messages:

package Pulsar;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class AllTopicsConsumerExample {
    private static PulsarAdmin adm;
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String NAMESPACE = "my-tenant/new-name";
    private static final String SUBSCRIPTION_NAME = "your-subscription-1";

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();

        // Pattern allTopicsPattern = Pattern.compile("tenant-1/name/topic.*");

        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern("tenant-1/name/topic.*")
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://tenant-1/name/topic-1")
                .enableBatching(false).create();

        System.out.println("new producer");
        producer.send("=========from topic non-persistent://tenant-1/name/topic-1 ");

        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://tenant-1/name/topic-8")
                .enableBatching(false).create();
        producer1.send("=======from topic  persistent://tenant-1/name/topic-8 ");
        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}

Result: I only receive the message from the persistent topic, not the one from the non-persistent topic.

visortelle commented 5 months ago

Interesting.

My observation is that after a pattern consumer is created, it doesn't "immediately" create the underlying subscription and consumer for an existing non-persistent topic if no producers are connected to that topic at that moment. They are eventually created after a short time.

TIP: you can display the list of the underlying consumers by casting your consumer to PatternMultiTopicsConsumerImpl and calling the .getConsumers() method.

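// Needs java.util.List plus two internal client classes (not public API):
// org.apache.pulsar.client.impl.ConsumerImpl and
// org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl
List<ConsumerImpl<byte[]>> consumers = 
    ((PatternMultiTopicsConsumerImpl<byte[]>) allTopicsConsumer).getConsumers();

// Print the topic of every underlying per-topic consumer.
for (ConsumerImpl<byte[]> consumer : consumers) {
    System.out.println("consumer: " + consumer.getTopic());
}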
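
Since the underlying consumer appears only after a delay, one option is to wait for it before sending. The following is a minimal sketch of a generic polling helper, using only the JDK; `AwaitSketch` and `await` are hypothetical names, and the condition in `main` is a stand-in rather than a real Pulsar check.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Generic polling helper; nothing in this class is Pulsar-specific.
final class AwaitSketch {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition became true in time, false otherwise.
    static boolean await(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Stand-in condition that becomes true after roughly 200 ms.
        boolean ready = await(
                () -> System.nanoTime() - start > Duration.ofMillis(200).toNanos(),
                Duration.ofSeconds(5),
                Duration.ofMillis(20));
        System.out.println("ready = " + ready);
    }
}
```

With the pattern consumer from this thread, the condition could be a check that the list returned by getConsumers() contains a consumer whose getTopic() is the non-persistent topic. Even then, non-persistent delivery gives no guarantee against message loss.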

If you modify your code to send a lot of messages asynchronously, you'll start to receive them after a short time.

for (int i = 0; i < 100000; i++) {
    producer.sendAsync("=========from topic non-persistent://tenant-1/name/topic-1 " + i);
}
...
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10732
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10733
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10734
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10735
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10736
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10737
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10738
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10739
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10740
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10741
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10742
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10743
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10744
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10745
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10746

I don't know if it can qualify as a bug. cc @lhotari

@ragaur-tibco do you have a real use case in mind? I wouldn't rely on non-persistent topics if losing even a small number of messages could affect my application.

visortelle commented 5 months ago

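Here is the reason. Before adding a non-persistent topic to the topics list, the broker checks that the topic isActive(), which evaluates !subscriptions.isEmpty() || hasLocalProducers(). A non-persistent topic with no subscriptions and no local producers is therefore left out of the list.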

https://github.com/apache/pulsar/blob/21647a1fc69ff46e65b6eaa37dd6d435e9f8eaef/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java#L1519

https://github.com/apache/pulsar/blob/21647a1fc69ff46e65b6eaa37dd6d435e9f8eaef/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L993
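
The effect of that check can be modelled with a few lines of plain Java. This is a simplified sketch under the assumption that the topic list is filtered by the quoted condition; `TopicState` and `listActive` are hypothetical names and none of this is the broker's code.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of the check described above; NOT the broker's code.
final class ActiveTopicFilterSketch {

    // Hypothetical stand-in for the broker-side state of a non-persistent topic.
    static final class TopicState {
        final String name;
        final int subscriptions;
        final int localProducers;

        TopicState(String name, int subscriptions, int localProducers) {
            this.name = name;
            this.subscriptions = subscriptions;
            this.localProducers = localProducers;
        }

        // Mirrors the quoted condition: !subscriptions.isEmpty() || hasLocalProducers()
        boolean isActive() {
            return subscriptions > 0 || localProducers > 0;
        }
    }

    // Returns only the names of topics that pass the isActive() check.
    static List<String> listActive(List<TopicState> topics) {
        return topics.stream()
                .filter(TopicState::isActive)
                .map(t -> t.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TopicState> topics = List.of(
                // No subscription and no producer yet: filtered out.
                new TopicState("non-persistent://tenant-1/name/topic-1", 0, 0),
                // A producer is connected: listed.
                new TopicState("non-persistent://tenant-1/name/topic-2", 0, 1));
        listActive(topics).forEach(System.out::println);
    }
}
```

In this model, a pattern consumer created before any producer connects would not see the non-persistent topic in the list, and would pick it up only on a later listing, once a producer or subscription exists.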