apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Cannot connect to multiple geo-replicated partitioned topics via consumer #6832

Closed darksungg closed 4 years ago

darksungg commented 4 years ago

We are using the geo-replicated model, producing in US-East via a partitioned topic (persistent://my-tenant/my-namespace/my-topic). Looking in US-West, we can see persistent://my-tenant/my-namespace/my-topic-partition-0 through my-topic-partition-2, as expected. The problem is that when we try to connect to persistent://my-tenant/my-namespace/my-topic in US-West, or to persistent://my-tenant/my-namespace/my-topic.* via regex, the Java consumer client does not get any of the messages produced in US-East via persistent://my-tenant/my-namespace/my-topic or persistent://my-tenant/my-namespace/my-topic-partition-.*.

I have read the documentation multiple times and have had no real success consuming a geo-replicated partitioned topic. The only way I can consume is by subscribing manually to persistent://my-tenant/my-namespace/my-topic-partition-0 (not a multi-topic consumer).

Thanks for your help.
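
For anyone debugging a similar setup, it can help to rule out the regex itself before looking at the cluster. The sketch below uses only java.util.regex. It assumes the pattern has to match the full topic name, and that partitions are named "<topic>-partition-<index>" as seen in US-West above. PartitionPatternCheck and its methods are hypothetical helpers, not part of the Pulsar client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Pulsar client API.
final class PartitionPatternCheck {

    // Builds the per-partition names for a partitioned topic,
    // using the "<topic>-partition-<index>" naming seen in the issue.
    static List<String> partitionNames(String topic, int partitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    // Returns true only if the pattern matches every name in full.
    // Assumption: the consumer applies the pattern as a full-string match.
    static boolean matchesAll(Pattern pattern, List<String> names) {
        for (String name : names) {
            if (!pattern.matcher(name).matches()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String topic = "persistent://my-tenant/my-namespace/my-topic";
        List<String> names = partitionNames(topic, 3);

        // The regex from the issue, with the trailing ".*" wildcard.
        Pattern withWildcard = Pattern.compile(topic + ".*");
        // The bare topic name, used as a regex without a wildcard.
        Pattern bareName = Pattern.compile(topic);

        System.out.println("wildcard matches all partitions: " + matchesAll(withWildcard, names));
        System.out.println("bare name matches all partitions: " + matchesAll(bareName, names));
    }
}
```

If the wildcard pattern matches every partition name here, the regex is unlikely to be the cause and the problem lies elsewhere (subscription type, client configuration, or replication).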

darksungg commented 4 years ago

Adding to this: while testing the issue, it seems that having 3 consumers on 1 client, one for each partition, does not work. The second consumer gets stuck at .subscribe().

I've also tried the approach from the tutorial and it did not work, possibly because of the same problem (it also gets stuck at subscribe()):

    ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
            .subscriptionName(subscription);

    // Subscribe to all topics in a namespace
    Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
    Consumer allTopicsConsumer = consumerBuilder
            .topicsPattern(allTopicsInNamespace)
            .subscribe();

jiazhai commented 4 years ago

@darksungg It seems the default subscription mode is Exclusive. Would you please try Failover or Shared mode?

darksungg commented 4 years ago

Hey @jiazhai thank you for the response.

Please see the producer (us-east) and consumer (us-west) code below:

Producer:

    this.client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .build();
    String topic = "persistent://my-tenant/my-namespace/my-topic";
    Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .create();
    producer.send(message.getBytes());

Consumer:

    this.client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .build();
    Pattern allTopicsInNamespace = Pattern.compile("persistent://my-tenant/my-namespace/my-topic.*");
    this.consumer = client.newConsumer()
            .topicsPattern(allTopicsInNamespace)
            .subscriptionName("my-sub")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();

There are 2 clusters, deployed with the terraform-ansible scripts. I produce in us-east and consume in us-west. As stated above, the subscription is Shared. The problem is that every time I try to subscribe to a multi-topic, the subscribe() method gets stuck. If I try a normal topic, things are fine.

Thanks for the help!
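
While diagnosing a subscribe() call that never returns, one generic option is to run the blocking call on a separate thread with a deadline, so the application gets a TimeoutException instead of hanging. The sketch below uses only java.util.concurrent. BoundedCall and callWithTimeout are hypothetical names, not Pulsar API, and the wrapper only bounds the wait; it does not fix whatever blocks the call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical diagnostic helper; not part of the Pulsar client API.
final class BoundedCall {

    // Runs a blocking call on a daemon thread and waits at most `timeout`.
    // Throws TimeoutException if the call has not finished by then.
    static <T> T callWithTimeout(Callable<T> blockingCall, long timeout, TimeUnit unit)
            throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "bounded-call");
            thread.setDaemon(true); // do not keep the JVM alive if the call stays stuck
            return thread;
        });
        Future<T> future = executor.submit(blockingCall);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker; the call may ignore it
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a blocking call such as a consumer builder's subscribe().
        String result = callWithTimeout(() -> "subscribed", 1, TimeUnit.SECONDS);
        System.out.println(result);
    }
}
```

In the consumer code above, the stand-in lambda would be replaced by the builder chain ending in .subscribe(), with a timeout long enough for a healthy cluster to respond.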

darksungg commented 4 years ago

I would also like to ask whether the same partitioned topic needs to be created in both regions or only in one (we produce in one region only).

darksungg commented 4 years ago

Found the problem; it is not mentioned in the docs:

    this.client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .connectionsPerBroker(12)
            .connectionTimeout(2, TimeUnit.SECONDS)
            .operationTimeout(2, TimeUnit.SECONDS)
            .keepAliveInterval(300, TimeUnit.SECONDS)
            .ioThreads(12)
            .listenerThreads(12)
            .build();

You need multiple threads on the client if it uses a multi-topic consumer.