eclipse / paho.mqtt.rust

Recommended pattern for "clean start first only" #219

Open cartertinney opened 6 months ago

cartertinney commented 6 months ago

Paho libraries in other languages that I have used support setting the client's clean start to "first only" (i.e., clean start on the first connect, but if the connection is lost, reconnect without a clean start, keeping subscriptions intact and receiving any missed messages). It does not appear to me that the Rust implementation has this feature, and as a result, automatic reconnect and the reconnect APIs are rather unusable for my purposes.

What are the recommended patterns for implementing this functionality? I do not see any samples that demonstrate it. As far as I can tell, the most viable pattern would be to use the message stream to detect a disconnect and call connect() again with a cloned and modified ConnectOptions, but are there alternative recommendations or patterns? (This approach is not ideal for my application structure.)

fpagliughi commented 6 months ago

I haven't seen that implemented anywhere, but it does seem like it would be a nice feature.

Which library has it? I'd like to maintain some consistency when possible. The only thing I find is talk of adding it to the mosquitto client, but it looks like that issue is still open.

I'm curious about the use case, though. If the program/board/system were to crash and restart, you want to clear the persistent session, but not if the network goes down?

Most of the auto-reconnect functionality is handled by the underlying Paho C library, so we would need to add the functionality there and wrap it, with a few modifications necessary here. We can put up a feature request there.

The connect options are cached in the client, and there was a recent request to expose them publicly so that applications don't need to keep their own copy. (The request may have been in the C++ library, but I'm trying to keep them roughly equivalent.)

But you're essentially correct. The way to do this is to detect the disconnect and just call connect() yourself. I almost universally use manual reconnects in my applications, and most of the subscriber examples show how to do that. They call reconnect(), but in any of them you can replace that with a call to connect() that takes modified connect options. I can update one of the sample apps as a concrete example.

The RPC math server might be a good one. With RPCs I can actually see wanting to clear any old requests from the persistent session when starting up, but then maintaining them while running, even if there are short network drops. That makes sense.
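For reference, here is a broker-free sketch of the stream-based variant from the original question. In paho-mqtt's stream API a `None` item signals a lost connection (that is how the library's async subscriber examples treat it). Below, a `std::sync::mpsc` channel stands in for that stream, and the `connect` closure is a hypothetical stand-in for `cli.connect(opts)`; in real code it would call `set_clean_start()` on a clone of the options and wait on the returned token. The first attempt uses a clean start; every reconnect does not.

```rust
use std::sync::mpsc::Receiver;

/// Drives a message loop in the style of paho-mqtt's stream API, where a
/// `None` item means "connection lost". `connect` is a hypothetical stand-in
/// for a blocking `cli.connect(opts)` call: it gets the `clean_start` flag to
/// use and reports success or failure. Returns the flag used on every attempt.
fn run_loop<F>(rx: Receiver<Option<String>>, mut connect: F, max_retries: usize) -> Vec<bool>
where
    F: FnMut(bool) -> Result<(), String>,
{
    let mut attempts = Vec::new();

    // First connection: start with a clean session.
    attempts.push(true);
    if let Err(e) = connect(true) {
        eprintln!("Initial connect failed: {e}");
        return attempts;
    }

    // Iterates until every sender has been dropped.
    for item in rx {
        match item {
            Some(msg) => println!("Received: {msg}"),
            None => {
                // Connection lost: reconnect WITHOUT a clean start, so the
                // broker can restore subscriptions and queued messages.
                let mut retries = 0;
                loop {
                    attempts.push(false);
                    match connect(false) {
                        Ok(()) => break,
                        Err(e) if retries < max_retries => {
                            retries += 1;
                            eprintln!("Reconnect failed ({e}), retrying");
                            // A real client would back off (sleep) here.
                        }
                        Err(e) => {
                            eprintln!("Giving up: {e}");
                            return attempts;
                        }
                    }
                }
            }
        }
    }
    attempts
}
```

The retry loop blocks, which is acceptable here because it runs in the application's own loop rather than inside a library callback.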

cartertinney commented 6 months ago

@fpagliughi

Which library has it? I'd like to maintain some consistency when possible.

Paho for Python has the feature, and, in fact, it is the default setting if not specified by the user. Quite useful, I've found.

I'm curious about the use case, though. If the program/board/system were to crash and restart, you want to clear the persistent session, but not if the network goes down?

Precisely. We're writing long-running applications that are not expected to crash or stop until a session is complete, but that are expected to experience varying levels of network reliability.

To follow up on how it might work, would it be possible to get an example showing the manual connect being done via the connection loss callback? I apologize, but I'm fairly new to Rust, and I've been struggling to get a ConnectOptions struct that can be passed to .connect() into the connection loss callback due to it not implementing the Copy trait (I think). For the purposes of my application, it would be highly preferable to use connection loss callbacks rather than streams if at all possible.

fpagliughi commented 6 months ago

I've been struggling to get a ConnectOptions struct that can be passed to .connect() into the connection loss callback

You can move a value into a closure. In this case, just be sure to move a clone of the options into the closure, because you still need them for the initial connect.

let conn_opts = ConnectOptionsBuilder::new()...;

cli.set_connection_lost_callback({
    let mut conn_opts = conn_opts.clone();
    conn_opts.set_clean_start(false);

    move |cli| {
        info!("Connection lost");
        cli.connect(conn_opts);
        // ...
    }
});

This is a little "trick": use a code block around the closure to create local clones with the same names as the original values, which keeps the names simple.
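A broker-free version of the same trick, as a sketch: `Options` is a hypothetical stand-in for `ConnectOptions` (cloneable, not `Copy`), and the boxed closure plays the role of the callback handed to `set_connection_lost_callback()`. The closure only reads a field here, so the shadowing is the only thing being shown.

```rust
/// Hypothetical stand-in for `ConnectOptions`: `Clone`, but not `Copy`.
#[derive(Clone, Debug)]
struct Options {
    clean_start: bool,
}

/// Shows the "block around the closure" trick. Returns the original options
/// (for the initial connect) and a callback holding a modified clone.
fn setup() -> (Options, Box<dyn Fn() -> bool>) {
    let opts = Options { clean_start: true };

    let on_lost: Box<dyn Fn() -> bool> = Box::new({
        // Inside this block, `opts` names a modified clone that shadows the
        // outer value...
        let mut opts = opts.clone();
        opts.clean_start = false;
        // ...and the `move` closure takes ownership of that clone.
        move || opts.clean_start
    });

    // The outer `opts` was never moved, so it is still usable here.
    (opts, on_lost)
}
```

Calling `setup()` yields options with `clean_start == true` for the first connect and a callback that sees `false`.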

But in the next release, you shouldn't need to do this, because I'll let you get a reference to the connect options that the client is holding, so you can just get and clone those in the callback:

// *** Coming soon ***

cli.set_connection_lost_callback(|cli| {
    info!("Connection lost");
    let mut conn_opts = cli.connect_options().clone();
    conn_opts.set_clean_start(false);
    cli.connect(conn_opts);
    // ...
});

But there's an important limitation here: you can't block in a callback.

So you can't call wait() on the returned token to find out if the connection attempt succeeded. If you use callbacks, you need to go with a fully event-driven paradigm, using the connect_with_callbacks() variety:

cli.connect_with_callbacks(conn_opts, on_conn_success, on_conn_failure);

The dyn_subscribe.rs example shows connection callbacks.
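To illustrate what "fully event-driven" means here, below is a hedged sketch in which each callback only decides the next step and returns immediately instead of waiting on a token. The `Event`, `Action`, and `Reconnector` types are hypothetical, not paho-mqtt types. In a real application `ConnectionLost` would come from the connection lost callback, the other two events from the success and failure callbacks passed to `connect_with_callbacks()`, and `Action::Connect` would be carried out by issuing that non-blocking call with a clone of the options. Clean start is kept until the first successful connection, which is one reasonable reading of "first only".

```rust
/// Hypothetical events a callback-driven client reports.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    ConnectionLost,
    ConnectSucceeded,
    ConnectFailed,
}

/// What the application should do next. Callbacks only start work;
/// none of them waits for an outcome.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Action {
    /// Issue a non-blocking connect with this `clean_start` flag.
    Connect { clean_start: bool },
    Nothing,
}

/// Remembers whether a session has ever been established.
#[derive(Debug, Default)]
struct Reconnector {
    connected_once: bool,
}

impl Reconnector {
    fn connect_action(&self) -> Action {
        // Clean start only until the first successful connection.
        Action::Connect { clean_start: !self.connected_once }
    }

    /// The very first connect request.
    fn start(&self) -> Action {
        self.connect_action()
    }

    /// Called from each callback; returns immediately with the next action.
    fn handle(&mut self, event: Event) -> Action {
        match event {
            Event::ConnectSucceeded => {
                self.connected_once = true;
                Action::Nothing
            }
            // Lost or failed: fire off another attempt, resuming the
            // session if one has ever been established.
            Event::ConnectionLost | Event::ConnectFailed => self.connect_action(),
        }
    }
}
```

Retrying immediately from a failure callback can spin while the broker is unreachable, so a real application would probably delay the next attempt.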

Personally, I'm starting to view callbacks as a legacy API, and am moving to just async and streams in my own applications. (Remember, the library and its callbacks predate async/await by a few years.)

cartertinney commented 6 months ago

@fpagliughi

I'm actually doing exactly what you suggest already, and I'm getting the following error:

cannot move out of `conn_opts`, a captured variable in an `FnMut` closure
move occurs because `conn_opts` has type `ConnectOptions`, which does not implement the `Copy` trait

My understanding is that conn_opts (or a clone of it) cannot be moved into the closure because the struct definition itself hasn't implemented the ability to do it. Am I missing something here? I am using the latest release (0.12.3).

An investigation of the source code suggests that Copy (required for the use of the move keyword) has not been implemented for ConnectOptions (although, weirdly, it has been for some other options structures, e.g. SubscribeOptions). I can't seem to implement the trait on it myself either, since Copy is external (standard library) and ConnectOptions is also external (Paho).

Source code that won't compile below:

use paho_mqtt as mqtt;
use std::{thread, time::Duration};
use futures::executor::block_on;

fn main() {
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri("test.mosquitto.org:1883")
        .client_id("my_client_id")
        .finalize();

    let conn_opts = mqtt::ConnectOptionsBuilder::new_v5()
        .clean_start(true)
        .keep_alive_interval(Duration::from_secs(3))
        .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
        .finalize();

    let paho_client = mqtt::AsyncClient::new(create_opts).expect("err!!!");

    paho_client.set_connection_lost_callback({
        let mut conn_opts = conn_opts.clone();
        conn_opts.set_clean_start(false);

        move |cli: &mqtt::AsyncClient| {
            println!("Connection Lost! - Reconnecting!");
            cli.connect(conn_opts);
        }
    });

    block_on(async {
        paho_client.connect(conn_opts).await.expect("Connection failed");
    });

    loop {
        thread::sleep(Duration::from_secs(2));
    }

}

Also, do you have a rough timeline for your next release? I would love to use that new feature you mentioned. It would be significantly easier for my purposes.

fpagliughi commented 5 months ago

Oops. So sorry. I realized my mistake as I was writing the example, but I got distracted by the kid and didn't fix it.

The error is not complaining about moving the options into the closure, but rather about moving the variable out of the closure! The Fn and FnMut traits are for closures that are meant to be called multiple times, as in this case, where the connection can be lost, re-established, and then lost again. So such a closure can't consume its captured variables, which is what I was doing by giving (moving) the options to the connect() function.
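The distinction can be reproduced without a broker. In this sketch `Options` and `connect()` are hypothetical stand-ins for `ConnectOptions` and `AsyncClient::connect()`; the only point is that a closure which may run many times must leave its captured value in place, so it hands `connect()` a fresh clone on every call.

```rust
/// Hypothetical stand-in for `ConnectOptions`: `Clone`, but not `Copy`.
#[derive(Clone, Debug)]
struct Options {
    clean_start: bool,
}

/// Stand-in for `connect()`, which takes its options by value (consumes them).
fn connect(opts: Options) -> bool {
    opts.clean_start
}

/// Builds a connection lost handler that can run any number of times.
/// Writing `move || connect(opts)` instead would move `opts` out of the
/// closure on the first call, making it `FnOnce` only, so it could not be
/// used where `FnMut` is required.
fn make_handler(opts: &Options) -> impl FnMut() -> bool {
    let mut opts = opts.clone();
    opts.clean_start = false;
    move || connect(opts.clone())
}
```

Each call consumes only the temporary clone, so the handler can fire again after the next disconnect.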

The line in the closure should be:

cli.connect(conn_opts.clone());

So, all together:

use futures::executor::block_on;
use paho_mqtt as mqtt;
use std::{thread, time::Duration};

fn main() {
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri("test.mosquitto.org:1883")
        .client_id("my_client_id")
        .finalize();

    let conn_opts = mqtt::ConnectOptionsBuilder::new_v5()
        .clean_start(true)
        .keep_alive_interval(Duration::from_secs(3))
        .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
        .finalize();

    let paho_client = mqtt::AsyncClient::new(create_opts).expect("err!!!");

    paho_client.set_connection_lost_callback({
        let mut conn_opts = conn_opts.clone();
        conn_opts.set_clean_start(false);

        move |cli: &mqtt::AsyncClient| {
            println!("Connection Lost! - Reconnecting!");
            cli.connect(conn_opts.clone());
        }
    });

    block_on(async {
        paho_client
            .connect(conn_opts)
            .await
            .expect("Connection failed");
    });

    loop {
        thread::sleep(Duration::from_secs(2));
    }
}