moquette-io / moquette

Java MQTT lightweight broker
http://moquette-io.github.io/moquette/
Apache License 2.0

MQTT durable subscription exception #455

Open OleksandrBerezianskyi opened 5 years ago

OleksandrBerezianskyi commented 5 years ago

Expected behavior

After the same client reconnects with clean_session = false and resubscribes to the same topic with QoS = 1, all missed messages should be delivered

Actual behavior

An exception is thrown while the broker tries to resend an unacked message:

2019-03-08 14:19:37.194 ERROR 31746 --- [ntLoopGroup-5-5] io.moquette.broker.NewNettyMQTTHandler   : Unexpected exception while processing MQTT message. Closing Netty channel. CId=client123

java.lang.ClassCastException: class io.moquette.broker.SessionRegistry$PubRelMarker cannot be cast to class io.moquette.broker.SessionRegistry$PublishedMessage (io.moquette.broker.SessionRegistry$PubRelMarker and io.moquette.broker.SessionRegistry$PublishedMessage are in unnamed module of loader 'app')
    at io.moquette.broker.Session.resendInflightNotAcked(Session.java:293)
    at io.moquette.broker.MQTTConnection.resendNotAckedPublishes(MQTTConnection.java:481)
    at io.moquette.broker.NewNettyMQTTHandler.userEventTriggered(NewNettyMQTTHandler.java:106)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.moquette.broker.MoquetteIdleTimeoutHandler.userEventTriggered(MoquetteIdleTimeoutHandler.java:48)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
    at io.moquette.broker.InflightResender.resendNotAcked(InflightResender.java:160)
    at io.moquette.broker.InflightResender.access$100(InflightResender.java:32)
    at io.moquette.broker.InflightResender$WriterIdleTimeoutTask.run(InflightResender.java:58)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

2019-03-08 14:19:37.194 ERROR 31746 --- [ntLoopGroup-5-5] io.moquette.broker.NewNettyMQTTHandler   : Unexpected exception while processing MQTT message. Closing Netty channel. CId=client123

java.nio.channels.ClosedChannelException: null
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)

Steps to reproduce

  1. Run moquette broker
  2. Publish messages every second to any topic, e.g. "/test-topic"
  3. Run subscriber with client = "client123", clean_session = false and subscribe to topic "/test-topic" with QoS = 1
  4. Kill subscriber
  5. Rerun subscriber
  6. Repeat steps 4 and 5 until you get an exception

Minimal yet complete reproducer code (or URL to code) or complete log file

Client using Python 3.7

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("/test-topic", 1)

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client(client_id="client123", clean_session=False)
client.username_pw_set("user123", "session_id")
client.on_connect = on_connect
client.on_message = on_message

print("connecting")
client.connect("localhost", 1883, 60)
print("connected")

client.loop_forever()
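
Step 2 of the reproduction needs a publisher, which the client code above does not cover. A minimal sketch of one follows, under several assumptions that are not in the report: the publisher uses QoS 1, reuses the subscriber's credentials, and connects with the hypothetical client id "publisher123". The payload format and the helper names build_payload and publish_forever are illustrative only, and the constructor call follows the paho-mqtt 1.x style used by the subscriber.

```python
import itertools
import time


def build_payload(counter):
    """Return a numbered payload so gaps in delivery are easy to spot."""
    return "message-{}".format(counter)


def publish_forever(host="localhost", port=1883, topic="/test-topic", interval=1.0):
    """Publish one numbered message per `interval` seconds until interrupted."""
    # Imported here so build_payload() stays usable without paho installed.
    import paho.mqtt.client as mqtt

    # Assumption: hypothetical client id, paho-mqtt 1.x constructor style.
    client = mqtt.Client(client_id="publisher123")
    # Assumption: same credentials as the subscriber in the report.
    client.username_pw_set("user123", "session_id")
    client.connect(host, port, 60)
    client.loop_start()  # background network loop handles the acknowledgements
    try:
        for counter in itertools.count():
            # Assumption: QoS 1; the report does not state the publisher's QoS.
            client.publish(topic, build_payload(counter), qos=1)
            time.sleep(interval)
    finally:
        client.loop_stop()
        client.disconnect()
```

Calling publish_forever() while the subscriber is killed and restarted should give the broker a steady stream of messages to queue for "client123".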

Moquette MQTT version

moquette-broker-0.12.1.jar

JVM version (e.g. java -version)

java -version
openjdk version "11.0.1" 2018-10-16
OpenJDK Runtime Environment 18.9 (build 11.0.1+13)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.1+13, mixed mode)

OS version (e.g. uname -a)

uname -a
Darwin Admins-MacBook-Pro.local 18.2.0 Darwin Kernel Version 18.2.0: Thu Dec 20 20:46:53 PST 2018; root:xnu-4903.241.1~1/RELEASE_X86_64 x86_64

andsel commented 5 years ago

@OleksandrBerezianskyi you left out the publisher client part. The error here relates to the fact that PubRelMarker can't be cast to PublishedMessage, which is triggered by a timeout of a message in flight from the broker to the client. PubRelMarker is a placeholder class that represents the PUBREL message from the broker to a subscriber that wants to receive messages at QoS 2. Could you please share a complete log from the bootstrap of the broker up to the crash, both moquette.log and messages.logs? It would help a lot in understanding and reproducing the problem, thanks
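
To illustrate the cast problem described above, here is a rough Python model of an in-flight window that holds two kinds of entries. It is not Moquette's code: the class names are borrowed from the stack trace, while the fields, the dict layout and the tuple format returned by resend_inflight_not_acked are assumptions made for the example. The point is only that a resend routine has to check the entry type instead of treating every entry as a published message.

```python
class PublishedMessage:
    """Stand-in for an in-flight PUBLISH that has not been acknowledged yet."""

    def __init__(self, topic, payload, qos):
        self.topic = topic
        self.payload = payload
        self.qos = qos


class PubRelMarker:
    """Stand-in for the placeholder stored once a PUBREL has been sent."""


def resend_inflight_not_acked(inflight_window):
    """Return the packets to resend, branching on the type of each entry.

    `inflight_window` maps packet id -> entry (hypothetical layout).
    """
    packets = []
    for packet_id, entry in inflight_window.items():
        if isinstance(entry, PublishedMessage):
            # A pending PUBLISH is sent again with its original content.
            packets.append(("PUBLISH", packet_id, entry.topic, entry.payload))
        elif isinstance(entry, PubRelMarker):
            # A marker carries no payload; only the PUBREL itself is repeated.
            packets.append(("PUBREL", packet_id))
        else:
            raise TypeError("unexpected in-flight entry: {!r}".format(entry))
    return packets
```

With a window such as {1: PublishedMessage("/test-topic", b"a", 1), 2: PubRelMarker()}, the function yields one PUBLISH entry and one PUBREL entry instead of failing on the marker.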