eclipse / paho.mqtt.rust

Published messages sent during connection failure not resent after connection restore #218

Open cartertinney opened 6 months ago

cartertinney commented 6 months ago

Messages published after the connection is lost, but before the Paho client has detected the loss, are dropped and never resent once the connection is restored.

use std::{thread, time::Duration};
use futures::executor::block_on;
use paho_mqtt as mqtt;

// TEST_TOPIC is assumed to be a &str constant defined elsewhere.
let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri("test.mosquitto.org:1883")
        .send_while_disconnected(true)
        .finalize();
let paho_client = mqtt::AsyncClient::new(create_opts).expect("err!!!");
let conn_opts = mqtt::ConnectOptionsBuilder::new_v5()
        .keep_alive_interval(Duration::from_secs(3))
        .automatic_reconnect(Duration::from_secs(2), Duration::from_secs(4))
        .finalize();

if let Err(err) = block_on(async {
        paho_client.connect(conn_opts).await?;

        let msg1 = mqtt::Message::new(TEST_TOPIC, "Message 1", mqtt::QOS_1);
        paho_client.publish(msg1);    // DeliveryToken indicates success

        thread::sleep(Duration::from_secs(2));
        // Connection is lost, but keepalive hasn't noticed yet

        let msg2 = mqtt::Message::new(TEST_TOPIC, "Message 2", mqtt::QOS_1);
        paho_client.publish(msg2);    // DeliveryToken indicates "Operation Incomplete", presumably because no PUBACK was received

        thread::sleep(Duration::from_secs(2));
        // Client has now become aware of the disconnect because the keepalive time was exceeded

        let msg3 = mqtt::Message::new(TEST_TOPIC, "Message 3", mqtt::QOS_1);
        paho_client.publish(msg3);    // DeliveryToken does not indicate a result immediately, as the message is queued pending reconnect

        thread::sleep(Duration::from_secs(4));
        // Client has reconnected now

        let msg4 = mqtt::Message::new(TEST_TOPIC, "Message 4", mqtt::QOS_1);
        paho_client.publish(msg4);    // DeliveryToken indicates success

        Ok::<(), mqtt::Error>(())
}) {
        eprintln!("Error: {}", err);
}

The broker on the other side of the above example receives only messages 1, 3 and 4. Message 2, which timed out waiting for its PUBACK, is never redelivered after the reconnect. This appears to violate the MQTT specification (section 4.4, "Message delivery retry").
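
To make the expected behavior concrete, here is a minimal sketch of the retry rule as I read section 4.4: when a client reconnects to an existing session, any QoS 1 PUBLISH that has not been acknowledged with a PUBACK should be sent again with its original packet identifier. This is a toy, std-only model and not Paho's implementation; `InflightStore`, `on_puback`, `on_reconnect` and `replay_scenario` are hypothetical names invented for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory model of a QoS 1 sender's session state.
/// It only illustrates the retry rule; it is not how Paho stores messages.
#[derive(Default)]
struct InflightStore {
    next_id: u16,
    /// Packet identifier -> payload, for PUBLISH packets with no PUBACK yet.
    unacked: BTreeMap<u16, String>,
}

impl InflightStore {
    /// Record an outgoing QoS 1 PUBLISH and return its packet identifier.
    /// Identifiers start at 1 because 0 is not a valid packet identifier.
    /// (A real client would also avoid identifiers that are still in use.)
    fn publish(&mut self, payload: &str) -> u16 {
        self.next_id = self.next_id.wrapping_add(1).max(1);
        self.unacked.insert(self.next_id, payload.to_string());
        self.next_id
    }

    /// A PUBACK arrived: the message is delivered and can be forgotten.
    /// Returns false if the identifier was not in flight.
    fn on_puback(&mut self, packet_id: u16) -> bool {
        self.unacked.remove(&packet_id).is_some()
    }

    /// On reconnect to an existing session, everything still unacknowledged
    /// is due to be sent again, keeping its original packet identifier.
    fn on_reconnect(&self) -> Vec<(u16, String)> {
        self.unacked
            .iter()
            .map(|(id, payload)| (*id, payload.clone()))
            .collect()
    }
}

/// Replays the first three publishes from the report against the model
/// and returns what would be resent when the connection comes back.
fn replay_scenario() -> Vec<(u16, String)> {
    let mut store = InflightStore::default();

    let id1 = store.publish("Message 1");
    store.on_puback(id1); // acknowledged before the link dropped

    let _id2 = store.publish("Message 2"); // written to a dead link, no PUBACK
    let _id3 = store.publish("Message 3"); // queued while known to be offline

    store.on_reconnect()
}
```

In this model `replay_scenario()` returns both "Message 2" and "Message 3" as due for resend, whereas the behavior observed above corresponds to only "Message 3" being retained.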