eclipse / paho.mqtt.rust

manual disconnect does not send a will message #186

Closed xiaozefeng closed 1 year ago

xiaozefeng commented 1 year ago

Cargo.toml

[dependencies]
paho-mqtt = "*"
tokio = { version = "1", features = ["full"] }

main.rs

use std::{env, process, time::Duration};

use tokio::signal::unix::{signal, SignalKind};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER: &str = "tcp://localhost:1883";
const DFLT_CLIENT: &str = "rust_subscribe";
const DFLT_TOPICS: &[&str] = &["rust/mqtt", "rust/test"];
// The QoS list that matches the topics above.
const DFLT_QOS: &[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
async fn try_reconnect(cli: &mqtt::AsyncClient) -> bool {
    println!("Connection lost. Waiting to retry connection");
    for _ in 0..12 {
        // Use the async sleep so the Tokio worker thread is not blocked.
        tokio::time::sleep(Duration::from_millis(5000)).await;
        if cli.reconnect().await.is_ok() {
            println!("Successfully reconnected");
            return true;
        }
    }
    println!("Unable to reconnect after several attempts.");
    false
}

// Subscribes to multiple topics.
async fn subscribe_topics(cli: &mqtt::AsyncClient) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS).await {
        println!("Error subscribing to topics: {:?}", e);
        process::exit(1);
    }
}

#[tokio::main]
async fn main() {
    let host = env::args()
        .nth(1)
        .unwrap_or_else(|| DFLT_BROKER.to_string());

    // Define the set of options for creating the client.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    let cloned_cli = cli.clone();

    tokio::spawn(async move {
        // Initialize the consumer before connecting.
        let rx = cli.start_consuming();

        // Define the set of options for the connection.
        let lwt = mqtt::MessageBuilder::new()
            .topic("lost_connection_topic")
            .payload("{}")
            .qos(1)
            .finalize();
        let conn_opts = mqtt::ConnectOptionsBuilder::new()
            .keep_alive_interval(Duration::from_secs(20))
            .clean_session(false)
            .will_message(lwt)
            .finalize();

        // Connect and wait for it to complete or fail.
        if let Err(e) = cli.connect(conn_opts).await {
            println!("Unable to connect:\n\t{:?}", e);
            process::exit(1);
        }

        // Subscribe topics.
        subscribe_topics(&cli).await;

        println!("Processing requests...");
        for msg in rx.iter() {
            if let Some(msg) = msg {
                println!("{}", msg);
            } else if !cli.is_connected() {
                if try_reconnect(&cli).await {
                    println!("Resubscribe topics...");
                    subscribe_topics(&cli).await;
                } else {
                    break;
                }
            }
        }
    });
    let mut stream = signal(SignalKind::terminate()).unwrap();
    stream.recv().await;
    if cloned_cli.is_connected() {
        println!("Disconnecting");
        cloned_cli.unsubscribe_many(DFLT_TOPICS).await.unwrap();
        let disconnection_opts = paho_mqtt::DisconnectOptionsBuilder::new()
            .publish_will_message()
            .finalize();
        cloned_cli.disconnect(disconnection_opts).await.unwrap();
    }
}

Important code:

let disconnection_opts = paho_mqtt::DisconnectOptionsBuilder::new()
    .publish_will_message()
    .finalize();
cloned_cli.disconnect(disconnection_opts).await.unwrap();

Expected: the will message is sent.
Actual: the will message is not sent.

fpagliughi commented 1 year ago

I will test that to verify, but my initial thought is that by using the default settings you’re making an MQTT v3.1.1 connection. The ability to publish the will message on a clean disconnect is a feature of the MQTT v5 protocol.
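To illustrate the protocol difference described above, here is a minimal standalone sketch of how a client DISCONNECT packet is laid out on the wire in each version. It does not use the paho-mqtt API; the `Version` enum and `encode_disconnect` function are hypothetical names introduced only for this illustration, and properties are left out. In v3.1.1 the packet has no reason code field, so there is no way to ask the broker to publish the will. In v5 the client can send reason code 0x04 ("Disconnect with Will Message").

```rust
/// MQTT protocol versions relevant to this thread (illustrative type).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Version {
    V311,
    V5,
}

/// MQTT v5 reason code "Disconnect with Will Message".
const DISCONNECT_WITH_WILL: u8 = 0x04;

/// Encode a client DISCONNECT packet without properties.
/// Returns `None` when the request cannot be expressed in that version.
fn encode_disconnect(version: Version, publish_will: bool) -> Option<Vec<u8>> {
    match (version, publish_will) {
        // Fixed header 0xE0 with remaining length 0. Valid in both versions;
        // in v5 it implies reason code 0x00 (normal disconnection).
        (_, false) => Some(vec![0xE0, 0x00]),
        // v5 only: remaining length 1, followed by the reason code.
        (Version::V5, true) => Some(vec![0xE0, 0x01, DISCONNECT_WITH_WILL]),
        // v3.1.1 has no reason code field, so the will cannot be requested.
        (Version::V311, true) => None,
    }
}

fn main() {
    println!("v3.1.1 clean:     {:02X?}", encode_disconnect(Version::V311, false));
    println!("v3.1.1 with will: {:02X?}", encode_disconnect(Version::V311, true));
    println!("v5 with will:     {:02X?}", encode_disconnect(Version::V5, true));
}
```

This matches the symptom in the report: on a v3.1.1 connection a clean disconnect can only be the two-byte packet, and the broker discards the will when it receives it.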

fpagliughi commented 1 year ago

Have a look at the mqttrs_chat example, noting that all the examples were updated slightly with the v0.12 release this week. I assume that if you tried it in the last few days, with the paho-mqtt="*" dependency, you got this version.

I just ran the mqttrs_chat example and it works for me.

As I said, to get the will publication with a clean disconnect, you need a v5 connection. Change your connect options to something like this:

let conn_opts = mqtt::ConnectOptionsBuilder::new_v5()
    .keep_alive_interval(Duration::from_secs(20))
    .clean_start(false)
    .will_message(lwt)
    .finalize();

Use new_v5() to initialize the options for a v5 connection, and then use clean_start() instead of clean_session().
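As a rough picture of what changes on the wire between the two builders, the sketch below assembles the first ten bytes of a CONNECT variable header by hand. It is a standalone illustration, not paho-mqtt code; `connect_header` and its parameters are hypothetical names. The protocol level byte is 4 for v3.1.1 and 5 for v5, and the same flag bit (bit 1) is called Clean Session in v3.1.1 and Clean Start in v5. The properties section that v5 appends after these bytes is omitted.

```rust
/// Build the first ten bytes of a CONNECT variable header:
/// protocol name, protocol level, connect flags, keep-alive.
/// `will_qos` is `Some(qos)` when a will message is attached.
fn connect_header(level: u8, clean: bool, will_qos: Option<u8>, keep_alive_secs: u16) -> [u8; 10] {
    let mut flags = 0u8;
    if clean {
        flags |= 0b0000_0010; // bit 1: Clean Session (v3.1.1) / Clean Start (v5)
    }
    if let Some(qos) = will_qos {
        flags |= 0b0000_0100; // bit 2: Will Flag
        flags |= (qos & 0b11) << 3; // bits 3-4: Will QoS
    }
    let ka = keep_alive_secs.to_be_bytes();
    [0x00, 0x04, b'M', b'Q', b'T', b'T', level, flags, ka[0], ka[1]]
}

fn main() {
    // Same settings as in the thread: keep-alive 20 s, clean flag off, will at QoS 1.
    let v311 = connect_header(4, false, Some(1), 20);
    let v5 = connect_header(5, false, Some(1), 20);
    println!("v3.1.1: {:02X?}", v311);
    println!("v5:     {:02X?}", v5);
}
```

With identical settings, only the protocol level byte differs in this prefix, which is why the connection version has to be chosen explicitly when building the connect options.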

xiaozefeng commented 1 year ago

Got it!

But ConnectOptionsBuilder has no new_v5() function. I found ConnectOptionsBuilder::new().mqtt_version(5) instead, but connecting with it fails with this error:

Unable to connect:
        PahoDescr(-16, "Wrong MQTT version")

The core code:

let conn_opts = mqtt::ConnectOptionsBuilder::new()
    .mqtt_version(5)
    .keep_alive_interval(Duration::from_secs(20))
    // .clean_session(false)
    .clean_start(false)
    .will_message(lwt)
    .finalize();

Am I using the wrong version? This is my Cargo.toml:

[dependencies]
paho-mqtt = "0.11.1"

fpagliughi commented 1 year ago

v0.12 was released a few days ago.

I would suggest updating to this version as it fixes some bugs and performance issues, but if you want to stay with v0.11, the code you posted should work.

appelgriebsch commented 1 year ago

I had the same issue and can confirm that after upgrading the dependency to v0.12 and changing from

    let connect_options = ConnectOptionsBuilder::new()
...

to

    let connect_options = ConnectOptionsBuilder::new_v5().clean_start(false)
...

the last will message is sent with a .disconnect() call.