eclipse / paho.mqtt.rust

paho.mqtt.rust
Other
511 stars 102 forks source link

Got error when connection lost using SSL #206

Open leruetkins opened 1 year ago

leruetkins commented 1 year ago

I run the program and everything works fine, but after the connection to the server is lost, the program crashes. Sometimes it manages to print the following error before it exits:

thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: PoisonError { .. }', C:\Users\user\.cargo\registry\src\github.com-1ecc6299db9ec823\paho-mqtt-0.12.1\src\token.rs:369:41

Used code:

/// Creates an MQTT client from `CONFIG_JSON["settings"]["mqtt"]`, starts a TLS connection and
/// forwards messages received on the configured topic to Zabbix, at most once per configured
/// period.
///
/// All client callbacks are registered *before* the connection attempt is started, so that a
/// connect / connection-lost / message notification cannot arrive while no handler is installed.
///
/// After the connection has been initiated the calling thread sleeps forever; all further work
/// happens inside the registered callbacks. The function therefore never returns `Ok`.
///
/// # Errors
///
/// Returns the error produced by `create_client()` if the client cannot be created.
///
/// # Panics
///
/// Panics if any of the `period`, `url`, `topic`, `login` or `password` entries under
/// `settings.mqtt` is missing or has the wrong JSON type.
fn mqtt_connect() -> mqtt::Result<()> {
    let mqtt_cfg = &CONFIG_JSON["settings"]["mqtt"];

    // The configured period is in seconds. Building the `Duration` directly avoids the
    // `* 1000` multiplication, which could overflow a `u64`.
    let period_secs = mqtt_cfg["period"]
        .as_u64()
        .expect("settings.mqtt.period must be an unsigned integer");
    let period_duration = Duration::from_secs(period_secs);

    // Time of the last message handed to Zabbix. `None` means "nothing forwarded yet", so the
    // first message is always processed. (Subtracting from `Instant::now()` to fake an old
    // timestamp can panic when the result is not representable.)
    let mut zabbix_last_msg: Option<Instant> = None;

    let host = mqtt_cfg["url"]
        .as_str()
        .expect("settings.mqtt.url must be a string")
        .to_string();
    println!("Connecting to host: '{}'", host);

    let zabbix_topic = CONFIG_JSON["settings"]["mqtt"]["topic"]
        .as_str()
        .expect("settings.mqtt.topic must be a string");

    let cli = mqtt::CreateOptionsBuilder::new()
        .server_uri(&host)
        .client_id("random_name_result")
        .max_buffered_messages(100)
        .create_client()?;

    // NOTE(review): server certificate verification is disabled here; configure a trust store
    // and enable verification before using this against an untrusted network.
    let ssl_opts = mqtt::SslOptionsBuilder::new()
        .enable_server_cert_auth(false)
        .finalize();

    let login = mqtt_cfg["login"]
        .as_str()
        .expect("settings.mqtt.login must be a string")
        .to_string();
    let password = mqtt_cfg["password"]
        .as_str()
        .expect("settings.mqtt.password must be a string")
        .to_string();

    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .ssl_options(ssl_opts)
        .user_name(login)
        .password(password)
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(false)
        .finalize();

    // Register every callback first; the connection is only started afterwards.
    cli.set_connected_callback(|_cli: &mqtt::AsyncClient| {
        println!("Connected.");
    });

    cli.set_connection_lost_callback(|cli: &mqtt::AsyncClient| {
        println!("Connection lost. Attempting reconnect.");
        // Short back-off before asking the client to reconnect.
        thread::sleep(Duration::from_millis(2500));
        cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
    });

    cli.set_message_callback(move |_cli, msg| {
        let msg = match msg {
            Some(msg) => msg,
            None => return,
        };

        let topic = msg.topic();
        if topic != zabbix_topic {
            return;
        }

        // Rate limit: skip the message unless more than one period has passed since the last
        // message that was processed.
        if let Some(last) = zabbix_last_msg {
            if last.elapsed() <= period_duration {
                return;
            }
        }

        print_time_date();
        let payload_str = msg.payload_str();
        let data: Result<Data, _> = serde_json::from_str(&payload_str);

        match data {
            Ok(data) => {
                // Echo the parsed payload back as JSON for diagnostics.
                match serde_json::to_string(&data) {
                    Ok(string) => {
                        println!("Received data from MQTT:\n{} - {}", topic, string);
                    }
                    Err(ref e) => {
                        println!("Failed to convert JSON to string: {}", e);
                    }
                }

                let response_json = json!({
                    "zabbix_server": data.zabbix_server,
                    "item_host_name": data.item_host_name,
                    "item": data.item,
                });
                let show_result = send_to_zabbix(&response_json.to_string());
                let decoded_show_result = match show_result {
                    Ok(show_result) => decode_unicode_escape_sequences(&show_result),
                    Err(err) => {
                        eprintln!("Error sending data to Zabbix server: {}", err);
                        // Returning here leaves `zabbix_last_msg` untouched, so the next
                        // message on the topic is not rate limited.
                        return;
                    }
                };
                let mut response_data = json!({
                    "data": response_json,
                    "result": decoded_show_result
                });
                // If the Zabbix reply is itself a JSON object, embed it as an object instead
                // of as an escaped string.
                if let Some(show_result_value) = response_data.get_mut("result") {
                    if let Some(show_result_str) = show_result_value.as_str() {
                        if let Ok(show_result_json) = serde_json::from_str(show_result_str) {
                            *show_result_value = Value::Object(show_result_json);
                        }
                    }
                }
            }
            Err(err) => {
                println!("Failed to parse JSON: {}", err);
                eprintln!("Failed to parse payload as JSON object: {}", err);
            }
        }

        zabbix_last_msg = Some(Instant::now());
    });

    // Start connecting only now that all handlers are in place.
    cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);

    // Keep the calling thread alive; everything else runs in the callbacks above.
    loop {
        thread::sleep(Duration::from_millis(1000));
    }
}
matmoscicki commented 3 months ago

