Bas83 closed this issue 1 year ago
This happens because the routeCommand method receives a null clientId: https://github.com/moquette-io/moquette/blob/9eb2a6f3d4b582324444a198d94e6e6846acf0fd/broker/src/main/java/io/moquette/broker/PostOffice.java#L629-L632
That clientId is extracted from the packet. Do you have any logs from around the time the problem manifests?
@andsel I'm afraid not, definitely no logs of the contents of packets. We have a number of clients integrating with us. Do you think it's pure coincidence that this started happening after this update, caused by some misbehaving client, or is there new code that could have affected this?
@andsel This is the order of things happening:
With version 0.16, event loops were introduced to isolate the processing of each session's requests. Sessions are now sticky to a session event loop, and the same loop can handle many sessions.
Every loop is a thread, and there are as many loops as CPU cores.
Every loop has a command queue containing SessionCommands to be executed, so the routing method has the responsibility of creating the session command and enqueuing it to the appropriate queue.
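That routing idea can be sketched roughly like this (a simplified stand-in, not Moquette's actual PostOffice code): the clientId's hash picks the event loop, which is exactly where a null clientId would trigger the NullPointerException from the stack trace.

```java
// Simplified stand-in for session-sticky routing (assumption: this is only
// the shape of the idea, not the real PostOffice implementation).
public class SessionLoopRouter {
    private final int loopCount; // one event loop per CPU core

    public SessionLoopRouter(int loopCount) {
        this.loopCount = loopCount;
    }

    // Pick the loop/queue for a session. Calling this with a null clientId
    // would throw a NullPointerException on clientId.hashCode(), matching
    // the trace at PostOffice.routeCommand.
    public int loopFor(String clientId) {
        return Math.floorMod(clientId.hashCode(), loopCount);
    }
}
```

Because the same clientId always hashes to the same loop index, all commands for one session execute on one thread, which is what removes the need for finer-grained locking.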
So your problem is not a coincidence, because this code wasn't present before. The motivation for its introduction is to simplify concurrency management in Moquette.
So the problem could:
@andsel also, we saw many additional warning logs that did not accompany those exceptions; we hadn't seen those logs occur before.
That clientId is extracted from the packet
Do you not already know the clientId from the open connection itself? Does it not make sense to get the clientId from a more permanent place to avoid misbehaving clients, if that is a possibility?
Do you have a plain text form of the logs to share?
Does it not make sense to get the clientId
ClientId (if present) is passed by the client to the server during the CONNECT, then the broker stores the clientID in MQTTConnection and in Session.
Is it possible for the PUBLISH packet to be handled before the code that sets the clientId on the Channel is run? https://github.com/moquette-io/moquette/blob/9eb2a6f3d4b582324444a198d94e6e6846acf0fd/broker/src/main/java/io/moquette/broker/MQTTConnection.java#L204-L237 (actual setting on lines 215 and 223)
Maybe we should move the call to NettyUtils.clientID(channel, clientIdUsed); directly into executeConnect and not keep it in that ChannelFutureListener?
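The timing problem behind that suggestion can be modeled in a few lines (a simplified sketch: a CompletableFuture stands in for Netty's ChannelFuture, and the names are hypothetical, not Moquette's): if the clientId attribute is only set when the write-completion listener fires, a packet processed before that point observes null.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the two orderings (assumption: CompletableFuture stands
// in for Netty's ChannelFuture; this is not Moquette code).
public class ClientIdTiming {
    final AtomicReference<String> channelClientId = new AtomicReference<>();

    // Listener-based behaviour: the attribute is only set once the CONNACK
    // write completes, so a packet handled before then sees a null clientId.
    void setClientIdInListener(String clientId, CompletableFuture<Void> connackWrite) {
        connackWrite.thenRun(() -> channelClientId.set(clientId));
    }

    // Suggested behaviour: set the attribute directly in executeConnect,
    // before the CONNACK is written, closing the null window.
    void setClientIdDirectly(String clientId) {
        channelClientId.set(clientId);
    }
}
```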
Do you have a plain text form of the logs to share?
I had to do some redaction but this is what I have:
{
"message": "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.",
"attributes": {
"@timestamp": "2023-01-20T02:07:56.862Z",
"level": "WARN",
"thread_name": "nioEventLoopGroup-3-2",
"level_value": 30000,
"logger": {
"name": "io.netty.channel.DefaultChannelPipeline"
},
"@version": "1",
"logger_name": "io.netty.channel.DefaultChannelPipeline",
"stack_trace": "io.netty.channel.StacklessClosedChannelException: null\n\tat io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)\n",
"env": "production"
}
}
{
"message": "Error processing protocol message: PUBLISH",
"attributes": {
"msg": {
"type": "[PUBLISH]"
},
"@timestamp": "2023-01-20T02:07:56.903Z",
"level": "ERROR",
"thread_name": "nioEventLoopGroup-3-1",
"level_value": 40000,
"logger": {
"name": "io.moquette.broker.NewNettyMQTTHandler"
},
"@version": "1",
"channel": "[id: 0xa08d1d44, L:/10.0.2.68:1883 - R:/10.0.2.243:45852]",
"logger_name": "io.moquette.broker.NewNettyMQTTHandler",
"stack_trace": "java.lang.NullPointerException: Cannot invoke "String.hashCode()" because the return value of "io.moquette.broker.SessionCommand.getSessionId()" is null
at io.moquette.broker.PostOffice.routeCommand(PostOffice.java:632)
at io.moquette.broker.MQTTConnection.processPublish(MQTTConnection.java:427)
at io.moquette.broker.MQTTConnection.handleMessage(MQTTConnection.java:80)
at io.moquette.broker.NewNettyMQTTHandler.channelRead(NewNettyMQTTHandler.java:58)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.moquette.broker.metrics.MQTTMessageLogger.channelRead(MQTTMessageLogger.java:47)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.moquette.broker.metrics.MessageMetricsHandler.channelRead(MessageMetricsHandler.java:50)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.moquette.broker.metrics.BytesMetricsHandler.channelRead(BytesMetricsHandler.java:51)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)",
"env": "production"
}
}
{
"message": "Closed client channel due to exception in processing",
"attributes": {
"msg": {
"type": "[PUBLISH]"
},
"@timestamp": "2023-01-20T02:07:56.905Z",
"level": "INFO",
"thread_name": "nioEventLoopGroup-3-1",
"level_value": 20000,
"logger": {
"name": "io.moquette.broker.NewNettyMQTTHandler"
},
"@version": "1",
"channel": "[id: 0xa08d1d44, L:/10.0.2.68:1883 - R:/10.0.2.243:45852]",
"logger_name": "io.moquette.broker.NewNettyMQTTHandler",
"env": "production"
}
}
{
"message": "CONNACK send failed, cleanup session and close the connection",
"attributes": {
"@timestamp": "2023-01-20T02:07:57Z",
"level": "ERROR",
"thread_name": "nioEventLoopGroup-3-1",
"level_value": 40000,
"logger": {
"name": "io.moquette.broker.MQTTConnection"
},
"@version": "1",
"logger_name": "io.moquette.broker.MQTTConnection",
"stack_trace": "io.netty.channel.StacklessClosedChannelException: null\n\tat io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)\n",
"env": "production"
}
}
@hylkevds
Maybe we should move the call to NettyUtils.clientID(channel, clientIdUsed); directly in executeConnect and not in that ChannelFutureListener?
Yes, that would actually be safer; however, I don't think it's related to this. The publish on the new connection is done after the clientId has been set in the channel's attributes: https://github.com/moquette-io/moquette/blob/9eb2a6f3d4b582324444a198d94e6e6846acf0fd/broker/src/main/java/io/moquette/broker/MQTTConnection.java#L220
The idea was to set up all connection-related state only after the broker has a working connection with the client, hence the clientId is stored into the attributes after the broker has successfully sent the CONNACK.
That's for sending queued messages with cleanSession=false. But what happens when the client sends a new publish before the connect handshake has finished, before that ChannelFutureListener has fired? Is that possible?
If cleanSession=true, it means that the client is not yet subscribed to any topic. If I recall correctly, the MQTT specification says that a client is not allowed to send any message other than a DISCONNECT until it receives a CONNACK.
@hylkevds your case could manifest when a client sends a CONNECT, sends a SUBSCRIBE before receiving the CONNACK, and another client sends a PUBLISH that matches the topic.
Now I don't recall exactly whether the protocol prohibits sending SUBSCRIBE/PUBLISH after a CONNECT and before a CONNACK. In case Moquette should enforce it, it should close the connection if the connecting client sends a PUB on an MQTTConnection which has not yet transitioned to the connected state (aka connected = true;).
If I understand the exception right, in this case it is the client itself that publishes, not another client.
So is it possible for a client to send a PUBLISH before the CONNACK has gone fully through? Considering that from the client perspective the CONNACK may have been sent, but not processed yet due to our short buffering?
From [MQTT-3.1.4-5] in MQTT 3.1.1 spec http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Clients are allowed to send further Control Packets immediately after sending a CONNECT Packet;
Clients need not wait for a CONNACK Packet to arrive from the Server. If the Server rejects the CONNECT,
it MUST NOT process any data sent by the Client after the CONNECT Packet [MQTT-3.1.4-5]
Non normative comment
Clients typically wait for a CONNACK Packet, However, if the Client exploits its freedom to send
Control Packets before it receives a CONNACK, it might simplify the Client implementation as
it does not have to police the connected state. The Client accepts that any data that it sends before
it receives a CONNACK packet from the Server will not be processed if the Server rejects the connection.
So this means that a PUB could be sent by the client after the CONNECT and before it receives the CONNACK, and therefore the clientId has to be set up in the Channel attributes before the server replies with a CONNACK.
So this is a bug, and the solution is to move the clientID assignment into the Channel's attributes so that it happens before the CONNACK message is written to the Channel itself.
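That ordering fix can be sketched as follows (the map and list are hypothetical stand-ins for Netty channel attributes and the outbound write; this is not the actual patch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the proposed ordering inside executeConnect (assumption: the map
// and list below are stand-ins for the real channel attributes and writes).
public class ConnectOrdering {
    final Map<String, String> channelAttributes = new ConcurrentHashMap<>();
    final List<String> outbound = new ArrayList<>();

    void executeConnect(String clientId) {
        // 1. Bind the clientId to the channel FIRST, so a PUBLISH that the
        //    client legally sends right after CONNECT can already be routed.
        channelAttributes.put("clientId", clientId);
        // 2. Only then write the CONNACK back to the client.
        outbound.add("CONNACK");
    }
}
```

With this ordering there is no window in which a packet from the same connection can be routed while the channel's clientId attribute is still unset.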
Running on JVM 17, yesterday we updated moquette-broker from 0.15 to 0.16 and started seeing these errors in our logs:
And both before and after this:
Also, our consuming application started reporting errors, so these weren't just harmless warnings; things were actually failing.
Mind you, these didn't start happening immediately; it took about half a day for them to appear.
Any idea what this could be?