bytebeamio / rumqtt

The MQTT ecosystem in rust
Apache License 2.0
1.57k stars 244 forks source link

Non-blocking receive never notifies #820

Open IsaacDynamo opened 6 months ago

IsaacDynamo commented 6 months ago

Expected Behavior

Expected case 1, 2 and 3 to print the same notifications.

Current Behavior

Only case 1 prints notifications, case 2 and 3 show no notifications.

Context

Trying to integrate non-blocking receive into an existing event loop. But try_recv() and recv_timeout() never generate meaning full notifications. recv() does generate notifications, but is blocking.

use rumqttc::{Client, MqttOptions, QoS, RecvTimeoutError, TryRecvError};
use std::time::Duration;

const CASE: u8 = 1;

fn main() {
    let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));

    let (client, mut connection) = Client::new(mqttoptions, 10);
    client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();

    loop {
        // My own event loop

        match CASE {
            1 => {
                // Output:
                // Notification = Ok(Ok(Incoming(ConnAck(ConnAck { session_present: false, code: Success }))))
                // Notification = Ok(Ok(Outgoing(Subscribe(1))))
                // Notification = Ok(Ok(Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtMostOnce)] }))))
                // Notification = Ok(Ok(Incoming(Publish(Topic = hello/rumqtt, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 13))))
                let notification = connection.recv();
                println!("Notification = {:?}", notification);
            }
            2 => {
                // No output
                let notification = connection.try_recv();
                if !matches!(notification, Err(TryRecvError::Empty)) {
                    println!("Notification = {:?}", notification);
                }
                std::thread::sleep(Duration::from_millis(1));
            }
            3 => {
                // No output
                let notification = connection.recv_timeout(Duration::from_millis(1));
                if !matches!(notification, Err(RecvTimeoutError::Timeout)) {
                    println!("Notification = {:?}", notification);
                }
            }
            _ => (),
        }
    }
}
[dependencies]
rumqttc = "0.24.0"
uname -a
Linux 5.10.16.3-microsoft-standard-WSL2 #1 SMP Fri Apr 2 22:23:49 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
swanandx commented 5 months ago

hey, thanks for reporting.

in case 3, you can increase the timeout duration, e.g. connection.recv_timeout(Duration::from_secs(1)), it will work.

for case 2, i will get back to you!

de-sh commented 5 months ago

Requests take a while to be processed, so both try_recv and recv_timeout(1 ms) will fail to respond in time and hence the connection is never established, this is unfortunate, but it seems like atleast in the case of sync code, we should deprecate try_recv and put a note in place to deter use of extremely small timeouts for the other.

We require a sync specific EventLoop, but that is clearly not on priority, so that's upto anyone interested to contribute.

IsaacDynamo commented 5 months ago

From a users perspective the timeout in recv_timeout() should apply to reception of notification, and be decoupled from timeouts of connections that are managed inside the stack.

If try_recv() gets removed, I would reach for recv_timeout(0ms) which would still be wrong. Adding a note would be helpful, but what would be an acceptable, but small timeout? It all seems a bit finicky, and leads to surprising behavior.

For now I moved the connection to a separate thread, and post notification into a queue that can be read non-blocking from my own event loop. This works without issues so far.

let (mut client, mut connection) = Client::new(mqttoptions, 100);
let (send_event, recv_event)= std::sync::mpsc::channel();
std::thread::spawn(move || {
    for notification in connection.iter() {
        send_event.send(notification).unwrap();
    }
});

... 

loop {
    if let Ok(notification) = recv_event.try_recv() {
        println!("Notification = {:?}", notification);
    }
}     

Maybe it is possible to do something similar in the Connection implementation. This could fix to the issue, without deprecation.