vert-x3 / vertx-mqtt

Vert.x MQTT
Apache License 2.0

The problem of receiving message length limitation #191

Closed liuchengts closed 3 years ago

liuchengts commented 3 years ago

I am using vertx-mqtt versions 3.8.5 through 4.0.0.

Environment:
JDK: OpenJDK 64-Bit Server 1.8.0_275
OS: Ubuntu 18.04.3 LTS
Kernel: 4.15.0-68-generic

I have a question about a limit on the length of received messages.

I have a 1047-character MQTT message that cannot be received through vertx-mqtt v4.0.0. Once this message is sent to the MQTT server, the vertx-mqtt receiving program stops receiving any subsequent messages. I then tested spring-integration-mqtt v5.2.3.RELEASE against the same MQTT server, and the problem does not occur there.

Therefore, please check whether this problem affects vertx-mqtt v3.8.5 through v4.0.0. Thank you.

vietj commented 3 years ago

do you have a reproducer for this issue @liuchengts ?

liuchengts commented 3 years ago

Here is the client code:

private void client() {
    MqttClientOptions mqttClientOptions = new MqttClientOptions();
    // mqttClientOptions.setMaxInflightQueue(9999);
    // mqttClientOptions.setAutoKeepAlive(true);
    mqttClient = MqttClient.create(vertx, mqttClientOptions);
    mqttClient.connect(config.getMqttPort(), config.getMqttIp(), c -> {
        if (c.succeeded()) {
            mqttClient.subscribe("lot-admin", 2);
            log.info("Connected to a server");
        } else {
            log.error("Failed to connect to a server");
            log.error("error", c.cause());
        }
    })
    .publishHandler(pub -> {
        Buffer buffer = pub.payload();
        log.info("Content(as string) of the message: " + buffer.toString());
        applicationContext.getHandleAction().handle(buffer.toJsonObject());
    });
}

MQTT server Docker image: zlee/mqtt:latest

vietj commented 3 years ago

@liuchengts can you provide a full project reproducing the issue ?

liuchengts commented 3 years ago


I wrote an example project specifically for this. You can find it here:

https://github.com/liuchengts/vertx-mqtt-demo

Gondolav commented 3 years ago

I have a similar problem: large messages are not received by the MQTT server.

vietj commented 3 years ago

thanks I'll have a look soon

pigbayspy commented 3 years ago

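If you have not called MqttClientOptions.setMaxMessageSize to set the maximum message length, the initChannel method of MqttClientImpl uses the default MqttDecoder constructor, and that constructor sets MqttDecoder's maxBytesInMessage to 8092. Messages larger than this limit fail to be processed. The key code is as follows: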

private void initChannel(ChannelPipeline pipeline) {

    // add into pipeline netty's (en/de)coder
    pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

    if (this.options.getMaxMessageSize() > 0) {
      pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
    } else {
      // max message size not set, so the default from Netty MQTT codec is used
      pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
    }
    // ...
}
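
The branch above can be modeled without any Vert.x or Netty dependency. The following is only a sketch: the class and method names are hypothetical, the default of 8092 is the figure quoted in this thread, and the assumption that a message is rejected when its size is strictly greater than the limit is an illustration, not a statement about the codec's exact check.

```java
// Hypothetical stand-alone model of the limit selection; this is not Vert.x or Netty code.
final class DecoderLimitSketch {

    // Default limit quoted in this thread for the no-argument MqttDecoder constructor.
    static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;

    // Mirrors the if/else in initChannel: a positive configured size wins,
    // otherwise the codec default applies.
    static int effectiveLimit(int configuredMaxMessageSize) {
        return configuredMaxMessageSize > 0
                ? configuredMaxMessageSize
                : DEFAULT_MAX_BYTES_IN_MESSAGE;
    }

    // Assumption for illustration: a message is rejected when its size in bytes
    // is strictly greater than the effective limit.
    static boolean accepted(int messageBytes, int configuredMaxMessageSize) {
        return messageBytes <= effectiveLimit(configuredMaxMessageSize);
    }

    public static void main(String[] args) {
        // A non-positive option value stands for "not set".
        System.out.println(effectiveLimit(-1));
        System.out.println(accepted(10_000, -1));
        System.out.println(accepted(10_000, 268_435_455));
    }
}
```

Under these assumptions, a 10,000-byte message is rejected when the option is left unset and accepted once a larger limit is configured.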

liuchengts commented 3 years ago

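Yes, I read this code carefully. When the option is not specified it defaults to -1, so new MqttDecoder() is used with its default maxBytesInMessage=8092. I also checked the relevant setting in mosquitto.conf: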

# This option sets the maximum publish payload size that the broker will allow.
# Received messages that exceed this size will not be accepted by the broker.
# The default value is 0, which means that all valid MQTT messages are
# accepted. MQTT imposes a maximum payload size of 268435455 bytes. 
#message_size_limit 0

So I added the following to the code (the third line is the new one), reran the example, and found no errors:

MqttClientOptions mqttClientOptions = new MqttClientOptions();
mqttClientOptions.setAutoKeepAlive(true);
mqttClientOptions.setMaxMessageSize(268435455);
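
The value 268435455 is the largest number the MQTT "remaining length" field can express: the field uses at most four bytes, each carrying 7 bits of length plus a continuation bit, which gives 2^28 - 1. The sketch below illustrates that encoding with the standard library only. The class and method names are hypothetical and the code is not taken from vertx-mqtt or Netty.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical helper illustrating the MQTT variable-length "remaining length" encoding.
final class RemainingLengthSketch {

    // Four bytes with 7 usable bits each: 2^28 - 1.
    static final int MAX_REMAINING_LENGTH = 268_435_455;

    // Encodes a length as 1 to 4 bytes, least significant 7-bit group first.
    static byte[] encode(int length) {
        if (length < 0 || length > MAX_REMAINING_LENGTH) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(4);
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) {
                digit |= 0x80; // continuation bit: more bytes follow
            }
            out.write(digit);
        } while (length > 0);
        return out.toByteArray();
    }

    // Decodes the bytes produced by encode; rejects more than four length bytes.
    static int decode(byte[] bytes) {
        int value = 0;
        int multiplier = 1;
        for (int i = 0; i < bytes.length; i++) {
            if (i >= 4) {
                throw new IllegalArgumentException("more than 4 length bytes");
            }
            int b = bytes[i] & 0xFF;
            value += (b & 0x7F) * multiplier;
            if ((b & 0x80) == 0) {
                return value; // last byte reached
            }
            multiplier *= 128;
        }
        throw new IllegalArgumentException("truncated length field");
    }

    public static void main(String[] args) {
        System.out.println(encode(8092).length);
        System.out.println(encode(MAX_REMAINING_LENGTH).length);
        System.out.println(decode(encode(MAX_REMAINING_LENGTH)));
    }
}
```

Setting the client limit to this maximum removes the client-side cap entirely. A smaller value sized to the largest message the application expects may be preferable if memory use is a concern.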

Gondolav commented 3 years ago

Why was this closed?

liuchengts commented 3 years ago

After locating the problem and testing the fix as described above, the problem has been resolved.