moquette-io / moquette

Java MQTT lightweight broker
http://moquette-io.github.io/moquette/
Apache License 2.0

io.netty.util.IllegalReferenceCountException: refCnt: 0 #773

Open BigGitWorld opened 10 months ago

BigGitWorld commented 10 months ago

Hello. I have embedded Moquette version 0.17 in a Spring Boot project and extended the AbstractInterceptHandler class to handle the onPublish callback, but I get the following error when I publish a simple JSON message to the broker.

@Override
public void onPublish(InterceptPublishMessage msg) {
    super.onPublish(msg);
    final String decodedPayload = msg.getPayload().toString(UTF_8); //get error here!
    System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
}

error:

Exception in thread "pool-2-thread-2" io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.handler.codec.mqtt.MqttPublishMessage.content(MqttPublishMessage.java:49)
    at io.netty.handler.codec.mqtt.MqttPublishMessage.payload(MqttPublishMessage.java:42)
    at io.moquette.interception.messages.InterceptPublishMessage.getPayload(InterceptPublishMessage.java:40)
    at com.test.broker.intercept.BrokerEventListener.onPublish(BrokerEventListener.java:201)
    at com.test.broker.intercept.BrokerEventListener$$FastClassBySpringCGLIB$$b1d48b25.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.test.broker.intercept.BrokerEventListener$$EnhancerBySpringCGLIB$$18337bb1.onPublish(<generated>)
    at io.moquette.interception.BrokerInterceptor.lambda$notifyTopicPublished$3(BrokerInterceptor.java:134)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

hylkevds commented 10 months ago

Moquette (by way of Netty) uses reference-counted buffers to avoid duplicating data. The last one to receive the buffer releases it. In this case you pass the buffer on to the superclass, so the superclass releases the buffer and you can no longer use it afterwards. To avoid this, add a "claim" to the buffer by increasing its reference count before passing it on, and release that claim once you are done:

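@Override
public void onPublish(InterceptPublishMessage msg) {
    // Add our own claim so the buffer survives the release done by super.onPublish().
    msg.getPayload().retain();
    try {
        super.onPublish(msg);
        final String decodedPayload = msg.getPayload().toString(UTF_8);
        System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
    } finally {
        // Drop our claim again, even if decoding or printing throws.
        msg.getPayload().release();
    }
}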

Or, alternatively, move the call to super to be the last in your method.
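
To make the ordering issue concrete without pulling in Netty, here is a minimal sketch using a toy reference-counted buffer. ToyBuffer, baseOnPublish and the three handler methods are hypothetical stand-ins invented for illustration; they are not Netty's ByteBuf or Moquette's API, and they only assume the behaviour described above (the base handler releases the payload, and reading a buffer whose count has reached 0 fails).

```java
import java.nio.charset.StandardCharsets;

final class RefCountDemo {
    /** Toy stand-in for a reference-counted buffer; NOT Netty's ByteBuf. */
    static final class ToyBuffer {
        private final byte[] data;
        private int refCnt = 1; // a fresh buffer starts with one claim

        ToyBuffer(String text) { this.data = text.getBytes(StandardCharsets.UTF_8); }

        ToyBuffer retain() { ensureAccessible(); refCnt++; return this; }

        /** Drops one claim; returns true when the last claim is gone. */
        boolean release() { ensureAccessible(); return --refCnt == 0; }

        int refCnt() { return refCnt; }

        String readUtf8() { ensureAccessible(); return new String(data, StandardCharsets.UTF_8); }

        private void ensureAccessible() {
            if (refCnt == 0) throw new IllegalStateException("refCnt: 0");
        }
    }

    /** Assumed behaviour of the base handler: it releases the payload. */
    static void baseOnPublish(ToyBuffer payload) { payload.release(); }

    /** Failing order from the question: delegate first, read afterwards. */
    static String delegateThenRead(ToyBuffer payload) {
        baseOnPublish(payload);
        return payload.readUtf8(); // throws, the only claim is already gone
    }

    /** First fix: add our own claim, delegate, read, then drop our claim. */
    static String retainThenDelegate(ToyBuffer payload) {
        payload.retain();
        try {
            baseOnPublish(payload);
            return payload.readUtf8();
        } finally {
            payload.release();
        }
    }

    /** Second fix: read first and make the delegating call the last step. */
    static String readThenDelegate(ToyBuffer payload) {
        String text = payload.readUtf8();
        baseOnPublish(payload);
        return text;
    }

    public static void main(String[] args) {
        System.out.println(retainThenDelegate(new ToyBuffer("{\"temp\":21}")));
        System.out.println(readThenDelegate(new ToyBuffer("{\"temp\":22}")));
        try {
            delegateThenRead(new ToyBuffer("{\"temp\":23}"));
        } catch (IllegalStateException e) {
            System.out.println("failed as expected: " + e.getMessage());
        }
    }
}
```

In both working variants the count ends at 0, so nothing leaks. The second variant needs no extra retain/release pair, which is why moving the call to super to the end is the simpler change when the payload is only needed inside onPublish.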