reinhapa / rabbitmq-cdi

Provides a JavaEE Event ←→ RabbitMQ bridge
MIT License
17 stars 11 forks source link

Lost connection not detected by ConnectListener #505

Open arjenjobse opened 2 weeks ago

arjenjobse commented 2 weeks ago

Our ConnectionListener implementation does not behave as expected. We're using rabbitmq-cdi version 2.0.0.

Our EventBinder:

import com.rabbitmq.client.BuiltinExchangeType;

import jakarta.inject.Inject;
import net.reini.rabbitmq.cdi.EventBinder;
import net.reini.rabbitmq.cdi.ExchangeDeclaration;
import nl.stater.commons.interestservicestore.InterestserviceStoreItem;
import nl.stater.interestservicestore.config.encoding.JsonEncoder;

/**
 * Event binder that routes {@link InterestserviceStoreItem} events to a RabbitMQ exchange
 * and registers the injected connection listener.
 */
public class RabbitBinder extends EventBinder {

    /** Name of the exchange that is declared and used as the binding target. */
    private static final String EXCHANGE_NAME = "exchangename";

    /** Routing key applied to the event binding. */
    private static final String ROUTING_KEY = "routingkey";

    @Inject
    RabbitConnectionListener listener;

    /**
     * Declares a durable, non-auto-deleting direct exchange, binds
     * {@link InterestserviceStoreItem} events to it using a JSON encoder, and finally
     * registers the connection listener.
     */
    @Override
    protected void bindEvents() {
        ExchangeDeclaration exchangeDeclaration = declarerFactory()
                .createExchangeDeclaration(EXCHANGE_NAME)
                .withType(BuiltinExchangeType.DIRECT)
                .withAutoDelete(false)
                .withDurable(true);

        bind(InterestserviceStoreItem.class)
                .toExchange(EXCHANGE_NAME)
                .withDeclaration(exchangeDeclaration)
                .withRoutingKey(ROUTING_KEY)
                .withEncoder(new JsonEncoder<>());

        registerConnectionListener(listener);
    }
}

Our Binder Initializer:

import com.rabbitmq.client.Address;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.servlet.ServletContextEvent;
import jakarta.servlet.ServletContextListener;
import jakarta.servlet.annotation.WebListener;
import nl.stater.commons.blueriq.foundation.exception.ApplicationException;

/**
 * Servlet context listener that configures and initializes the injected {@link RabbitBinder}
 * from its {@code @PostConstruct} callback.
 */
@WebListener
public class ContentstoreConfig implements ServletContextListener {

    @Inject
    private RabbitBinder rabbitBinder;

    /**
     * Applies the broker connection settings (host, credentials, virtual host, TLS,
     * timeouts and heartbeat) to the binder configuration and then initializes the binder.
     *
     * @throws ApplicationException if configuring or initializing the binder fails; the
     *         original exception is preserved as the cause
     */
    @PostConstruct
    public void initialize() {
        try {
            rabbitBinder.configuration()
                    .addHost(new Address("hostname", 5671))
                    .setUsername("user")
                    .setPassword("pwd")
                    .setVirtualHost("virtualhost")
                    .setSecure(true)
                    .setConnectTimeout(10000)
                    .setConnectRetryWaitTime(10000)
                    .setRequestedConnectionHeartbeatTimeout(3);
            rabbitBinder.initialize();
        } catch (Exception e) {
            // Boundary catch: any failure is wrapped with its cause for the caller.
            throw new ApplicationException("Initialization of RabbitMQ Binder failed", e);
        }
    }

    /**
     * Intentionally empty: all setup is performed in {@link #initialize()}.
     *
     * @param sce the servlet context event (unused)
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        // No work to do here.
    }

}

Our ConnectionListener implementation:

import com.rabbitmq.client.Connection;

import jakarta.enterprise.context.ApplicationScoped;
import net.reini.rabbitmq.cdi.ConnectionListener;

/**
 * Application-scoped connection listener that tracks, as a single boolean flag, whether the
 * most recent connection callback reported an established connection.
 */
@ApplicationScoped
public class RabbitConnectionListener implements ConnectionListener {

    // volatile so that a state change made by a listener callback is visible to callers of
    // isConnected() on other threads. NOTE(review): the callbacks are presumably invoked from
    // the RabbitMQ client's connection thread rather than the reading thread - confirm.
    private volatile boolean connected = false;

    /**
     * Marks the listener as connected.
     *
     * @param connection the connection that was established (unused)
     */
    @Override
    public void onConnectionEstablished(Connection connection) {
        connected = true;
    }

    /**
     * Marks the listener as disconnected after the connection was lost.
     *
     * @param connection the connection that was lost (unused)
     */
    @Override
    public void onConnectionLost(Connection connection) {
        connected = false;
    }

    /**
     * Marks the listener as disconnected after the connection was closed.
     *
     * @param connection the connection that was closed (unused)
     */
    @Override
    public void onConnectionClosed(Connection connection) {
        connected = false;
    }

    /**
     * Reports the state recorded by the most recent callback.
     *
     * @return {@code true} if the last callback was {@code onConnectionEstablished},
     *         {@code false} otherwise (including before any callback has run)
     */
    public boolean isConnected() {
        return connected;
    }
}

Connecting to RabbitMQ, firing events to Rabbit etc works fine.

At startup of our application, onConnectionEstablished (in our ConnectionListener implementation) is called, and when shutting down, the onConnectionClosed method is called.

But when I disrupt the connection between our application and RabbitMQ, I expect onConnectionLost to be called. I tested the disruption by adding an outbound firewall rule that blocks port 5671. After enabling this rule, the log says:

2024-09-10 10:05:47,457 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (AMQP Connection 172.22.0.195:5671) Caught an exception during connection recovery!: java.net.SocketException: Permission denied: getsockopt
    at java.base/sun.nio.ch.Net.pollConnect(Native Method)
    at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682)
    at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:542)
    at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:592)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
    at java.base/java.net.Socket.connect(Socket.java:751)
    at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:304)
    at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:61)
    at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:69)
    at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:628)
    at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:589)
    at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.recovery.AutorecoveringConnection.lambda$addAutomaticRecoveryListener$3(AutorecoveringConnection.java:524)
    at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:839)
    at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:816)
    at deployment.interestdeterminationservice.war//com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:700)
    at java.base/java.lang.Thread.run(Thread.java:1583)

So I ask myself: why does our ConnectionListener implementation not detect that disruption?

Hope you can help us. Many thanks in advance!

reinhapa commented 1 week ago

@arjenjobse I looked into it today, and it looks as though this will not be a small change. I will need some time to do this, as it has to be done in my spare time...