eclipse / paho.mqtt.rust


set_connected_callback is not called #213

Closed takaomag closed 8 months ago

takaomag commented 8 months ago

If I execute examples/dyn_subscribe.rs as is, it works well.

But if lines 118-120 are changed from

    cli.set_connected_callback(|_cli: &mqtt::AsyncClient| {
        println!("Connected.");
    });

to

    cli.set_connected_callback(|_cli: &mqtt::AsyncClient| {
        println!("Connected: {}", _cli.is_connected());
    });

the callback is not called. Must we not call is_connected() inside the callback?

takaomag commented 8 months ago

Also, if subscribe_with_options(...).wait() is called inside the callback passed to set_connected_callback, it blocks (never returns).

    cli.set_connected_callback(|cli: &mqtt::AsyncClient| {
        let _ = cli.subscribe_with_options(
            "a",
            paho_mqtt::SubscribeOptionsBuilder::new()
                .no_local(true)
                .retain_as_published(false)
                .retain_handling(paho_mqtt::RetainHandling::SendRetainedOnSubscribe)
                .finalize(),
            None,
        ).wait();
    });

fpagliughi commented 8 months ago

The callbacks are run in the context of the library thread that manages the network connection. So you definitely cannot call a blocking function on the client from inside a callback. That will always cause a deadlock. But you can call an asynchronous function to start a transaction and then check on it later, perhaps by supplying a callback to fire when it succeeds or fails.

So, in other words, you can call cli.subscribe_with_options(), but you cannot call .wait() on the token it returns to block until it completes. Guaranteed deadlock.
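To make the non-blocking pattern concrete, here is a std-only model, not the paho-mqtt API: a single "library thread" drains a job queue, the connected callback runs on it, and `subscribe` (a hypothetical stand-in) only queues the request and returns at once. The outcome arrives later through a completion callback on that same thread, so nothing ever waits on the thread that has to deliver the answer.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// Work item executed on the simulated single library thread.
type Job = Box<dyn FnOnce(&Client) + Send>;

// Hypothetical stand-in for an async client (NOT the paho-mqtt API).
#[derive(Clone)]
struct Client {
    queue: Sender<Job>,
    log: Arc<Mutex<Vec<String>>>,
}

impl Client {
    fn note(&self, s: &str) {
        self.log.lock().unwrap().push(s.to_string());
    }

    // Non-blocking request: queue the simulated SUBACK and return at once.
    // `on_done` runs later on the library thread, like a success/failure callback.
    fn subscribe<F>(&self, topic: &str, on_done: F)
    where
        F: FnOnce(&Client, Result<(), String>) + Send + 'static,
    {
        let topic = topic.to_string();
        let job: Job = Box::new(move |cli: &Client| {
            cli.note(&format!("suback {}", topic));
            on_done(cli, Ok(())); // pretend the broker granted the subscription
        });
        self.queue.send(job).unwrap();
    }
}

// Connected callback: start the subscription, but never wait on it here.
fn on_connected(cli: &Client) {
    cli.note("connected");
    cli.subscribe("a", |cli, res| match res {
        Ok(()) => cli.note("subscribed a"),
        Err(e) => cli.note(&format!("subscribe failed: {}", e)),
    });
    cli.note("callback returned");
}

fn run_demo() -> Vec<String> {
    let (tx, rx) = channel::<Job>();
    let client = Client { queue: tx, log: Arc::new(Mutex::new(Vec::new())) };
    client.queue.send(Box::new(on_connected)).unwrap();

    let lib_client = client.clone();
    let lib_thread = thread::spawn(move || {
        // Library thread: run queued jobs one at a time; jobs may queue more jobs.
        while let Ok(job) = rx.try_recv() {
            job(&lib_client);
        }
    });
    lib_thread.join().unwrap();
    let out = client.log.lock().unwrap().clone();
    out
}

fn main() {
    for line in run_demo() {
        println!("{}", line);
    }
}
```

Running it prints `connected`, `callback returned`, `suback a`, `subscribed a`: the callback returns before the subscription completes, and the result is handled in the completion callback instead of by waiting.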

But I don't know why cli.is_connected() would cause a problem. It should return immediately unless there's an internal mutex deadlock, which would be a bug that should be fixed.

But you definitely don't need to call is_connected() from the on_connected() callback. The whole point of the callback is to alert you that the client just connected. In that case, is_connected() should always return true.

takaomag commented 8 months ago

Thanks.

I feel it should be documented because it is dangerous.

I just want to ensure that one or more subscriptions are always present on an available MQTT connection. If a subscription request fails or times out, I want to disconnect and raise an error (or retry the connection).

As you say, if I cannot call wait() there, what is the best way to do this in async mode?

takaomag commented 8 months ago

Maybe an async task should be spawned in the connected callback, and other waiters should check on it. But that is a little annoying.
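A sketch of that "move the work out of the callback" idea, using std threads and a channel instead of an async runtime. The connected callback only posts an event; a worker thread, which is allowed to block, subscribes to each topic and decides whether to keep the connection or reconnect. `subscribe_blocking` is a hypothetical stand-in for subscribing and waiting on the returned token with a timeout; `Event` and `Action` are illustrative names, not library types.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Illustrative outcome of handling one connection (hypothetical type).
#[derive(Debug, PartialEq)]
enum Action {
    Subscribed(Vec<String>),
    Reconnect(String), // the app would disconnect and retry here
}

// Events forwarded from the (simulated) library callbacks.
enum Event {
    Connected,
    Shutdown,
}

// Connected callback: never blocks, it only posts a notification.
fn on_connected(events: &Sender<Event>) {
    let _ = events.send(Event::Connected);
}

// Application worker: free to block, because it is not the library thread.
fn worker<F>(rx: Receiver<Event>, mut subscribe_blocking: F) -> Vec<Action>
where
    F: FnMut(&str) -> Result<(), String>,
{
    let topics = ["a", "b"];
    let mut actions = Vec::new();
    for ev in rx {
        match ev {
            Event::Connected => {
                // Subscribe to every topic; stop at the first failure.
                let result: Result<Vec<String>, String> = topics
                    .iter()
                    .map(|t| subscribe_blocking(*t).map(|()| t.to_string()))
                    .collect();
                actions.push(match result {
                    Ok(subs) => Action::Subscribed(subs),
                    Err(e) => Action::Reconnect(e),
                });
            }
            Event::Shutdown => break,
        }
    }
    actions
}

fn run_demo() -> Vec<Action> {
    let (tx, rx) = channel();
    // Simulated broker: only the first attempt at topic "b" is rejected.
    let mut attempts = 0;
    let app = thread::spawn(move || {
        worker(rx, move |topic| {
            attempts += 1;
            if topic == "b" && attempts == 2 {
                Err(format!("{} rejected", topic))
            } else {
                Ok(())
            }
        })
    });
    on_connected(&tx); // first connection
    on_connected(&tx); // e.g. after a reconnect
    tx.send(Event::Shutdown).unwrap();
    app.join().unwrap()
}

fn main() {
    println!("{:?}", run_demo());
}
```

With the simulated rejection, the worker records a reconnect for the first connection and a complete set of subscriptions for the second.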

fpagliughi commented 8 months ago

Agreed, it’s annoying. But if the library defers it to a secondary queue/channel, you can lose coherence when you fall behind.

The callback capability is powerful and allows you to write code with a purely event-driven model, but it requires all of that code to be non-blocking and leaves the application to deal with threading issues manually. This is obviously not the way Rust programming has evolved.

This library’s initial API predates Rust’s async/await. If I were starting over today, I probably wouldn’t implement the callbacks at all. I would just make the calls async and provide an enumeration for notifications coming back from the library and/or server: new message, disconnect packet (with ReasonCode), connected, or lost connection.
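The enumeration-based design described here can be sketched as a single ordered stream that the application consumes on its own thread. The `Notification` enum below is hypothetical, modeled on the list above (connected, new message, server DISCONNECT with a reason code, lost connection); it is not an existing paho-mqtt type, and a plain `u8` stands in for the reason code.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Hypothetical notification type; NOT an existing paho-mqtt API.
enum Notification {
    Connected,
    Message { topic: String, payload: Vec<u8> },
    Disconnect { reason_code: u8 },
    ConnectionLost,
}

// The application consumes notifications in order and may block while doing so.
fn consume(rx: Receiver<Notification>) -> Vec<String> {
    let mut log = Vec::new();
    for n in rx {
        let line = match n {
            Notification::Connected => "connected: subscribe now".to_string(),
            Notification::Message { topic, payload } => {
                format!("{}: {}", topic, String::from_utf8_lossy(&payload))
            }
            Notification::Disconnect { reason_code } => {
                format!("server disconnect, reason 0x{:02X}", reason_code)
            }
            Notification::ConnectionLost => "connection lost: reconnect".to_string(),
        };
        log.push(line);
    }
    log
}

fn run_demo() -> Vec<String> {
    let (tx, rx) = channel();
    // Simulated library thread producing notifications.
    let library = thread::spawn(move || {
        tx.send(Notification::Connected).unwrap();
        tx.send(Notification::Message { topic: "a".into(), payload: b"hello".to_vec() })
            .unwrap();
        // 0x8B is the MQTT v5 "Server shutting down" reason code.
        tx.send(Notification::Disconnect { reason_code: 0x8B }).unwrap();
        tx.send(Notification::ConnectionLost).unwrap();
    });
    let log = consume(rx);
    library.join().unwrap();
    log
}

fn main() {
    for line in run_demo() {
        println!("{}", line);
    }
}
```

Because the consumer runs on its own thread, it can block (for example, to finish subscribing after `Connected`) without stalling the thread that produces the notifications.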

I don't plan on removing the callbacks any time soon, but I will likely add the “full consumer” idea in the next release and start to phase out the callbacks from the sample code.

fpagliughi commented 8 months ago

As you say, in async mode, if I can not call wait(), please let me know the best way to do it.

Oh, to clarify, you can definitely wait on an action completing in an async block. The Token objects returned by the async client implement std::future::Future.
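For contrast with the callback case, here is a minimal std-only model of awaiting a token-like future from an async block. `Token` here is hypothetical (a shared result slot plus a `Waker`), not the paho-mqtt type, and the tiny `block_on` stands in for a real executor such as tokio's. The await resolves because the simulated library thread is free to complete the token.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

// Shared completion slot plus the waker of whoever is awaiting.
#[derive(Default)]
struct State {
    result: Option<Result<(), String>>,
    waker: Option<Waker>,
}

// Hypothetical token, NOT the paho-mqtt Token.
#[derive(Clone, Default)]
struct Token(Arc<Mutex<State>>);

impl Token {
    // Called by the simulated library thread when the "server" responds.
    fn complete(&self, r: Result<(), String>) {
        let waker = {
            let mut s = self.0.lock().unwrap();
            s.result = Some(r);
            s.waker.take()
        };
        if let Some(w) = waker {
            w.wake(); // wake outside the lock
        }
    }
}

impl Future for Token {
    type Output = Result<(), String>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut s = self.0.lock().unwrap();
        match s.result.take() {
            Some(r) => Poll::Ready(r),
            None => {
                s.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Minimal single-future executor: park the thread until the waker unparks it.
struct Unparker(thread::Thread);
impl Wake for Unparker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(Unparker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

fn run_demo() -> Result<(), String> {
    let token = Token::default();
    let lib_side = token.clone();
    // Simulated library thread completes the token after a short delay.
    let library = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        lib_side.complete(Ok(()));
    });
    // Awaiting in an async block, off the library thread, works fine.
    let res = block_on(async move { token.await });
    library.join().unwrap();
    res
}

fn main() {
    println!("{:?}", run_demo());
}
```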

But the callbacks are not async blocks. They are regular (non-async) functions run in the context of the one-and-only library thread. That’s the same thread that is used to signal all the other futures when they complete. So if you block that thread, the futures don’t complete and you deadlock.
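The deadlock described above can be reproduced with a small std-only model (all names hypothetical, not paho-mqtt types). A request's completion is queued as a job for the single library thread, so a callback that waits on it from that same thread is waiting on itself. A bounded `recv_timeout` is used so the demo reports the failure instead of hanging; a real blocking wait would never return.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::Duration;

type Job = Box<dyn FnOnce() + Send>;

// Hypothetical token, completed by the library thread via a one-shot channel.
struct Token(Receiver<()>);

impl Token {
    // Bounded wait so the demo reports a deadlock instead of hanging forever.
    fn wait_timeout(&self, d: Duration) -> bool {
        self.0.recv_timeout(d).is_ok()
    }
}

// Start a request: its completion runs later as a job on the library thread.
fn start_request(lib: &Sender<Job>) -> Token {
    let (done_tx, done_rx) = channel();
    lib.send(Box::new(move || {
        let _ = done_tx.send(()); // ignore if nobody is waiting anymore
    }))
    .unwrap();
    Token(done_rx)
}

fn run_demo() -> (bool, bool) {
    let (lib_tx, lib_rx) = channel::<Job>();
    let (result_tx, result_rx) = channel();

    // Library thread: runs queued jobs strictly one after another.
    let library = thread::spawn(move || {
        for job in lib_rx {
            job();
        }
    });

    // A "callback" that blocks on a token while running on the library thread.
    let tx = lib_tx.clone();
    lib_tx
        .send(Box::new(move || {
            let token = start_request(&tx);
            // The completion job is queued behind this callback, so it cannot run.
            let ok = token.wait_timeout(Duration::from_millis(200));
            result_tx.send(ok).unwrap();
        }))
        .unwrap();
    let from_callback = result_rx.recv().unwrap();

    // The same wait from an application thread succeeds.
    let token = start_request(&lib_tx);
    let from_app = token.wait_timeout(Duration::from_secs(5));

    drop(lib_tx); // all senders gone: the library loop ends
    library.join().unwrap();
    (from_callback, from_app)
}

fn main() {
    let (from_callback, from_app) = run_demo();
    println!("wait inside callback completed: {}", from_callback);
    println!("wait on application thread completed: {}", from_app);
}
```

`run_demo()` returns `(false, true)`: the wait inside the callback times out, while the identical wait from an application thread succeeds.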

takaomag commented 8 months ago

you can definitely wait on an action completing in an async block.

Yes, I know. The reason I call cli.subscribe_with_options().wait() there is that I want the subscription(s) established immediately after the CONNACK. The paho-c team currently seems to suggest doing it that way.

Some other Rust MQTT libraries use a buffered channel architecture, but that leads to coherence problems in case of failure.

I understand that the current design does not allow calling a blocking function on the client inside a callback.

I'm looking forward to your “full consumer” idea. Thanks!