streamnative / mop

MQTT on Pulsar implemented using Pulsar Protocol Handler
Apache License 2.0
169 stars 54 forks source link

Cannot push/receive Mosquitto Bridge messages on MOP #1388

Open mzaferyahsi opened 2 months ago

mzaferyahsi commented 2 months ago

Describe the bug When bridging another MQTT with MOP, I'm getting the following error.

2024-07-11T20:21:17,698+0000 [mqtt-redirect-io-46-5] ERROR io.streamnative.pulsar.handlers.mqtt.MQTTCommonInboundHandler - Exception was caught while processing MQTT message, 
java.lang.IllegalStateException: MQTT is an unknown protocol name
    at io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.checkState(MqttMessageUtils.java:48) ~[3h-3vkPU0Yfb3TTJDBOvGQ/:?]
    at io.streamnative.pulsar.handlers.mqtt.MQTTCommonInboundHandler.channelRead(MQTTCommonInboundHandler.java:62) ~[3h-3vkPU0Yfb3TTJDBOvGQ/:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.streamnative.pulsar.handlers.mqtt.adapter.CombineHandler.channelRead(CombineHandler.java:31) ~[3h-3vkPU0Yfb3TTJDBOvGQ/:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:349) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:535) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:366) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[io.netty-netty-handler-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:801) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]

To Reproduce Steps to reproduce the behavior:

  1. Configure MOP on Pulsar with following options;
    messagingProtocols: mqtt
    protocolHandlerDirectory: "/pulsar/protocols"
    mqttListeners: "mqtt://127.0.0.1:1883"
    mqttProxyEnabled: "true"
    mqttProxyPort: "5682"
  2. Setup a new mosquitto instance locally (or via docker) with following config (updating the address of the MOP)
    
    listener 1883
    allow_anonymous true

persistence true persistence_location /mosquitto/data/

connection alroze address MOP:1883

topic # both 0


3. Publish a message on the mosquitto instance

**Expected behavior**
The messages that are synced between MOP and Mosquitto

**Desktop (please complete the following information):**
 - MOP: Kubernetes v1.28.3
 - Pulsar: v3.0.5
 - Mosquitto: 2.0.18

**Additional context**
N/A