I have a similar problem. When the connection is lost, my client crashes. Sometimes it works properly and the client reconnects, but very often it does not.

I've tested this by killing the mosquitto server.

Thread 7 "MQTTAsync_rcv" received signal SIGPIPE, Broken pipe.
[Switching to Thread 0x7ffff68b2700 (LWP 1327907)]
__libc_write (nbytes=24, buf=0x7fffd800ed03, fd=9) at ../sysdeps/unix/sysv/linux/write.c:26
26      ../sysdeps/unix/sysv/linux/write.c: No such file or directory.
(gdb) bt
#0  __libc_write (nbytes=24, buf=0x7fffd800ed03, fd=9) at ../sysdeps/unix/sysv/linux/write.c:26
#1  __libc_write (fd=9, buf=0x7fffd800ed03, nbytes=24) at ../sysdeps/unix/sysv/linux/write.c:24
#2  0x00007ffff7cf9489 in ?? () from /lib/x86_64-linux-gnu/libcrypto.so.1.1
#3  0x00007ffff7cf466e in ?? () from /lib/x86_64-linux-gnu/libcrypto.so.1.1
#4  0x00007ffff7cf3684 in ?? () from /lib/x86_64-linux-gnu/libcrypto.so.1.1
#5  0x00007ffff7cf3b47 in BIO_write () from /lib/x86_64-linux-gnu/libcrypto.so.1.1
#6  0x00007ffff7f46dde in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.1
#7  0x00007ffff7f47cd9 in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.1
#8  0x00007ffff7f5188e in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.1
#9  0x00007ffff7f4fa65 in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.1
#10 0x00007ffff7f5aec3 in SSL_shutdown () from /lib/x86_64-linux-gnu/libssl.so.1.1
#11 0x0000555555ad6d36 in SSLSocket_close (net=0x555555eac928) at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/SSLSocket.c:957
#12 0x0000555555ad345e in MQTTAsync_closeOnly (client=0x555555eac8f0, reasonCode=MQTTREASONCODE_SUCCESS, props=0x0) at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/MQTTAsyncUtils.c:2409
#13 0x0000555555ad34e3 in MQTTAsync_closeSession (client=0x555555eac8f0, reasonCode=MQTTREASONCODE_SUCCESS, props=0x0) at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/MQTTAsyncUtils.c:2427
#14 0x0000555555ad0dd9 in nextOrClose (m=0x555555eac6b0, rc=-1, message=0x555555cbbcea "socket error") at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/MQTTAsyncUtils.c:1644
#15 0x0000555555ad20ea in MQTTAsync_receiveThread (n=0x555555eac6b0) at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/MQTTAsyncUtils.c:2053
#16 0x00007ffff7c0b609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#17 0x00007ffff79d9353 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95