quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Reactive Messaging JMS connector support #30943

Open mpumd opened 1 year ago

mpumd commented 1 year ago

Description

Hi everybody,

Related to the Zulip topic https://quarkusio.zulipchat.com/#narrow/stream/187030-users/topic/Artemis.20jms.20.2B.20smallrye-jms.20connector , we are looking for a smart way to talk to JBoss EAP 6 over JMS (with a configurable protocol such as HornetQ core). Initial tests work the old-school way (create factory -> connection -> session...). With the quarkus-artemis-jms extension, we were able to inject a ConnectionFactory and produce/consume messages against our HornetQ broker. It also compiles natively.

@Path("/activemqjms2")
@RegisterForReflection(targets = { HornetQClientProtocolManagerFactory.class, ActiveMQConnectionFactory.class })
public class ActiveMQJMS2Resource {

    private static final Logger log = LoggerFactory.getLogger(ActiveMQJMS2Resource.class);     

    @Inject ConnectionFactory factory;     

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String connection() throws JMSException {

        Queue queue = ActiveMQDestination.createQueue(PacketImpl.OLD_QUEUE_PREFIX + QUEUE_NAME);

        try (Connection connection = factory.createConnection()) {
            int count = 0;
            connection.start();
            try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                try (MessageProducer producer = session.createProducer(queue)) {
                    String text = "coucou at " + new Date();
                    for (int i = 0; i < 10; i++) {
                        producer.send(session.createTextMessage(text + " - " + i));
                    }
                }
                // session.commit();
            }
            log.info("sent 10 messages");
            try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                try (MessageConsumer consumer = session.createConsumer(queue)) {
                    Message message = consumer.receive(2000);
                    while (message != null) {
                        if (message instanceof TextMessage textMessage) {
                            log.info("received " + textMessage.getText());
                        } else {
                            log.info("received " + message);
                        }
                        count++;
                        message = consumer.receive(2000);
                    }
                }
                // session.commit();
            }
            return "received messages: " + count;
        }
    }
}

<dependency>
    <groupId>io.quarkiverse.artemis</groupId>
    <artifactId>quarkus-artemis-jms</artifactId>
    <version>2.0.4</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>artemis-hqclient-protocol</artifactId>
    <version>2.26.0</version>
</dependency>

quarkus:
  artemis:
    url: (tcp://jboss1:5049,tcp://jboss2:5049)?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory
    username: user
    password: pass

Although this technically works, we would like improved JMS support, such as what the SmallRye JMS connector already provides.
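For comparison, a channel backed by the SmallRye JMS connector is normally declared in configuration rather than in code. The fragment below is a minimal sketch, assuming the YAML configuration style used above and the attribute names documented for the SmallRye Reactive Messaging JMS connector (`connector`, `destination`, `selector`). The channel name `validation-out` is hypothetical, and the destination and selector are borrowed from the Spring listener shown further down. Later comments in this thread note that Quarkus did not officially support this connector at the time, so treat it as an illustration of the desired shape, not a verified setup.

```yaml
mp:
  messaging:
    incoming:
      validation-out:                  # hypothetical channel name
        connector: smallrye-jms        # connector identifier from the SmallRye docs
        destination: com.x.jee.ValidationBroker.jms.Out.Queue
        selector: "event = 'MyEvent' AND participant = 'X'"
```

A bean method annotated with `@Incoming("validation-out")` would then receive the messages from that channel.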

Here are the main requirements:

XA support is not a requirement.

Here is what we did with Spring Boot:

@Component
public class ValidationBrokerListener {     

    @JmsListener(
        destination = "com.x.jee.ValidationBroker.jms.Out.Queue",
        selector = "event = 'MyEvent' AND participant = 'X'",
        concurrency = "1-10")
    @Transactional
    public void processMessage(Message message) throws JMSException, InterruptedException {
        // ...
    }
}

Or, in the old days of EAP (nothing fancy):

@MessageDriven(name = "MyMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "${Messaging.maxSession:3}"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "com/x/Messaging/MyQueue"),
    @ActivationConfigProperty(propertyName = "rebalanceConnections", propertyValue = "true") 
})
@ResourceAdapter("${MyRarName}")
public class MyMDB implements MessageListener {  
    public void onMessage(Message msg) {
        // ...
    }
}

Many thanks.

Implementation ideas

No response

quarkus-bot[bot] commented 1 year ago

You added a link to a Zulip discussion, please make sure the description of the issue is comprehensive and doesn't require accessing Zulip

This message is automatically generated by a bot.

quarkus-bot[bot] commented 1 year ago

/cc @cescoffier (reactive-messaging), @ozangunalp (reactive-messaging)

vsevel commented 1 year ago

Hello, have you considered this issue? What is the status? Thanks.

cescoffier commented 1 year ago

We would like to do it, but we have other priorities at the moment (like getting Quarkus 3 out of the door).

vsevel commented 1 year ago

ok. understood. thanks.

vsevel commented 1 year ago

Hello, any news on this? Is it part of a roadmap? Thanks

Serkan80 commented 1 year ago

Have you looked at SmallRye Reactive JMS?

I’m currently also trying to use it, but it seems like the IBM MQ driver isn’t ported to Jakarta yet.

I also find the documentation lacking. Besides adding the smallrye-jms dependency, I had to add quarkus-reactive-amqp and jakarta-jms-api to get my application to at least start up without errors.

vsevel commented 1 year ago

Thanks @Serkan80. I missed the news.

vsevel commented 1 year ago

Does anybody know if it supports:

what version of quarkus does it work with? thanks

Serkan80 commented 1 year ago

See here for a list of available configurations; it should answer most of your questions.

what version of quarkus does it work with?

Currently, none. I was also struggling to make it work with Quarkus until I saw Clement say in another thread that Quarkus doesn’t support reactive-jms, which is really unfortunate.

However, you might have a chance using reactive messaging if your message broker supports AMQP.

cescoffier commented 1 year ago

Unfortunately, we still haven't had time to integrate (and improve) the JMS connector. Although it is on the roadmap, I can't tell when we will do it.

vsevel commented 1 year ago

we now use the ironjacamar extension

cescoffier commented 1 year ago

@vsevel Does ironjacamar integrate with reactive messaging? Does it handle duplicated contexts and virtual threads?

If not, we should re-open this issue.

vsevel commented 1 year ago

I would say no, although I have no idea how to approach it.

Serkan80 commented 12 months ago

@vsevel I've made smallrye-reactive-jms work with IBM MQ; see my post on Stack Overflow:

https://stackoverflow.com/questions/60885669/quarkus-ibm-mq-extension/77625283#77625283