vert-x3 / vertx-mqtt

Vert.x MQTT
Apache License 2.0
184 stars 87 forks source link

Best practice for multiple topic subscribe #99

Open abccbaandy opened 6 years ago

abccbaandy commented 6 years ago

Currently I know there are 2 ways(maybe there is more? ) to do this :

  1. reuse same mqtt client.
  2. create multi mqtt client(and multi Verticle) for each topic.

Is there any performance issue or resource(thread/CPU/memory) issue for it? for ex:

I know I can test it by myself, but I don't want to try & error.

Btw, my handler is simple, only do something like convert json to vo, save message to redis and send to another mqtt broker, but I have a lot of messages need to be handle in almost realtime.

Sammers21 commented 6 years ago

@abccbaandy, generally, I think you should reuse a single client for all the topics you have.

In fact, MQTT client is single threaded, and in order to achieve a good performance, you should ensure that there are no blocking operations in a handler code, or try to do them in a separated thread.

If your system is a high load system and you have already achieved a maximum performance when using a single client, you can try to do consumption from different topics with different clients, and there is also no guarantee that there are will be any performance impact.

Finally, my opinion is that you should try and figure out what works better for you.

Sammers21 commented 6 years ago

And it would be also great if you would share your results/measurements afterwards

ppatierno commented 6 years ago

I think that @Sammers21 already answer in a really good way. Using a single client could be the best option but having the handler doing the minimal in order to avoid blocking the Vert.x event loop.

abccbaandy commented 6 years ago

MQTT client is single threaded

What's the problem is that? Isn't the whole Vert.x verticle single thread(event loop thread)?

btw, I put all these code in one verticle, is that good idea? Or should I put these handler in different verticle and access some global mqtt/redis client created from main vericle.

you should ensure that there are no blocking operations in a handler code

This is my concern here. But as I said in the first comment

convert json to vo, save message to redis and send to another mqtt broker

Actually, I implement all these feature with Vert.x stuff

convert json to vo

save message to redis

send to another mqtt broker

Sammers21 commented 6 years ago

btw, I put all these code in one verticle, is that good idea? Or should I put these handler in different verticle and access some global mqtt/redis client created from main vericle.

I think you should have different verticles for different services access, and it would be better if they would communicate via the event bus. In fact, different verticles can be executed by the different event loops, so the more verticles you have, the better your application in terms of processor cores scaling.

And yes, should keep in mind that the code inside a verticle executed by the same thread.

There is also "Vert.x guide for Java developers"(https://vertx.io/docs/guide-for-java-devs/). So you might be interested in reading it.

abccbaandy commented 6 years ago

And it would be also great if you would share your results/measurements afterwards

sure, but I don't know how to make it right.

I'm trying to generate(publish) mass messages by vertx.

but I keep getting some error

Attempt to exceed the limit of 65535 inflight messages

Received PUBACK packet without having related PUBLISH packet in storage

my source code is like this

@Slf4j
public class MqttService extends AbstractVerticle {
    private int counter = 0;
    private int counter2 = 0;

    @Override
    public void start() throws Exception {
        MqttClientOptions mqttClientOptions = new MqttClientOptions().setMaxInflightQueue(Integer.MAX_VALUE);
        MqttClient mqttClient = MqttClient.create(vertx, mqttClientOptions);

        mqttClient.connect(1883, host, ar -> {
            if (ar.succeeded()) {
                vertx.setPeriodic(1000, h -> {
                    log.info("{} : {}, {}", LocalDateTime.now(), counter, counter2);
                    counter = 0;
                    counter2 = 0;
                    for (int i = 0; i < 20000; i++) {
                        mqttClient
                                .publish("test", Buffer.buffer("asdf"), qos, false, false, publishAr -> {
                                    if (publishAr.succeeded()) {
                                        counter++;
                                    }
                                })
                                .publishCompletionHandler(publishAr -> {
                                    counter2++;
                                });
                    }
                });
            }
        });
    }
}

my main

Vertx.vertx().deployVerticle(MqttService.class, new DeploymentOptions().setInstances(num));

To avoid those error, I try to use lower message per sec per instance(1 instance 20K/s messages now), start with 3 instances seems to work fine in my 4 core vm, means it can generate 60K messages per sec(3 x 20K ).

Anything I can do to improve this result?

btw it looks like it only consume CPU(almost all cpu), but not much memory(only use 2g in my case).

Sammers21 commented 6 years ago

In fact, it seems that you reached the MQTT 3.1.1 protocol limitation since you can have more than 65535 active packets: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180838

abccbaandy commented 6 years ago

So MQTT is not a good solution for high concurrent messages system?

Sammers21 commented 6 years ago

@abccbaandy, I think that MQTT is fine for high concurrent messages system because you can easily scale horizontally and vertically. You just need to keep in mind that you are limited to 65535 concurrent messages, so even with the limitation a single broker/client able to handle millions of messages per second.

abccbaandy commented 6 years ago

a single broker/client able to handle millions of messages per second.

How? Theory, a single client can only handle 65535 concurrent messages(in fact, it can only handle 20K messages in my test).

To handle 65535+ concurrent messages, I have to scale out my mqtt verticle like I did in my test and it looks like a little resource wasted.