smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

The JMS connector does not reconnect after disconnections #1420

Open coiouhkc opened 2 years ago

coiouhkc commented 2 years ago

In my current Quarkus project I'm using an Emitter together with smallrye-reactive-messaging-jms to send messages to a JMS queue. It works fine until the first connectivity issue with the message server (IBM MQ in my case): after the first attempt to send a message to an unavailable host:port, I keep getting java.lang.IllegalStateException: SRMSG00028: The subscription to <queue-name> has been cancelled. Is there a way to restart the emitter (get it back into the RUNNING state)? If not, what is the correct (Quarkus/SmallRye) way of handling this kind of issue to build a more resilient system?

cescoffier commented 2 years ago

I guess the problem is in the JMS connector. The connector is not integrated into Quarkus as it's more a best-effort thing at the moment. It probably does not recover after the loss of connection.

coiouhkc commented 2 years ago

Ok, @cescoffier, thanks for the quick reply!

wmilewski commented 1 year ago

Note that while the connector does not support reconnecting, the underlying JMS implementation might. Unfortunately I'm not familiar with IBM MQ. I recently ran into the same failure when connecting to ActiveMQ, and fixed it by configuring the ActiveMQ client for automatic reconnection: https://activemq.apache.org/how-can-i-support-auto-reconnection.
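For readers who have not used it, the ActiveMQ auto-reconnection mentioned above is driven by a failover: broker URL. The sketch below only assembles such a URL as a string; the class name FailoverUri and the broker addresses are hypothetical, and the two options shown (initialReconnectDelay, maxReconnectAttempts) are taken from the ActiveMQ failover transport documentation rather than from this thread.

```java
import java.util.List;

/** Hypothetical helper that assembles an ActiveMQ failover transport URL. */
public final class FailoverUri {

    private FailoverUri() {}

    /**
     * Builds a failover URL for the given broker addresses.
     *
     * @param brokers                 broker URLs, e.g. tcp://host:61616
     * @param initialReconnectDelayMs delay before the first reconnect attempt, in milliseconds
     * @param maxReconnectAttempts    -1 asks the client to keep retrying indefinitely
     */
    public static String build(List<String> brokers, long initialReconnectDelayMs, int maxReconnectAttempts) {
        return "failover:(" + String.join(",", brokers) + ")"
                + "?initialReconnectDelay=" + initialReconnectDelayMs
                + "&maxReconnectAttempts=" + maxReconnectAttempts;
    }
}
```

The resulting string would be passed as the broker URL when creating the ActiveMQ connection factory; whether that is enough for the SmallRye JMS connector to recover in every failure mode is not confirmed here.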

coiouhkc commented 1 year ago

It's an interesting feature of ActiveMQ; I couldn't find anything similar for IBM MQ. In our project we ended up creating proxies capable of detecting the above-mentioned connectivity issue (and a couple more) and recreating the connection.
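The project's actual proxies were not shared, so the following is only a minimal sketch of the general idea using a JDK dynamic proxy. It assumes that any exception from the delegate means the connection is broken, recreates the delegate once, and retries the call. ReconnectingProxy, Sender and the connect supplier are hypothetical names; a real version would wrap a JMS producer or context and inspect the exception type before retrying.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

/** Hypothetical sketch: a proxy that recreates its delegate after a failed call. */
public final class ReconnectingProxy {

    /** Hypothetical stand-in for whatever sends messages (e.g. a thin wrapper around a JMS producer). */
    public interface Sender {
        void send(String payload);
    }

    private ReconnectingProxy() {}

    /**
     * Returns a proxy for the given interface. The delegate is created lazily via
     * the supplier; if a call fails, the delegate is recreated and the call is
     * retried exactly once.
     */
    @SuppressWarnings("unchecked")
    public static <T> T wrap(Class<T> type, Supplier<T> connect) {
        InvocationHandler handler = new InvocationHandler() {
            private T target; // current delegate; null before first use and after a failure

            @Override
            public synchronized Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (target == null) {
                    target = connect.get();
                }
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException first) {
                    // Assumption: any failure means the connection is unusable.
                    target = null;
                    target = connect.get();
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException second) {
                        target = null; // force a fresh connection on the next call
                        throw second.getCause();
                    }
                }
            }
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}
```

Retrying only once keeps a persistent outage visible to the caller instead of hiding it behind an endless loop; a delay or attempt limit could be added around the second connect.get() call.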

fsantagostinobietti commented 8 months ago

Hi @coiouhkc, one year later I had the same issue with IBM MQ. Looking at the IBM docs, I realised that the MQ client actually has an automatic reconnect option: MQConnectionFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT)

So, putting things together, I defined a connection factory (as described in the SmallRye Reactive Messaging docs) with the MQ client properties:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;

import com.ibm.mq.jakarta.jms.MQConnectionFactory;
import com.ibm.msg.client.jakarta.jms.JmsConnectionFactory;
import com.ibm.msg.client.jakarta.jms.JmsConstants;
import com.ibm.msg.client.jakarta.jms.JmsFactoryFactory;
import com.ibm.msg.client.jakarta.wmq.WMQConstants;

@ApplicationScoped
public class ConnectionFactoryBean {

    @Produces
    ConnectionFactory factory() throws JMSException {
        JmsFactoryFactory ff = JmsFactoryFactory.getInstance(JmsConstants.JAKARTA_WMQ_PROVIDER);
        JmsConnectionFactory factory = ff.createConnectionFactory(); // actually com.ibm.mq.jakarta.jms.MQConnectionFactory
        MQConnectionFactory mqFactory = (MQConnectionFactory) factory;
        mqFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        mqFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT); // <-- enable automatic reconnection
        mqFactory.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost");
        mqFactory.setIntProperty(WMQConstants.WMQ_PORT, 1414);
        mqFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "QM1");
        mqFactory.setStringProperty(WMQConstants.WMQ_CHANNEL, "DEV.APP.SVRCONN");
        mqFactory.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "MySample");

        return mqFactory;
    }

}

This way the client can automatically recover from a temporary network/server failure.

I hope this can be useful to someone else.

coiouhkc commented 8 months ago

@fsantagostinobietti - I left the project and don't have access to the codebase anymore, so I cannot verify the initial case, but thanks for providing a documented solution!

nojomyth-dev commented 3 months ago

Is there any chance of integrating this into the connector so that it behaves like the SmallRye MQTT one? The approach provided by @fsantagostinobietti is suboptimal with the IBM MQ client: I could not find a way to make it log when the connection is lost or when it starts or attempts a reconnect, nor a way to configure the retry intervals (only a fixed time), etc.