hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible, feature-rich, high-performance Java client library with different API flavours and backpressure support.
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

Concurrent handling of messages on high load #494

Closed: rubengees closed this issue 3 years ago

rubengees commented 3 years ago

Expected behavior

When specifying a custom executor via applicationScheduler on the builder, all threads of the underlying executor should be used to handle messages concurrently.

Actual behavior

Under high load, the client seems to route all the work to a single thread. See the reproducer code.

To Reproduce

Reproducer code:

import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

fun main() {
    val subscribeClient = Mqtt5Client.builder()
        ...
        .executorConfig()
        .applicationScheduler(Schedulers.from(Executors.newFixedThreadPool(2)))
        .applyExecutorConfig()
        .build()
        .toAsync()

    subscribeClient.connect().get()

    subscribeClient.subscribe(Mqtt5Subscribe.builder().topicFilter("test").build()) {
        println("Received ${it.payloadAsBytes.decodeToString()} on ${Thread.currentThread().name}")

        // Simulate a delay of 1000ms for every message.
        Thread.sleep(1000)
    }

    val publishClient = Mqtt5Client.builder()
        ...
        .build()
        .toAsync()

    publishClient.connect().get()

    repeat(times = Int.MAX_VALUE) { counter ->
        val message = Mqtt5Publish.builder().topic("test").payload(counter.toString().toByteArray()).build()

        publishClient.publish(message).get()

        println("Publishing $counter")

        // Publish every 800 ms. Two threads handling 1000 ms of work each should keep up with this rate.
        Thread.sleep(800)
    }
}

Running this produces the following output:

Publishing 0
Received 0 on pool-2-thread-2
Publishing 1
Received 1 on pool-2-thread-2
Publishing 2
Received 2 on pool-2-thread-2
Publishing 3
Received 3 on pool-2-thread-2
Publishing 4
Publishing 5
Received 4 on pool-2-thread-2
Publishing 6
Received 5 on pool-2-thread-2
Publishing 7
Received 6 on pool-2-thread-2
Publishing 8
Received 7 on pool-2-thread-2
Publishing 9
Received 8 on pool-2-thread-2
Publishing 10
Publishing 11
Received 9 on pool-2-thread-2
Publishing 12
Received 10 on pool-2-thread-2

It can be seen that only one thread is used, and the messages fall behind: each message takes 1000 ms to process but arrives every 800 ms, so the backlog grows by 200 ms per message. After 10 messages the subscriber is 2000 ms, i.e. two messages, behind.

Running the same code with a publisher delay of 1100 ms instead of 800 ms produces this output:

Publishing 0
Received 0 on pool-2-thread-2
Publishing 1
Received 1 on pool-2-thread-1
Publishing 2
Received 2 on pool-2-thread-2
Publishing 3
Received 3 on pool-2-thread-1
Publishing 4
Received 4 on pool-2-thread-2
Publishing 5
Received 5 on pool-2-thread-1
Publishing 6
Received 6 on pool-2-thread-2
Publishing 7
Received 7 on pool-2-thread-1
Publishing 8
Received 8 on pool-2-thread-2
Publishing 9
Received 9 on pool-2-thread-1
Publishing 10
Received 10 on pool-2-thread-2

Here, both threads are used, even though at a 1100 ms publish interval a single thread would suffice.

Details

Is this a usage problem, e.g. the thread pool not being configured correctly, or might there be a bug in this client or one of its dependencies? If not, what can be done to actually handle messages concurrently?

SgtSilvio commented 3 years ago

Hi @rubengees, sorry for the delayed response. The same subscribe callback is always called sequentially. This is necessary to uphold the MQTT message ordering guarantees. (It would actually be possible to distribute messages with different topics to different threads and still uphold the MQTT ordering guarantees; this may be done for the next major version. Your test uses the same topic for every message, though, so the callback would still be called sequentially in your case.)
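For illustration, a minimal sketch of that per-topic idea (this is not the client's API; PerTopicDispatcher is a hypothetical helper): route each message to one of N single-threaded executors chosen by hashing its topic, so messages on the same topic stay ordered while different topics can be processed in parallel.

import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical helper: same-topic messages always land on the same
// single-threaded executor (preserving per-topic order), while
// different topics may land on different threads.
class PerTopicDispatcher(parallelism: Int, private val process: (Mqtt5Publish) -> Unit) {
    private val lanes: List<ExecutorService> =
        List(parallelism) { Executors.newSingleThreadExecutor() }

    fun dispatch(publish: Mqtt5Publish) {
        // Pick a lane deterministically from the topic name.
        val lane = Math.floorMod(publish.topic.toString().hashCode(), lanes.size)
        lanes[lane].execute { process(publish) }
    }

    fun shutdown() = lanes.forEach { it.shutdown() }
}

With a wildcard subscription, calling dispatcher.dispatch(it) from the subscribe callback would then spread distinct topics across threads without reordering any single topic.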

what can be done to actually concurrently handle messages?

You can, of course, decide not to care about the ordering and distribute the messages in the callback yourself, like:

client.subscribe(...) { publish ->
    // Hand off to your own pool; processing order is no longer guaranteed.
    threadPool.execute {
        process(publish)
    }
}
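Wired into the reproducer above, a minimal sketch of this workaround might look as follows (workers is an assumed plain ExecutorService; once messages are handed off, their processing order across threads is no longer guaranteed):

val workers = Executors.newFixedThreadPool(2)

subscribeClient.subscribe(Mqtt5Subscribe.builder().topicFilter("test").build()) { publish ->
    // Return from the callback immediately; the pool does the slow work.
    workers.execute {
        println("Received ${publish.payloadAsBytes.decodeToString()} on ${Thread.currentThread().name}")
        Thread.sleep(1000)
    }
}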
rubengees commented 3 years ago

Alright, thanks for the answer!

vivianazimbone commented 2 years ago

@SgtSilvio are you planning to add this improvement in 2.0.0? It would be useful for wildcard subscriptions, where a single callback receives messages on many different topics.