jetlinks / jetlinks-community

JetLinks 基于Java8,Spring Boot 2.x ,WebFlux,Netty,Vert.x,Reactor等开发, 是一个全响应式的企业级物联网平台。支持统一物模型管理,多种设备,多种厂家,统一管理。统一设备连接管理,多协议适配(TCP,MQTT,UDP,CoAP,HTTP等),屏蔽网络编程复杂性,灵活接入不同厂家不同协议等设备。实时数据处理,设备告警,消息通知,数据转发。地理位置,数据可视化等。能帮助你快速建立物联网相关业务系统。
https://www.jetlinks.cn/
Apache License 2.0
5.59k stars 1.68k forks source link

MqttClient receive message error: MqttMessage cannot be cast to class MqttPublishMessage #26

Closed Gu-Li closed 3 years ago

Gu-Li commented 3 years ago

jdk:openjdk11.0.8 jetlinks-community versoin: 1.6 netty-mqtt-client version: 1.0.0

mqtt客户端在接收数据长度较长的message时报错:

java.lang.ClassCastException: class io.netty.handler.codec.mqtt.MqttMessage cannot be cast to class io.netty.handler.codec.mqtt.MqttPublishMessage (io.netty.handler.codec.mqtt.MqttMessage and io.netty.handler.codec.mqtt.MqttPublishMessage are in unnamed module of loader 'app')
    at org.jetlinks.mqtt.client.MqttChannelHandler.channelRead0(MqttChannelHandler.java:31) ~[netty-mqtt-client-1.0.0.jar:na]
    at org.jetlinks.mqtt.client.MqttChannelHandler.channelRead0(MqttChannelHandler.java:11) ~[netty-mqtt-client-1.0.0.jar:na]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at org.jetlinks.mqtt.client.MqttPingHandler.channelRead(MqttPingHandler.java:39) ~[netty-mqtt-client-1.0.0.jar:na]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) ~[netty-codec-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:349) ~[netty-codec-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) ~[netty-codec-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491) ~[netty-transport-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) ~[netty-common-4.1.34.Final.jar:4.1.34.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.34.Final.jar:4.1.34.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

初步分析错误源于数据长度超过了MqttDecoder.DEFAULT_MAX_BYTES_IN_MESSAGE=8092

private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;

当通过MqttClient的create静态方法创建org.jetlinks.mqtt.client.MqttClientImpl时,MqttClientImpl使用MqttDecoder的无参构造创建MqttDecoder实例,而这里没有找到相关配置入口。

final class MqttClientImpl implements MqttClient {
...
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            if (sslContext != null) {

                SSLEngine engine = sslContext.newEngine(ch.alloc(), host, port);
                sslEngineConsumer.accept(engine);
                ch.pipeline().addFirst("ssl", new SslHandler(engine));
            }

            ch.pipeline().addLast("mqttDecoder", new MqttDecoder());
            ch.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE);
            ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds(), MqttClientImpl.this.clientConfig.getTimeoutSeconds(), 0));
            ch.pipeline().addLast("mqttPingHandler", new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds()));
            ch.pipeline().addLast("mqttHandler", new MqttChannelHandler(MqttClientImpl.this, connectFuture));
        }
...
}
public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
...
    public MqttDecoder() {
      this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    public MqttDecoder(int maxBytesInMessage) {
        super(DecoderState.READ_FIXED_HEADER);
        this.maxBytesInMessage = maxBytesInMessage;
    }
...
}

请问能否在MqttClientConfig中增加针对MqttDecoder的maxBytesInMessage字段配置?

Gu-Li commented 3 years ago

已通过vertx client解决,感谢老周指点!