eclipse / paho.mqtt.rust

paho.mqtt.rust
Other
511 stars 102 forks source link

Manually listening to the system signal processing disconnect has bug #188

Open xiaozefeng opened 1 year ago

xiaozefeng commented 1 year ago

I want to manually listen to the linux signal in rust to handle the disconnection。 but has bug.

My guess is that there is a processing signal at the c, but this signal is intercepted by my upper layer, causing the c layer not to process it?

full code

use std::{env, process, thread, time::Duration};

use tokio::signal::unix::{signal, SignalKind};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER: &str = "tcp://localhost:1883";
const DFLT_CLIENT: &str = "rust_subscribe";
const DFLT_TOPICS: &[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS: &[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
async fn try_reconnect(cli: &mqtt::AsyncClient) -> bool {
    println!("Connection lost. Waiting to retry connection");
    for _ in 0..12 {
        thread::sleep(Duration::from_millis(5000));
        if cli.reconnect().await.is_ok() {
            println!("Successfully reconnected");
            return true;
        }
    }
    println!("Unable to reconnect after several attempts.");
    false
}

// Subscribes to multiple topics.
async fn subscribe_topics(cli: &mqtt::AsyncClient) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS).await {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

#[tokio::main]
async fn main() {
    let host = env::args()
        .nth(1)
        .unwrap_or_else(|| DFLT_BROKER.to_string());

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .persistence(None)
        .finalize();

    // Create a client.
    let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    let cloned_cli = cli.clone();

    tokio::spawn(async move {
        // Initialize the consumer before connecting.
        let rx = cli.start_consuming();

        let lwt_props = mqtt::properties!(mqtt::PropertyCode::WillDelayInterval => 10);
        // Define the set of options for the connection.
        let lwt = mqtt::MessageBuilder::new()
            .topic("lost_connection_topic")
            .payload("{}")
            .properties(lwt_props)
            .qos(1)
            .finalize();
        let conn_opts = mqtt::ConnectOptionsBuilder::new()
            .keep_alive_interval(Duration::from_secs(20))
            .clean_session(false)
            .will_message(lwt)
            .finalize();

        // Connect and wait for it to complete or fail.
        if let Err(e) = cli.connect(conn_opts).await {
            println!("Unable to connect:\n\t{:?}", e);
            process::exit(1);
        }

        // Subscribe topics.
        subscribe_topics(&cli).await;

        println!("Processing requests...");
        for msg in rx.iter() {
            if let Some(msg) = msg {
                println!("{}", msg);
            } else if !cli.is_connected() {
                if try_reconnect(&cli).await {
                    println!("Resubscribe topics...");
                    subscribe_topics(&cli).await;
                } else {
                    break;
                }
            }
        }
    });
    let mut stream = signal(SignalKind::terminate()).unwrap();
    stream.recv().await;
    println!("gruceful shutdown")
}

start it

cargo run 

stop it

ctrl + c

At this point the process will get stuck。

my dependencies

[dependencies]
paho-mqtt = "0.11.1"

emqx server version: 5.0.13

fpagliughi commented 1 year ago

I have no idea, but I’m not sure what you’re doing here.

I think you should just block on the spawned tokio task to await its completion. Or, for that matter, you appear to be running in an asynchronous tokio main function, so you don’t need a separate async task to run the MQTT connection?