fusesource / mqtt-client

A Java MQTT Client
http://mqtt-client.fusesource.org/
Apache License 2.0

Client error when reconnecting to a durable session with pending messages #29

Open xcynn opened 10 years ago

xcynn commented 10 years ago

I encountered the error below when testing with a subscriber that has cleanSession=false. It occurs when the subscriber reconnects to a durable session that has pending messages on the broker side. I searched online and found a discussion in the ActiveMQ issue tracker (https://issues.apache.org/jira/browse/AMQ-4585) about the same problem, but it is not yet resolved. The failure is exactly as described in that link, and the actual error can differ from run to run. It seems that when a pending message is received from a durable session, the byte returned by the MQTTFrame's frame.messageType() tends to get corrupted. In processFrame(), the frame is supposed to be decoded as PUBLISH.TYPE (3), but sometimes it is decoded as something else, most often SUBACK.TYPE (9). Since ActiveMQ (AMQ) does not receive the ACK when the subscriber crashes, the message stays pending; if the same subscriber is restarted a few more times, it eventually gets a chance to process the exact same frame correctly. I am quite confused about where the problem arises. Could it be a race condition somewhere?
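For reference, here is a minimal sketch of how the message type is carried in an MQTT 3.1.1 fixed header: the upper four bits of the first byte hold the type (3 for PUBLISH, 9 for SUBACK), and bits 2-1 hold the QoS level. The class and method names (FixedHeaderDemo, messageType, qos) are hypothetical and only illustrate the bit layout; this is not how mqtt-client's MQTTFrame is implemented.

```java
public class FixedHeaderDemo {
    // Message type codes as defined by the MQTT 3.1.1 specification.
    static final int PUBLISH = 3;
    static final int SUBACK = 9;

    /** Returns the message type held in the upper four bits of the first header byte. */
    static int messageType(byte header) {
        return (header & 0xF0) >>> 4;
    }

    /** Returns the QoS level held in bits 2-1 of the first header byte. */
    static int qos(byte header) {
        return (header & 0x06) >>> 1;
    }

    public static void main(String[] args) {
        byte publishQos1 = (byte) 0x32; // 0011 0010: PUBLISH, QoS 1
        byte suback = (byte) 0x90;      // 1001 0000: SUBACK

        System.out.println("type=" + messageType(publishQos1) + " qos=" + qos(publishQos1));
        System.out.println("type=" + messageType(suback));
    }
}
```

If the header byte were read from the wrong offset in the stream, the decoded type could come out as any value, which would be consistent with a PUBLISH frame sometimes being treated as a SUBACK. That is only one possible explanation, not a confirmed cause.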

java.net.ProtocolException: Command from server contained an invalid message id: 1
    at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:820)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:859)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:57)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:487)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:664)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:543)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)

xcynn commented 10 years ago

Just noticed that if I disable SSL and connect over plain TCP, the error occurs far less often.