moquette-io / moquette

Java MQTT lightweight broker
http://moquette-io.github.io/moquette/
Apache License 2.0

BrokerInterceptor thread pool never closing on serverStop #506

Open bobrowskim opened 4 years ago

bobrowskim commented 4 years ago

Expected behavior

When Moquette is embedded as a broker in an application, calling Server.stopServer should shut down all threads the broker started.

Actual behavior

Server.stopServer never stops the BrokerInterceptor, so the thread pool the interceptor created is never shut down and one of its threads stays in a waiting state. As a result, the application hangs instead of exiting.
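
To illustrate the mechanism with plain JDK classes only (no Moquette code): a minimal sketch, assuming the interceptor's pool behaves like a standard fixed-size ExecutorService with the default thread factory. The class and method names below are hypothetical and exist only for this demonstration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class LingeringPoolDemo {

    /** Runs one task on the pool and returns the worker thread that executed it. */
    public static Thread runOneTask(ExecutorService pool) {
        Callable<Thread> whoAmI = Thread::currentThread;
        try {
            return pool.submit(whoAmI).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Shuts the pool down and waits up to five seconds for the worker thread to exit. */
    public static boolean shutdownAndJoin(ExecutorService pool, Thread worker) {
        pool.shutdown();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Thread worker = runOneTask(pool);
        // The task has finished, but the idle worker is a non-daemon thread
        // that keeps waiting for more work.
        System.out.println("alive after task: " + worker.isAlive()
                + ", daemon: " + worker.isDaemon());
        // Without this call, main() would return but the JVM would not exit.
        System.out.println("exited after shutdown: " + shutdownAndJoin(pool, worker));
    }
}
```

Removing the shutdownAndJoin call from main should reproduce the same symptom as the report: main returns, yet the process keeps running because a non-daemon pool thread is still parked.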

Steps to reproduce

  1. Start the Server.
  2. Connect a client to the server, then publish and subscribe to some topics.
  3. Stop the server with Server.stopServer.
    • The thread pool created by BrokerInterceptor is still in a waiting state.
  4. The application never stops.
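
One way to confirm which thread is keeping the process alive after step 3 is to list the live non-daemon threads. This is a rough sketch using only the JDK; LiveThreadReport and blockingThreads are hypothetical names, and the assumption is that the pool uses the default thread factory, whose threads are named like pool-N-thread-M.

```java
import java.util.List;
import java.util.stream.Collectors;

public final class LiveThreadReport {

    /** Returns live non-daemon threads other than the caller, i.e. those that keep the JVM running. */
    public static List<Thread> blockingThreads() {
        Thread self = Thread.currentThread();
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && !t.isDaemon() && t != self)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Call this right after Server.stopServer() in the reproducer.
        for (Thread t : blockingThreads()) {
            System.out.println(t.getName() + " [" + t.getState() + "]");
        }
    }
}
```

A thread dump taken with jstack or a debugger gives the same information, including the stack of the waiting thread.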

Minimal yet complete reproducer code (or URL to code) or complete log file

Here is sample code

Run class NotClosingApplication.

Moquette MQTT version

Tested on version: 0.12.1

JVM version (e.g. java -version)

Tested with: 1.8_162, 11.0.1 and 13

OS version (e.g. uname -a)

Windows 10 and OS X 10.11.6

mjacoby commented 3 years ago

Any progress on this? I just stumbled across the exact same issue and could reproduce it with Moquette version 0.11, 0.13 and 0.14.

Actually, my sample scenario is even a bit simpler: start server, do publish, stop server. This results in the application never closing, as there is still a thread-pool thread alive. Just starting and stopping a server without doing a publish works just fine; however, adding a call to publish(...) seems to spawn the thread that is never closed.

Here's my sample code:

package MoquetteTest;

import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import java.io.IOException;
import java.util.Properties;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Main {

    private static final String HOST = "localhost";
    private static final int PORT = 1883;
    private static Server server;

    public static void main(String[] args) throws IOException, MqttException, InterruptedException {
        startServer();
        publish();
        Thread.sleep(2000);
        stopServer();
    }

    private static void startServer() throws IOException {
        server = new Server();
        IConfig config = new MemoryConfig(new Properties());
        config.setProperty(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(PORT));
        config.setProperty(BrokerConstants.HOST_PROPERTY_NAME, HOST);
        config.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.toString(true));
        server.startServer(config);
    }

    private static void stopServer() {
        server.stopServer();
    }

    private static void publish() throws MqttException {
        String endpoint = "tcp://" + HOST + ":" + PORT;
        String clientId = "FooBar";
        MqttClient client = new MqttClient(endpoint, clientId, new MemoryPersistence());
        client.connect(new MqttConnectOptions());
        MqttMessage msg = new MqttMessage("hello world".getBytes());
        msg.setQos(0);
        client.publish("foo/bar", msg);
        if (client.isConnected()) {
            client.disconnect();
        }
        client.close(true);
        client = null;
    }

}

bobrowskim commented 3 years ago

@mjacoby I found where the problem lies, but I never heard back from the project maintainers, so I did not open a PR with a fix. If you want a "dirty" fix from your own code, add the code below at the point where you stop the server:

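import io.moquette.broker.Server;
import io.moquette.interception.BrokerInterceptor; // package assumed from the 0.12.x layout
import java.lang.reflect.Field;
import java.lang.reflect.Method;

Server server = new Server();
...
// Read the non-public "interceptor" field of Server via reflection.
Field field = server.getClass().getDeclaredField("interceptor");
field.setAccessible(true);
BrokerInterceptor interceptor = (BrokerInterceptor) field.get(server);
// Invoke the non-public stop() method so the interceptor can shut down its thread pool.
Method stop = interceptor.getClass().getDeclaredMethod("stop");
stop.setAccessible(true);
stop.invoke(interceptor);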

That easily leads to the core problem: the ExecutorService is never shut down properly because the Server class never tells the BrokerInterceptor to stop :)
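
For anyone thinking about a proper fix, the general shape is that the owning component's stop method also stops whatever owns the executor. The sketch below is not Moquette's code: Broker and Notifier are hypothetical stand-ins for Server and BrokerInterceptor, and the 10-second timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class StopPropagationSketch {

    /** Hypothetical stand-in for a component that notifies listeners on its own thread pool. */
    public static final class Notifier {
        private final ExecutorService executor = Executors.newFixedThreadPool(1);

        public void notifyPublish(String topic) {
            executor.execute(() -> System.out.println("published on " + topic));
        }

        /** Stops accepting tasks, waits briefly for queued ones, then forces termination. */
        public void stop() {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        public boolean isStopped() {
            return executor.isTerminated();
        }
    }

    /** Hypothetical stand-in for the server that owns the notifier. */
    public static final class Broker {
        private final Notifier notifier = new Notifier();

        public void publish(String topic) {
            notifier.notifyPublish(topic);
        }

        public void stop() {
            // A real server would also close its network channels here.
            notifier.stop(); // the step this issue reports as missing
        }

        public boolean allThreadsStopped() {
            return notifier.isStopped();
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        broker.publish("foo/bar");
        broker.stop();
        System.out.println("threads stopped: " + broker.allThreadsStopped());
    }
}
```

With the stop call propagated this way, no reflection is needed on the caller's side.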

mjacoby commented 3 years ago

Thanks for the feedback, @bobrowskim! Your hack works for me, so at least there is a quick workaround. However, it would be nice to have this bug properly fixed. Maybe I'll find some time in the future to come up with a PR.