obabec / rust-mqtt

Rust native mqtt client for both std and no_std environments.
MIT License

Connection dropping when no new messages in subscribed topic for some time #36

Closed holotrack closed 9 months ago

holotrack commented 9 months ago

I am connecting to my MQTT server this way:

    let mut client =
        MqttClient::<_, 5, _>::new(socket, &mut write_buffer, 80, &mut recv_buffer, 80, config);
    debug!("BROKER CONNECTING");
    client.connect_to_broker().await.unwrap();
    debug!("BROKER AFTER CONNECTING");
    let mut topic_names = Vec::<_, 2>::new();
    topic_names.push("switch_0").unwrap();
    topic_names.push("switch_1").unwrap();

    client.subscribe_to_topics(&topic_names).await.unwrap();
    Timer::after_millis(500).await;

    loop {

        Timer::after_millis(500).await;
        let (topic, message) = match client.receive_message().await {
            Ok(msg) => msg,
            Err(err) => {
                error!("ERROR OCCURED: {}", err);
                continue;
            }
        };
        info!("topic: {}, message: {}", topic, message);
    }

The connection works and receives messages, but when there is no new message in the subscribed MQTT topic for around 30 seconds, I get a NetworkError. I tried this solution:

    select_biased! {
        receive_res = client.receive_message().fuse() => {
            let (topic, message) = receive_res.unwrap();
            info!("topic: {}, message: {}", topic, message);
        }
        result = client.send_ping().fuse() => {
            info!("Ping: {}", result);
        }
    };

to sustain the connection, but I cannot borrow client twice (I think this may need to be handled this way: https://github.com/obabec/rust-mqtt/issues/23). Is it normal for the connection to drop when there are no new messages? Shouldn't send_ping() be called inside the receive_message function?

Or am I just using this library the wrong way?

lulf commented 9 months ago

I think this is the same usability issue as #23 in practice. I think explicitly requiring the user to run the ping is one of the things that saves resource usage on constrained devices (those that don't need it), but the way it is now I'm not sure if it's achievable without refactoring the client.

lulf commented 9 months ago

@holotrack Here is an example of how to explicitly send a ping after some time:

    use embassy_futures::select::{Either, select};
    use embassy_time::{Timer, Duration};
    loop {
        match select(client.receive_message(), Timer::after(Duration::from_secs(2))).await {
            Either::First(msg) => {
                // Received message!
            }
            Either::Second(_timeout) => {
                // Send ping
                client.send_ping().await.unwrap();
            }
        }
    }

holotrack commented 9 months ago

@lulf Thank you very much. My problem is solved. Here is my working match/select code:

    match select(
        client.receive_message(),
        Timer::after(Duration::from_secs(2)),
    )
    .await
    {
        Either::First(msg) => {
            let (topic, message) = msg.unwrap();
            info!("topic: {}, message: {}", topic, message);
        }
        Either::Second(_timeout) => {
            info!("sending ping");
            client.send_ping().await.unwrap();
        }
    }
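
A possible refinement, assuming the drops come from MQTT keep-alive expiry: the broker's keep-alive timer is reset by packets the client sends, not by messages it receives. In the select-based loops above, the 2-second timer is recreated on every iteration, so if messages arrive more often than every 2 seconds the ping branch may never run. Tracking a fixed deadline avoids that. The sketch below uses std types so it can be run anywhere; `PingSchedule` and its methods are hypothetical names for illustration and are not part of rust-mqtt.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of rust-mqtt): remembers when the next
/// ping is due, independent of how often messages are received.
pub struct PingSchedule {
    interval: Duration,
    next_ping: Instant,
}

impl PingSchedule {
    /// Start a schedule whose first ping is due `interval` after `now`.
    pub fn new(interval: Duration, now: Instant) -> Self {
        Self { interval, next_ping: now + interval }
    }

    /// Remaining time the receive side may wait before a ping must go out.
    /// Returns zero if the deadline has already passed.
    pub fn time_until_ping(&self, now: Instant) -> Duration {
        self.next_ping.saturating_duration_since(now)
    }

    /// True once the deadline has been reached.
    pub fn ping_due(&self, now: Instant) -> bool {
        now >= self.next_ping
    }

    /// Call after a ping was sent; moves the deadline forward.
    pub fn mark_sent(&mut self, now: Instant) {
        self.next_ping = now + self.interval;
    }
}
```

In the loop above, the fixed `Duration::from_secs(2)` would be replaced by the remaining time until the deadline, and `mark_sent` would be called after `send_ping()` succeeds. On a no_std target, `embassy_time::Instant` and `embassy_time::Duration` would presumably stand in for the std types, with `Timer::at(deadline)` as the second branch of `select`.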