eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho

Sometimes a 'deadlock' may occur when resubscribing to a topic in the connectComplete callback #999

Open QBH-insist opened 1 year ago

QBH-insist commented 1 year ago

Reproduction steps

  1. Create a subscriber (sub) client that subscribes to a topic in the connectComplete callback: public void connectComplete(boolean reconnect, String serverURI) { myClient.subscribe(this.topic); } Also create a publisher (pub) client.
  2. The pub client publishes messages continuously.
  3. The sub client receives the messages.
  4. Close the sub client while the pub client keeps publishing, and make sure the pub client publishes more than 10 further messages.
  5. The sub client's session on the broker now holds more than 10 unconsumed messages.
  6. Close the pub client and reconnect the sub client (its client ID must stay unchanged).
  7. The sub client is then blocked until the checkForActivity method closes the client:
    [unreadable] 11, 2023 5:22:46 [unreadable] org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
    [unreadable]: client1: Timed out as no activity, keepAlive=15,000,000,000 lastOutboundActivity=202,748,527,866,900 lastInboundActivity=202,733,513,164,800 time=202,763,529,941,000 lastPing=202,748,527,900,000
    on connectionLost
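The numbers in that log line can be checked by hand. The sketch below assumes the values are nanosecond timestamps (keepAlive=15,000,000,000 would then be 15 s); the class and method names are made up for illustration and are not part of Paho.

```java
import java.util.concurrent.TimeUnit;

// Worked arithmetic on the values from the log line above.
// Assumption: all values are in nanoseconds.
class KeepAliveGap {
    static final long KEEP_ALIVE = 15_000_000_000L;
    static final long LAST_OUTBOUND = 202_748_527_866_900L;
    static final long LAST_INBOUND = 202_733_513_164_800L;
    static final long NOW = 202_763_529_941_000L;
    static final long LAST_PING = 202_748_527_900_000L;

    static long sinceInbound() { return NOW - LAST_INBOUND; }
    static long sinceOutbound() { return NOW - LAST_OUTBOUND; }
    static long sincePing() { return NOW - LAST_PING; }

    static long ms(long nanos) { return TimeUnit.NANOSECONDS.toMillis(nanos); }

    public static void main(String[] args) {
        System.out.println("keepAlive (ms):           " + ms(KEEP_ALIVE));      // 15000
        System.out.println("since last inbound (ms):  " + ms(sinceInbound()));  // 30016
        System.out.println("since last outbound (ms): " + ms(sinceOutbound())); // 15002
        System.out.println("since last ping (ms):     " + ms(sincePing()));     // 15002
    }
}
```

Under that assumption, nothing had been processed as inbound for about 30 s (two keep-alive intervals), and the last ping had gone out about 15 s earlier without a response being seen by the client.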

More Information

  1. The checkForActivity method closes the client because the client never received the MqttPingResp (ping response).
  2. Additionally, I discovered that the subscribe ack (MqttSuback) was not received either.
  3. However, the broker (EMQX Broker) did send both the ping ack and the sub ack.


  4. On reconnecting, the sub client does receive 10 messages from the broker (this can be seen in the debugger), but it does not send a message ack for them.

Further analysis (speculative)

Root cause analysis

Paho MQTT threads

Client startup process (coarse-grained):

  1. The connect packet is sent.
  2. The connect ack is received.
  3. Pending messages are received from the broker, if there are any.
  4. connectComplete is called.
  5. connectComplete subscribes to a topic, and the Call Thread (CommsCallback) then waits until it is notified of the sub ack.

Rec Thread (CommsReceiver) part

  1. The connect ack is received.
  2. Messages are received from the broker (more than 10 messages).
  3. The messages are put into CommsCallback.messageQueue.


  4. The key step: during clientState.notifyReceivedMsg(message), the Rec Thread (CommsReceiver) waits while messageQueue.size() >= 10.
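The back-pressure described in the last step can be modelled with a small bounded hand-off. This is a simplified sketch, not Paho's actual code: BoundedInbox and its methods are hypothetical names, and the only detail taken from the issue is the limit of 10 queued messages.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of a receiver thread handing messages to a callback thread.
// Hypothetical class; only the limit of 10 comes from the issue text.
class BoundedInbox {
    static final int LIMIT = 10;
    private final Queue<String> queue = new ArrayDeque<>();

    // Receiver side: waits while the queue already holds LIMIT messages.
    // Returns false if no space became available within timeoutMillis.
    synchronized boolean offer(String msg, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (queue.size() >= LIMIT) {
            long remaining = (deadline - System.nanoTime()) / 1_000_000L;
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        queue.add(msg);
        return true;
    }

    // Callback side: removes one message and wakes a waiting receiver.
    synchronized String poll() {
        String msg = queue.poll();
        if (msg != null) {
            notifyAll();
        }
        return msg;
    }

    public static void main(String[] args) {
        BoundedInbox inbox = new BoundedInbox();
        for (int i = 0; i < LIMIT; i++) {
            inbox.offer("msg-" + i, 100);
        }
        System.out.println("11th accepted while nobody drains: " + inbox.offer("msg-10", 100)); // false
        inbox.poll();
        System.out.println("11th accepted after one poll:      " + inbox.offer("msg-10", 100)); // true
    }
}
```

As long as something keeps calling poll(), the receiver only pauses briefly. The receiver stays stuck only when the draining side is itself waiting for something else.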


Conclusion

  1. The Call Thread (CommsCallback) is in the WAITING state because it subscribes inside connectComplete and is waiting for the sub ack.
  2. The Rec Thread (CommsReceiver) is in the WAITING state because CommsCallback.messageQueue size >= 10.
  3. CommsCallback.messageQueue is consumed by the Call Thread (CommsCallback), but the Call Thread is in the WAITING state.
  4. Because the Rec Thread (CommsReceiver) is in the WAITING state, the ping ack and the sub ack are never received.
  5. Consequently, the Call Thread (CommsCallback) is never notified.
  6. Finally, the PingTask closes the client.
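The cycle in this conclusion can be reproduced with two plain Java threads. The sketch below is a model under stated assumptions, not Paho code: CallbackDeadlockModel, subAckArrives, and the 500 ms timeout are hypothetical, the queue limit of 10 comes from the issue, and the sub ack is assumed to arrive on the wire behind the backlog of messages.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Two-thread model of the wait cycle (hypothetical names, not Paho classes).
class CallbackDeadlockModel {
    static final int QUEUE_LIMIT = 10;   // limit named in the issue
    static final long TIMEOUT_MS = 500;  // stands in for the keep-alive timeout

    // Returns true if the simulated sub ack is processed before the timeout.
    static boolean subAckArrives(boolean blockInCallback, int backlog) {
        BlockingQueue<String> inbox = new ArrayBlockingQueue<>(QUEUE_LIMIT);
        CountDownLatch subAck = new CountDownLatch(1);

        Thread receiver = new Thread(() -> {
            try {
                for (int i = 0; i < backlog; i++) {
                    inbox.put("msg-" + i); // blocks once QUEUE_LIMIT messages are queued
                }
                subAck.countDown();        // the sub ack is read only after the backlog
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "receiver");

        Thread callback = new Thread(() -> {
            try {
                if (blockInCallback) {
                    subAck.await();        // blocking subscribe inside connectComplete
                }
                while (true) {
                    inbox.take();          // deliver queued messages to the application
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "callback");

        receiver.setDaemon(true);
        callback.setDaemon(true);
        receiver.start();
        callback.start();

        boolean arrived;
        try {
            arrived = subAck.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            arrived = false;
        }
        receiver.interrupt(); // tear both threads down, like the client being closed
        callback.interrupt();
        return arrived;
    }

    public static void main(String[] args) {
        System.out.println("blocking subscribe, backlog 11:     " + subAckArrives(true, 11));  // false
        System.out.println("non-blocking subscribe, backlog 11: " + subAckArrives(false, 11)); // true
        System.out.println("blocking subscribe, backlog 10:     " + subAckArrives(true, 10));  // true
    }
}
```

In this model the stall needs both conditions at once: a blocking wait on the callback thread and a backlog larger than the queue limit. The second case suggests a possible workaround, offered here as an assumption rather than a confirmed fix: avoid blocking inside connectComplete, for example by using the asynchronous client (MqttAsyncClient.subscribe returns a token instead of waiting) or by running the blocking subscribe on a separate thread.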