datastax / pulsar-jms

DataStax Starlight for JMS, a JMS API for Apache Pulsar ®
Apache License 2.0

Implement createConnectionConsumer for Jboss EE . - Required for Consuming messages for Jboss Container #26

Closed akhil1000 closed 2 years ago

akhil1000 commented 2 years ago

Implement createConnectionConsumer for JBoss EE. This is the method called by the JBoss container.

This method is currently not implemented https://github.com/datastax/pulsar-jms/blob/96606d0f22a1af8fdde0c5eff0f5dde086d9862c/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java#L685

eolivelli commented 2 years ago

@akhil1000 this is a draft https://github.com/datastax/pulsar-jms/pull/27

please test it

akhil1000 commented 2 years ago

Hello @eolivelli,
It gets a little further now and calls the ConnectionConsumer, but then throws a NullPointerException. It also required me to set webServiceUrl, which is not required for publishing but is required for consuming. Everything in the PR worked except for one compilation error: I changed the word "apply" to "build" in PulsarConnection line 966: MessageConsumer consumer = consumerBuilder.build(session);

Error stack:

javax.jms.JMSException: org.apache.pulsar.client.admin.PulsarAdminException: java.lang.NullPointerException
at com.datastax.oss.pulsar.jms.Utils.handleException(Utils.java:63)
at com.datastax.oss.pulsar.jms.PulsarConnectionFactory.ensureQueueSubscription(PulsarConnectionFactory.java:963)
at com.datastax.oss.pulsar.jms.PulsarMessageConsumer.subscribe(PulsarMessageConsumer.java:116)
at com.datastax.oss.pulsar.jms.PulsarSession.createConsumer(PulsarSession.java:783)
at com.datastax.oss.pulsar.jms.PulsarSession.createConsumer(PulsarSession.java:734)
at com.datastax.oss.pulsar.jms.PulsarSession.createConsumer(PulsarSession.java:73)
at com.datastax.oss.pulsar.jms.PulsarConnection.lambda$createConnectionConsumer$0(PulsarConnection.java:688)
at com.datastax.oss.pulsar.jms.PulsarConnection.buildConnectionConsumer(PulsarConnection.java:965)
at com.datastax.oss.pulsar.jms.PulsarConnection.createConnectionConsumer(PulsarConnection.java:687)
at org.jboss.resource.adapter.jms.inflow.JmsServerSessionPool.setupConsumer(JmsServerSessionPool.java:289)
at org.jboss.resource.adapter.jms.inflow.JmsServerSessionPool.start(JmsServerSessionPool.java:89)
at org.jboss.resource.adapter.jms.inflow.JmsActivation.setupSessionPool(JmsActivation.java:656)
at org.jboss.resource.adapter.jms.inflow.JmsActivation.setup(JmsActivation.java:359)
at org.jboss.resource.adapter.jms.inflow.JmsActivation$SetupActivation.run(JmsActivation.java:729)
at org.jboss.resource.work.WorkWrapper.execute(WorkWrapper.java:205)
at org.jboss.util.threadpool.BasicTaskWrapper.run(BasicTaskWrapper.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.pulsar.client.admin.PulsarAdminException: java.lang.NullPointerException
at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:251)
at org.apache.pulsar.client.admin.internal.TopicsImpl.createSubscription(TopicsImpl.java:1108)
at com.datastax.oss.pulsar.jms.PulsarConnectionFactory.ensureQueueSubscription(PulsarConnectionFactory.java:932)
... 17 more

eolivelli commented 2 years ago

Try with setting jms.usePulsarAdmin = false and you won't need to use Pulsar Admin https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html

akhil1000 commented 2 years ago

Thank you, @eolivelli. I used the above flag and now get a slightly different error, a concurrent ExecutionException.

akhil1000 commented 2 years ago

Here is the complete trace. It is calling OMSClientImpl.subscribeAsyncParent at some point:

javax.jms.JMSException: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Receiver queue size needs to be greater than 0 for Topics Consumer

at com.datastax.oss.pulsar.jms.Utils.handleException(Utils.java:63)

at com.datastax.oss.pulsar.jms.PulsarConnectionFactory.ensureQueueSubscription(PulsarConnectionFactory.java:963)

at com.datastax.oss.pulsar.jms.PulsarMessageConsumer.subscribe(PulsarMessageConsumer.java:116)

at com.datastax.oss.pulsar.jms.PulsarSession.createConsumer(PulsarSession.java:783)

at com.datastax.oss.pulsar.jms.PulsarSession.createConsumer(PulsarSession.java:734)

at com.datastax.oss.pulsar.jms.PulsarSession.createConsumer(PulsarSession.java:73)

at com.datastax.oss.pulsar.jms.PulsarConnection.lambda$createConnectionConsumer$0(PulsarConnection.java:688)

at com.datastax.oss.pulsar.jms.PulsarConnection.buildConnectionConsumer(PulsarConnection.java:965)

at com.datastax.oss.pulsar.jms.PulsarConnection.createConnectionConsumer(PulsarConnection.java:687)

at org.jboss.resource.adapter.jms.inflow.JmsServerSessionPool.setupConsumer(JmsServerSessionPool.java:289)

at org.jboss.resource.adapter.jms.inflow.JmsServerSessionPool.start(JmsServerSessionPool.java:89)

at org.jboss.resource.adapter.jms.inflow.JmsActivation.setupSessionPool(JmsActivation.java:656)

at org.jboss.resource.adapter.jms.inflow.JmsActivation.setup(JmsActivation.java:359)

at org.jboss.resource.adapter.jms.inflow.JmsActivation$SetupActivation.run(JmsActivation.java:729)

at org.jboss.resource.work.WorkWrapper.execute(WorkWrapper.java:205)

at org.jboss.util.threadpool.BasicTaskWrapper.run(BasicTaskWrapper.java:260)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

Caused by: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Receiver queue size needs to be greater than 0 for Topics Consumer

at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1074)

at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101)

at com.datastax.oss.pulsar.jms.PulsarConnectionFactory.ensureQueueSubscription(PulsarConnectionFactory.java:946)

... 17 more

Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Receiver queue size needs to be greater than 0 for Topics Consumer

at java.util.concurrent.CompletableFuture.reportGet(Unknown Source)

at java.util.concurrent.CompletableFuture.get(Unknown Source)

at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:99)

... 18 more

Caused by: java.lang.IllegalArgumentException: Receiver queue size needs to be greater than 0 for Topics Consumer

at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)

at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.<init>(MultiTopicsConsumerImpl.java:139)

at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.<init>(MultiTopicsConsumerImpl.java:128)

at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.<init>(MultiTopicsConsumerImpl.java:112)

at org.apache.pulsar.client.impl.PulsarClientImpl.multiTopicSubscribeAsync(PulsarClientImpl.java:514)

at org.apache.pulsar.client.impl.PulsarClientImpl.subscribeAsync(PulsarClientImpl.java:466)

at com.intuit.platform.messaging.pulsar.client.OMSClientImpl.subscribeAsyncParent(OMSClientImpl.java:328)

at com.intuit.platform.messaging.pulsar.client.OMSClientImpl.subscribeAsyncInternal(OMSClientImpl.java:429)

at com.intuit.platform.messaging.pulsar.client.OMSClientImpl.subscribeAsync(OMSClientImpl.java:533)

at org.apache.pulsar.client.impl.ConsumerBuilderImpl.lambda$subscribeAsync$1(ConsumerBuilderImpl.java:173)

at java.util.concurrent.CompletableFuture.uniComposeStage(Unknown Source)

at java.util.concurrent.CompletableFuture.thenCompose(Unknown Source)

at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribeAsync(ConsumerBuilderImpl.java:171)

eolivelli commented 2 years ago

The error "Receiver queue size needs to be greater than 0 for Topics Consumer" happens if you try to create a Consumer with a queue size of 0 on a partitioned topic.

In your case you are setting jms.usePulsarAdmin to false, so the JMS Client can only pre-create the jms-queue subscription by creating a dummy Consumer with a zero-size receiver queue (this is needed in order to not prefetch messages, which would alter the order in which messages are dispatched).

We have some ways to work around this issue:

1) use a non-partitioned topic
2) use a JMS Topic instead of a JMS Queue (this doesn't require pre-creating the subscription)
3) set jms.usePulsarAdmin to true and provide the webServiceUrl connection parameter
4) pre-create the jms-queue subscription on the partition, but in order to support this case we have to add some verification to the ensureQueueSubscription method (https://github.com/datastax/pulsar-jms/blob/d136014bb8ec5cfba5e3528cc14fd9ea11f26ce9/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java#L849)

The best way here is to let the JMS Client use PulsarAdmin (option 3); otherwise you will still be limited in other features (like temporary destinations management or the 'unsubscribe' feature).

If you can't go with 1), 2) or 3), then please let me know.
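
Option 3 can be sketched as a plain configuration map. This is only an illustration, not code from the thread: the class and method names are hypothetical, the URLs in the usage note are placeholders, and the `brokerServiceUrl` key is an assumption to verify against the Starlight for JMS configuration reference. `webServiceUrl` and `jms.usePulsarAdmin` are the parameter names mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the configuration for workaround 3
// (let the JMS client use PulsarAdmin).
final class AdminEnabledConfigSketch {

    static Map<String, Object> adminEnabledConfig(String brokerServiceUrl, String webServiceUrl) {
        // PulsarAdmin talks to the HTTP endpoint, so the web service URL must be present.
        if (webServiceUrl == null || webServiceUrl.isEmpty()) {
            throw new IllegalArgumentException(
                "webServiceUrl is required when jms.usePulsarAdmin is true");
        }
        Map<String, Object> configuration = new HashMap<>();
        configuration.put("brokerServiceUrl", brokerServiceUrl); // assumed key name
        configuration.put("webServiceUrl", webServiceUrl);
        configuration.put("jms.usePulsarAdmin", true);
        return configuration;
    }
}
```

A call such as `adminEnabledConfig("pulsar://localhost:6650", "http://localhost:8080")` would produce a map that, assuming the factory accepts a configuration map, can be handed to PulsarConnectionFactory or translated into the equivalent RAR configuration.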

akhil1000 commented 2 years ago

Thank you, Enrico.

  1 and 2. We can't use a Topic, as we are moving from ActiveMQ to Pulsar and need to use the existing queues and topics for the cutover.
  3. PulsarAdmin: I am not sure whether the team will allow or maintain the web service URL, etc.
  4. I have to ask the team here whether they can pre-create the subscription. I did not face this issue with the ActiveMQ RAR.
akhil1000 commented 2 years ago

Also, altering of the order of dispatching messages would not be a problem if it's only at create time.

akhil1000 commented 2 years ago

ActiveMQ has prefetch policies; I am not sure whether something similar applies to Pulsar as well.

https://activemq.apache.org/what-is-the-prefetch-limit-for

eolivelli commented 2 years ago

@akhil1000 in order to support your use case (partitioned-topic + cannot use PulsarAdmin) I added support for this new configuration parameter: jms.precreateQueueSubscriptionConsumerQueueSize=0

Please set it to 1 in your PulsarConnectionFactory configuration, and the error should go away.
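
As a minimal sketch of that setup (partitioned topic, no PulsarAdmin), the two settings discussed in this thread can be combined in one configuration map. The class and method names are hypothetical and the `brokerServiceUrl` key is an assumption; `jms.usePulsarAdmin` and `jms.precreateQueueSubscriptionConsumerQueueSize` are the names given above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: configuration for a queue on a partitioned topic
// when PulsarAdmin cannot be used.
final class NoAdminQueueConfigSketch {

    static Map<String, Object> noAdminQueueConfig(String brokerServiceUrl, int precreateQueueSize) {
        // A size of 0 is what triggers "Receiver queue size needs to be greater than 0"
        // on partitioned topics, so this sketch rejects anything below 1.
        if (precreateQueueSize < 1) {
            throw new IllegalArgumentException("precreateQueueSize must be at least 1");
        }
        Map<String, Object> configuration = new HashMap<>();
        configuration.put("brokerServiceUrl", brokerServiceUrl); // assumed key name
        configuration.put("jms.usePulsarAdmin", false);
        configuration.put("jms.precreateQueueSubscriptionConsumerQueueSize", precreateQueueSize);
        return configuration;
    }
}
```

Note the trade-off described earlier in the thread: a non-zero queue size lets the dummy consumer prefetch a message, which may alter dispatch order at subscription-creation time.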

akhil1000 commented 2 years ago

2022-05-24 00:38:39,346 INFO [com.sds.prs.mdb.derivation.MDBDerivation] Success: <?xml version="1.0" encoding="UTF-8"?> </ns0:PartyMaster>

SuccessNoReply
2022-05-24 00:38:39,347 INFO [STDOUT] ON MESSAGE FINISHED
2022-05-24 00:38:39,352 ERROR [com.datastax.oss.pulsar.jms.PulsarMessageConsumer] Error while receiving message con consumer PulsarConsumer{subscriptionName=3fdedde1-66c3-43e6-ba53-9d309e7d60f3, destination=Queue{persistent://mse/cmdm/public.globalvirtual.e2e.changepub.Virtual}}
2022-05-24 00:38:39,352 ERROR [com.datastax.oss.pulsar.jms.PulsarSession] Internal error
javax.jms.JMSException: org.apache.pulsar.client.api.PulsarClientException$InvalidConfigurationException: Transactions are not enabled
at com.datastax.oss.pulsar.jms.Utils.handleException(Utils.java:63)
at com.datastax.oss.pulsar.jms.PulsarSession.startTransaction(PulsarSession.java:134)
at com.datastax.oss.pulsar.jms.PulsarSession.getTransaction(PulsarSession.java:111)
at com.datastax.oss.pulsar.jms.PulsarMessageConsumer.handleReceivedMessage(PulsarMessageConsumer.java:431)
at com.datastax.oss.pulsar.jms.PulsarMessageConsumer.lambda$runListener$5(PulsarMessageConsumer.java:596)
at com.datastax.oss.pulsar.jms.Utils.lambda$executeMessageListenerInSessionContext$0(Utils.java:113)
at com.datastax.oss.pulsar.jms.PulsarSession.executeCriticalOperation(PulsarSession.java:1676)
at com.datastax.oss.pulsar.jms.Utils.executeMessageListenerInSessionContext(Utils.java:111)
at com.datastax.oss.pulsar.jms.PulsarMessageConsumer.runListener(PulsarMessageConsumer.java:581)
at com.datastax.oss.pulsar.jms.PulsarSession.lambda$run$0(PulsarSession.java:659)
at com.datastax.oss.pulsar.jms.PulsarConnection.executeInConnectionPausedLock(PulsarConnection.java:880)
at com.datastax.oss.pulsar.jms.PulsarSession.run(PulsarSession.java:657)
at com.datastax.oss.pulsar.jms.PulsarSession$ListenerThread.run(PulsarSession.java:1737)
Caused by: org.apache.pulsar.client.api.PulsarClientException$InvalidConfigurationException: Transactions are not enabled
at org.apache.pulsar.client.impl.PulsarClientImpl.newTransaction(PulsarClientImpl.java:1119)
at com.datastax.oss.pulsar.jms.PulsarSession.startTransaction(PulsarSession.java:122)

eolivelli commented 2 years ago

You have to enable Transactions:

Please note that Starlight for JMS 2.x uses the Pulsar client 2.10, which, when you are using Transactions, is compatible only with Pulsar 2.9 onwards. This is due to a change in the wire protocol regarding transactions between Pulsar 2.8 and Pulsar 2.9.

By the way, in Pulsar 2.8 transactions are not a stable feature; the first version of Pulsar in which you can use Transactions in production is 2.10.

akhil1000 commented 2 years ago

I already tried enableTransaction=true and also enableTransaction=false- both did not help

akhil1000 commented 2 years ago

But have to look at the Pulsar server doc

akhil1000 commented 2 years ago

I get the exception below when I set enableTransaction=false.

2022-05-24 01:23:48,222 ERROR [org.jboss.resource.adapter.jms.inflow.JmsActivation] Unable to reconnect org.jboss.resource.adapter.jms.inflow.JmsActivationSpec@1a1203(ra=org.jboss.resource.adapter.jms.JmsResourceAdapter@81a08d destination=queues/persistent:///cmdm/public.globalvirtual.e2e.cmdm.changepub.Virtual destinationType=javax.jms.Queue tx=true durable=false reconnect=10 user= pass= maxMessages=1 minSession=1 maxSession=1 keepAlive=60000 useDLQ=false)

javax.jms.JMSException: Please enable transactions on PulsarConnectionFactory with enableTransaction=true
at com.datastax.oss.pulsar.jms.PulsarSession.<init>(PulsarSession.java:103)
at com.datastax.oss.pulsar.jms.PulsarConnection.createSession(PulsarConnection.java:172)
at com.datastax.oss.pulsar.jms.PulsarConnection.createSession(PulsarConnection.java:49)
at org.jboss.resource.adapter.jms.inflow.JmsServerSession.setup(JmsServerSession.java:116)
at org.jboss.resource.adapter.jms.inflow.JmsServerSessionPool.setupSessions(JmsServerSessionPool.java:197)
at org.jboss.resource.adapter.jms.inflow.JmsServerSessionPool.start(JmsServerSessionPool.java:88)
at org.jboss.resource.adapter.jms.inflow.JmsActivation.setupSessionPool(JmsActivation.java:656)
at org.jboss.resource.adapter.jms.inflow.JmsActivation.setup(JmsActivation.java:359)
at org.jboss.resource.adapter.jms.inflow.JmsActivation.handleFailure(JmsActivation.java:292)
at org.jboss.resource.adapter.jms.inflow.JmsActivation$SetupActivation.run(JmsActivation.java:733)
at org.jboss.resource.work.WorkWrapper.execute(WorkWrapper.java:205)
at org.jboss.util.threadpool.BasicTaskWrapper.run(BasicTaskWrapper.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

eolivelli commented 2 years ago

This is because JBoss is creating a transacted Session, so you must enable transactions in the JMS Client, and in order to do so you have to enable transactions on the server as well. Please remember that you need Pulsar 2.10.x or Luna Streaming 2.10 (https://github.com/datastax/pulsar)
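
The interaction between the container's request and the client setting can be modelled with a small sketch. This is a hypothetical model, not the library's code: it only mirrors the behaviour visible in the stack trace above (a transacted session requested while enableTransaction is not true is rejected), it throws IllegalStateException instead of JMSException to stay dependency-free, and the `brokerServiceUrl` key is an assumption. On the broker side, Apache Pulsar's documented switch is `transactionCoordinatorEnabled=true` in broker.conf; check the transactions documentation for your Pulsar version for the remaining setup steps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the "transacted session requires enableTransaction" rule.
final class TransactedSessionSketch {

    // Client-side configuration with transactions switched on.
    static Map<String, Object> transactionalClientConfig(String brokerServiceUrl) {
        Map<String, Object> configuration = new HashMap<>();
        configuration.put("brokerServiceUrl", brokerServiceUrl); // assumed key name
        configuration.put("enableTransaction", true);
        return configuration;
    }

    // Rejects a transacted session when the client configuration does not enable transactions.
    static void checkSessionAllowed(boolean transactedRequested, Map<String, Object> configuration) {
        Object raw = configuration.getOrDefault("enableTransaction", "false");
        boolean enabled = Boolean.parseBoolean(String.valueOf(raw));
        if (transactedRequested && !enabled) {
            throw new IllegalStateException(
                "Please enable transactions on PulsarConnectionFactory with enableTransaction=true");
        }
    }
}
```

With this model, a container that asks for a transacted session (tx=true in the JBoss log) passes the check only when the client configuration carries enableTransaction=true; a non-transacted request passes either way.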

akhil1000 commented 2 years ago

Yes. I have asked to enable transactions on the server. I'll check on the Pulsar version. We are using Pulsar 2.10.x on the client side.

eolivelli commented 2 years ago

The Pulsar client bundled inside the .RAR file is 2.10.x (actually it is the Luna Streaming fork, but the code is the same as Apache Pulsar 2.10).

eolivelli commented 2 years ago

from JBoss logs:

@81a08d destination=queues/persistent:///cmdm/public.globalvirtual.e2e.cmdm.changepub.Virtual destinationType=javax.jms.Queue tx=true durable=false reconnect=10 user= pass= maxMessages=1 minSession=1 maxSession=1 keepAlive=60000 useDLQ=false)

maybe there is a way to disable the requirement to use Transactions on JBoss side, if you cannot enable Transactions on Pulsar servers

akhil1000 commented 2 years ago

I tried to disable transactions with SessionTransacted=false. I already have TransactionAttributeType set to NOT_SUPPORTED and am using no-tx-connection-factory.

Transactions do show as disabled in the log below, but PulsarConnection/PulsarSession is still trying to create a new transaction.

[org.jboss.resource.adapter.jms.inflow.JmsActivation] Unable to reconnect org.jboss.resource.adapter.jms.inflow.JmsActivationSpec@11140e9(ra=org.jboss.resource.adapter.jms.JmsResourceAdapter@f7d64e destination=queues/persistent://mse/cmdm/public.globalvirtual.e2e.cmdm.changepub.Virtual destinationType=javax.jms.Queue tx=false ack=Auto-acknowledge durable=false reconnect=10 provider=java:/ACTIVEMQJMSNONXAProviderAsync user=ldcp.cmdm pass= maxMessages=1 minSession=1 maxSession=1 keepAlive=60000 useDLQ=false)

javax.jms.JMSException: Please enable transactions on PulsarConnectionFactory with enableTransaction=true
at com.datastax.oss.pulsar.jms.PulsarSession.<init>(PulsarSession.java:103)
at com.datastax.oss.pulsar.jms.PulsarConnection.createSession(PulsarConnection.java:172)
at com.datastax.oss.pulsar.jms.PulsarConnection.createSession(PulsarConnection.java:49)
at org.jboss.resource.adapter.jms.inflow.JmsServerSession.setup(JmsServerSession.java:116)
at org.jboss.resource.adapter.jms.inflow.JmsServerSessionPool.setupSessions(JmsServerSessionPool.java:197)
at org.jboss.resource.adapter.jms.inflow.JmsServerSessionPool.start(JmsServerSessionPool.java:88)
at org.jboss.resource.adapter.jms.inflow.JmsActivation.setupSessionPool(JmsActivation.java:656)
at org.jboss.resource.adapter.jms.inflow.JmsActivation.setup(JmsActivation.java:359)
at org.jboss.resource.adapter.jms.inflow.JmsActivation.handleFailure(JmsActivation.java:292)
at org.jboss.resource.adapter.jms.inflow.JmsActivation$SetupActivation.run(JmsActivation.java:733)
at org.jboss.resource.work.WorkWrapper.execute(WorkWrapper.java:205)
at org.jboss.util.threadpool.BasicTaskWrapper.run(BasicTaskWrapper.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

eolivelli commented 2 years ago

I believe that in JavaEE/JakartaEE it is better to have Transactions enabled. I suggest setting up Transactions on both the client and the broker. Please remember to use Pulsar 2.10+ or Luna Streaming 2.10+

akhil1000 commented 2 years ago

Sure. Do we need to use Pulsar 2.10+ on the server side? Or is it possible to have a flag to explicitly set session_transacted to false?

akhil1000 commented 2 years ago

Here it says - Pulsar 2.8.0 or later supports transactions. https://pulsar.apache.org/docs/txn-use/

eolivelli commented 2 years ago

yes, 2.8.0 supported transactions, but:

So if you need transactions it is really better that you use Pulsar 2.10+

"Or is it possible to have a flag to explicitly set session_transacted to false?"

This is possible only if the JavaEE server (JBoss) supports it. If the container asks the JMS Provider to open a transacted session, then the provider must open a transaction; otherwise the system won't work properly (leading to data loss or corruption).

Do you have problems using Pulsar 2.10?

If you really want to go with a Pulsar 2.8 server, we have to port this patch to the 1.5 branch of Starlight for JMS, which still bundles the Pulsar 2.8 client. I suggest not going down that path: I am not sure you will be able to upgrade to a newer Pulsar version once you have enabled transactions, because other things may have changed.

akhil1000 commented 2 years ago

Hello Enrico, I tried to disable transactions with SessionTransacted=false. I already have TransactionAttributeType set to NOT_SUPPORTED and am using no-tx-connection-factory. The JBoss log shows tx=false, but transactions are still being created.

We are using Pulsar server 2.9.2 and client version 2.10.x. I talked to the JMS team here; they said transactions are supported in 2.9.2 and that they are not going to move to 2.10.x anytime soon.

I am not sure we can use transactions. I would use client acknowledge and no transaction. For that, we would need a flag to disable transactions in the RAR implementation, like SessionTransacted=false.

Can we have a SessionTransacted=false flag passed as a parameter to the RAR?