eclipse / paho.mqtt.rust

paho.mqtt.rust
Other
511 stars 102 forks source link

mqtt v5 subscriber disconnect problem #195

Open xiaozefeng opened 1 year ago

xiaozefeng commented 1 year ago

I use paho-mqtt v5 to subscribe to a topic and after a period of almost 12 hours it disconnects

rust version :

rustup 1.25.2 (17db695f1 2023-02-01)
info: This is the version for the rustup toolchain manager, not the rustc compiler.
info: The currently active `rustc` version is `rustc 1.67.1 (d5a82bbd2 2023-02-07)`

the os: No LSB modules are available. Distributor ID: Ubuntu Description: Ubuntu 20.04.5 LTS Release: 20.04 Codename: focal

this is the Cargo.toml

tokio= {version = "1", features = ["full"]}
tracing="0.1.3"
tracing-subscriber = "0.3.16"
paho-mqtt = "0.12.0"
tokio-stream = "0.1.12"

this is the code :

use std::time::Duration;

use mqtt::MQTT_VERSION_5;
use paho_mqtt as mqtt;
use tokio_stream::StreamExt;
use tracing::info;
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let host = "mqtt://localhost:1883";
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id("paho-mqtt-client-v5")
        .finalize();
    let mut cli = mqtt::AsyncClient::new(create_opts).unwrap();
    let mut strm = cli.get_stream(25);

    let conn_opts = mqtt::ConnectOptionsBuilder::with_mqtt_version(MQTT_VERSION_5)
        .keep_alive_interval(Duration::from_secs(30))
        .clean_start(false)
        .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
        .user_name("username")
        .password("password")
        .finalize();

    info!("connecting to mqtt server...");
    cli.connect(conn_opts).await.unwrap();

    let topic = "mining_updater_topic_test";
    info!("subscribing topic: {}", topic);

    cli.subscribe(topic, 1).await.unwrap();

    info!("waiting for receive message...");

    while let Some(msg_opt) = strm.next().await {
        if let Some(msg) = msg_opt {
            println!("{msg}");
        } else {
            info!("lost connection to mqtt server");
            while let Err(e) = cli.reconnect().await {
                println!("Error reconnect: {}", e);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}
fpagliughi commented 1 year ago

What server are you using? Is it public or private? If it's a public server, consider changing you Client ID to something unique. Someone else might be connecting with the same ID and knocking you off the server.

Is there a chance that the server is dropping you? Maybe monitor for a disconnect message from the server - perhaps using on_disconnect() or spying the network with Wireshark.

Is there any chance your board (computer) is powering down or going to sleep?

Does it reconnect?

xiaozefeng commented 1 year ago

The server I am using is EMQX 5.0 and I am sure that my client id is unique.

I'm not running on my laptop, I'm running on a small machine with ubuntu installed. And on the same machine, I can reconnect using the v3 protocol, but using the v5 protocol, I get disconnected and can't reconnect automatically.

os: also use ubuntu , is same the v5 No LSB modules are available. Distributor ID: Ubuntu Description: Ubuntu 20.04.5 LTS Release: 20.04 Codename: focal

the v3 code:

use std::time::Duration;

use paho_mqtt as mqtt;
use tokio_stream::StreamExt;
use tracing::info;

#[tokio::main]
async fn main(){
    tracing_subscriber::fmt::init();

    let host = "mqtt://localhost:1883";
    let create_opts = mqtt::CreateOptionsBuilder::new_v3()
        .server_uri(host)
        .client_id("paho-mqtt-client-v3")
        .finalize();

    let mut cli =  mqtt::AsyncClient::new(create_opts).unwrap();
    let mut strm = cli.get_stream(25);
    let conn_opts = mqtt::ConnectOptionsBuilder::new_v3()
            .keep_alive_interval(Duration::from_secs(30))
            .clean_session(false)
            .finalize();
    info!("connecting to mqtt server...");
    cli.connect(conn_opts).await.unwrap();

    let topic = "mining_updater_topic_test";
    info!("subscribing topic: {}", topic);

    cli.subscribe(topic, 1).await.unwrap();

    info!("waiting for receive message...");

    while let Some(msg_opt) = strm.next().await {
        if let Some(msg) = msg_opt{
            println!("{msg}");
        }else{
            info!("lost connection to mqtt server");
            while let Err(e)= cli.reconnect().await{
                println!("Error reconnect: {}",e);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }

        }
    }
}
fpagliughi commented 1 year ago

OK. Thanks. I will try to reproduce it.