hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0
824 stars 153 forks source link

Async Mqtt3Client does not release threads created by publishes() when disconnected #633

Open ASunc opened 3 weeks ago

ASunc commented 3 weeks ago

🐛 Bug Report

🔬 How To Reproduce

Steps to reproduce the behavior:

  1. ... create client by Mqtt3Client.builder()......buildAsync()
  2. call client.publishes(MqttGlobalPublishFilter.REMAINING, ....) // thread com.hivemq.client.mqtt-1-1 is created
  3. call client.connectIWth()....
  4. call client.disconnect().get() // thread is not released

Code sample

public class HiveTest {

public static void main(String[] args) throws InterruptedException, ExecutionException
{
    var test = new HiveTest();

    for (int i = 0; i < 10; i++) {
        var cli = test.connect();

        Thread.sleep(2 * 1000L);

        test.disconnect(cli);
    }

    System.out.println("Sleeping... com.hivemq.client.mqtt-n-n threads are still there.");
    Thread.sleep(300 * 1000L);
}

private void disconnect(Mqtt3AsyncClient cli) throws InterruptedException, ExecutionException
{
    cli.disconnect().get();
}

public Mqtt3AsyncClient connect() throws InterruptedException, ExecutionException
{
    var client = Mqtt3Client.builder()
            .identifier("myid")
            .serverHost("broker")
            .serverPort(1883)
            .addDisconnectedListener(ctx -> {

                System.err.println("MQTT disconnected from broker: " + ctx.getCause());
            })
            .addConnectedListener(ctx -> {

                System.out.println("MQTT connected to broker");
            })
            .buildAsync();

    client.publishes(MqttGlobalPublishFilter.REMAINING, pub -> {

        try {

            String payload = new String(pub.getPayloadAsBytes(), "UTF-8");
            System.out.println("MESSAGE:" + pub.getTopic().toString() + ":" + payload);
        }
        catch (UnsupportedEncodingException e) {

            System.err.println("Bad payload encoding.");
        }
    });

    client.connectWith().cleanSession(false).keepAlive(60).send().thenCompose(connAck -> {

        return client.subscribeWith()
                .addSubscription(Mqtt3Subscription.builder().topicFilter("test").build())
                .send();

    }).get();

    return client;
}

}

Environment

Where are you running/using this client? Windows 11

Hardware or Device? Dell Laptop

What version of this client are you using? 1.3.3

JVM version? 11.0.22.7

Operating System? Windows 11

Which MQTT protocol version is being used? 3

Which MQTT broker (name and version)? mosquitto 2.0.18

Screenshots

📈 Expected behavior

When disconnecting, I would expect that thread would be terminated. Don't confuse this thread with various RxComputation threads, I know that number of those are capped.

📎 Additional context