apache / iotdb

Apache IoTDB
https://iotdb.apache.org/
Apache License 2.0
5.55k stars 1k forks source link

Allow Bridging from existing MQTT Brokers to internal service #3280

Open smaktacular opened 3 years ago

smaktacular commented 3 years ago

Is your feature request related to a problem? Please describe. I like the idea of the integrated mqtt service. But I already have configured all my devices to a dedicated broker (mosquitto). I am now looking for an easy way to bridge my existing broker to the integrated mqtt service.

Describe the solution you'd like Allow bridging from other brokers like e.g. Mosquitto

Describe alternatives you've considered Create a dedicated client that subscribes to # and forwards all messages to IOTDB and vice versa.

Additional context Mosquitto.conf: persistence true persistence_location /mosquitto/data/ log_dest file /mosquitto/log/mosquitto.log

connection iotdbltbridge address 192.168.1.13 topic # remote_username root remote_password ****

When trying to setup a bridge on mosquitto side I get an exception on IOTDB side:

021-05-27 09:40:58,357 [nioEventLoopGroup-3-5] ERROR i.m.b.NewNettyMQTTHandler:87 - Unexpected exception while processing MQTT message. Closing Netty channel. CId=null 
java.io.IOException: invalid massage
    at io.moquette.broker.NettyUtils.validateMessage(NettyUtils.java:86)
    at io.moquette.broker.metrics.MQTTMessageLogger.logMQTTMessage(MQTTMessageLogger.java:61)
    at io.moquette.broker.metrics.MQTTMessageLogger.logMQTTMessageRead(MQTTMessageLogger.java:50)
    at io.moquette.broker.metrics.MQTTMessageLogger.channelRead(MQTTMessageLogger.java:45)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.moquette.broker.metrics.MessageMetricsHandler.channelRead(MessageMetricsHandler.java:50)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)
    at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:350)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.moquette.broker.metrics.BytesMetricsHandler.channelRead(BytesMetricsHandler.java:51)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:146)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:545)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.netty.handler.codec.mqtt.MqttUnacceptableProtocolVersionException: MQIsdp and -125 are not match
    at io.netty.handler.codec.mqtt.MqttVersion.fromProtocolNameAndLevel(MqttVersion.java:55)
    at io.netty.handler.codec.mqtt.MqttDecoder.decodeConnectionVariableHeader(MqttDecoder.java:216)
    at io.netty.handler.codec.mqtt.MqttDecoder.decodeVariableHeader(MqttDecoder.java:182)
    at io.netty.handler.codec.mqtt.MqttDecoder.decode(MqttDecoder.java:88)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
    at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:367)
    ... 32 common frames omitted
2021-05-27 09:41:10,498 [nioEventLoopGroup-3-4] INFO  i.m.broker.MQTTConnection:253 - Notifying connection lost event. CId: mqtt-explorer-57cdbe16, channel: [id: 0x7a76b80d, L:0.0.0.0/0.0.0.0:1883] 
github-actions[bot] commented 3 years ago

Hi, this is your first issue in IoTDB project. Thanks for your report. Welcome to join the community!

jimagic commented 3 years ago

I can reproduce this issue with 0.12 source code, but from my server logs which indicate the builtin moquette does not support '#' wildcards, @smaktacular you can replace '#' with any others, as the topic is useless for iotdb in current version.

Caused by: io.netty.handler.codec.DecoderException: invalid publish topic name: # (contains wildcards) at io.netty.handler.codec.mqtt.MqttDecoder.decodePublishVariableHeader(MqttDecoder.java:274) at io.netty.handler.codec.mqtt.MqttDecoder.decodeVariableHeader(MqttDecoder.java:198) at io.netty.handler.codec.mqtt.MqttDecoder.decode(MqttDecoder.java:88) at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:367)