eclipse / paho.mqtt.rust

[Question] Which should be the best practice for async? #225

Closed thalesmaoa closed 2 months ago

thalesmaoa commented 4 months ago

Hi there, I'm trying to convert my Python code to Rust using this library. However, after reading all the examples, I'm confused about which approach I should use.

From what I could understand, using callbacks is not async, and the async_subscribe example is the one to follow. However, I can't work out how to connect and auto-reconnect asynchronously without blocking my code. In the examples there is always an await or a blocking call.

Basically, I need two functions that run concurrently.

fn mqtt() {} // Connect and reconnect automatically
fn processing() {} // Loops and publishes whenever the server is connected
fn main() {
    // Create a thread for mqtt
    // Create a thread for processing
}
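
The layout sketched above (one function that tracks the connection, one loop that publishes only while connected) could look roughly like this with plain standard-library threads. This is only an illustration of the structure, not code from the library: `mqtt`, `processing`, and `run` are hypothetical, the MQTT side is reduced to flipping a shared flag, and "publishing" is modeled as sending a number on a channel.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the MQTT side: it only flips the shared
// `connected` flag. A real client would connect and reconnect here.
fn mqtt(connected: Arc<AtomicBool>) {
    thread::sleep(Duration::from_millis(20)); // pretend the handshake takes a moment
    connected.store(true, Ordering::SeqCst);
}

// Processing loop: on each tick, "publish" only if the flag says we are
// connected. Publishing is modeled as sending the tick number on a channel.
fn processing(connected: Arc<AtomicBool>, out: Sender<u32>, ticks: u32) {
    for k in 0..ticks {
        if connected.load(Ordering::SeqCst) {
            // Ignore send errors: the receiver may already be gone.
            let _ = out.send(k);
        }
        thread::sleep(Duration::from_millis(10));
    }
}

// Spawn both functions on their own threads and collect what was published.
fn run(ticks: u32) -> Vec<u32> {
    let connected = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel();

    let mqtt_flag = Arc::clone(&connected);
    let mqtt_handle = thread::spawn(move || mqtt(mqtt_flag));

    let proc_flag = Arc::clone(&connected);
    let proc_handle = thread::spawn(move || processing(proc_flag, tx, ticks));

    mqtt_handle.join().unwrap();
    proc_handle.join().unwrap();

    // The sender was moved into `processing` and is dropped when that thread
    // finishes, so this iterator ends after draining the channel.
    rx.iter().collect()
}
```

Calling `run(10)` from `main` returns the ticks that were published after the flag was set; ticks that happen before the simulated connection are skipped. With an async client the same shape would typically use tasks instead of threads.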

I'm trying to use Tokio, but I get stuck at cli.connect, which is blocking my code.

Any suggestions on what I should read to understand this library's flow?

fpagliughi commented 4 months ago

Yes, apologies. This library pre-dates async/await, and although it initially came out with sync and async clients, the original async client was more like the style of async used in languages like C and Java which used callbacks to signal that an operation was completed. Newer features were added without completely breaking the old styles, and now there are three or four different ways to use the library, and it has gotten somewhat confusing.

Generally, the use of callbacks is not in the "Rusty" async/tokio style, and although they will likely remain in the library under the hood for a while, their use by applications will be phased out to reduce the confusion.

But, for your issue... keep in mind there are two separate clients, one blocking, client, and the other asynchronous, async_client. You should use async_client with Tokio or any other executor (smol, etc).

But, currently, none of the functions in async_client are marked as async. Many of them, however, return a Token or related object (DeliveryToken, SubscribeToken, etc). Those Token objects are actually Rust futures that need to be awaited.

So hopefully that helps...

  1. Use async_client
  2. Await any function that returns a Token object
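
To make the point about tokens concrete, here is a minimal standard-library sketch of a plain (non-`async`) function that returns a value implementing `Future`, which the caller then awaits. Everything in it is hypothetical and only mimics the pattern: `Token`, `connect`, `connect_and_report`, and the tiny `block_on` executor are illustrative stand-ins, not the library's actual types or implementation.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a library token: a plain value that implements
// `Future`. It reports `Pending` once, then completes with a result code.
struct Token {
    polled_once: bool,
    result: i32,
}

impl Future for Token {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        if self.polled_once {
            Poll::Ready(self.result)
        } else {
            self.polled_once = true;
            cx.waker().wake_by_ref(); // ask the executor to poll again
            Poll::Pending
        }
    }
}

// Note: a plain `fn`, not an `async fn`. It returns immediately with a
// token; nothing is waited on until the caller awaits that token.
fn connect() -> Token {
    Token { polled_once: false, result: 0 }
}

// An `async fn` that awaits the token returned by the plain function.
async fn connect_and_report() -> String {
    let rc = connect().await;
    format!("connected, rc={rc}")
}

// Tiny busy-polling executor so the sketch runs without Tokio
// (for illustration only; a real program would use a proper runtime).
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Under Tokio, the `block_on` helper would not be needed: the `.await` inside an `async fn` is all the caller writes, which is why a function can be awaitable without being declared `async`.
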
thalesmaoa commented 3 months ago

Many thanks, I was really afraid of pursuing the wrong path.

Basically, I followed the example and came up with the following code. I needed to move the subscribe call into set_connected_callback, since it wasn't resubscribing after reconnection, but now it seems fine.

I also added an assert, since subscribe_many accepts topic and QoS vectors of different sizes but then fails without an error.
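
As an alternative to asserting, the length check described above could be done by a small helper that returns an error instead of panicking. This is a hedged sketch only: `pair_topics` is a hypothetical name and is not part of the library.

```rust
// Hypothetical helper: pair each topic with its QoS value, or report a
// length mismatch instead of letting the subscribe call fail silently.
fn pair_topics<'a>(topics: &[&'a str], qos: &[i32]) -> Result<Vec<(&'a str, i32)>, String> {
    if topics.len() != qos.len() {
        return Err(format!(
            "{} topics but {} QoS values",
            topics.len(),
            qos.len()
        ));
    }
    Ok(topics.iter().copied().zip(qos.iter().copied()).collect())
}
```

The caller can log the `Err` case and skip the subscription, which avoids a panic inside a callback.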

Here is my code:

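use log::info;
use paho_mqtt::AsyncClient;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Initialize the logger from the environment
    env_logger::init();
    info!("Starting the program");

    // Keep the client bound for as long as the program runs.
    let _cli: AsyncClient = mqtt_func::connect_to_mqtt().await;

    // Just wait for incoming messages.
    // `heartbeat` is defined elsewhere in the project.
    let mut k: u32 = 0;
    loop {
        // Use Tokio's sleep: thread::sleep would block the executor thread.
        tokio::time::sleep(Duration::from_millis(1000)).await;
        k = heartbeat(k);
    }
}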

use log::{error, info};
use paho_mqtt::{self as mqtt, AsyncClient};
use rand::Rng;
use std::{process, time::Duration};

// BROKER_ADDRESS, CLIENT_NAME, TOPICS, QOS, MIN_RECONNECT, MAX_RECONNECT,
// USER_NAME, PASSWORD and the on_* callbacks are defined elsewhere in the project.

pub async fn connect_to_mqtt() -> AsyncClient {
    // Append a random suffix to the client ID.
    let mut rng = rand::thread_rng();
    let random_number: u32 = rng.gen_range(0..=100);

    let host: String = "tcp://".to_owned() + BROKER_ADDRESS + ":1883";
    let create_options = mqtt::CreateOptionsBuilder::new()
        .server_uri(&host)
        .client_id(CLIENT_NAME.to_string() + &random_number.to_string())
        .finalize();

    // Create a client.
    let cli: AsyncClient = AsyncClient::new(create_options).unwrap_or_else(|err| {
        error!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Set a closure to be called whenever the client connection is established.
    // This is called after the initial connection and also after successful
    // reconnections, so subscribing here also resubscribes after a reconnect.
    // Use the client passed to the callback rather than capturing the outer `cli`.
    cli.set_connected_callback(|cli: &AsyncClient| {
        info!("Connected.");
        assert_eq!(TOPICS.len(), QOS.len());
        cli.subscribe_many(TOPICS, QOS);
    });

    // Set a closure to be called whenever the client loses the connection.
    // It just reports the state to the user, and lets the library try to
    // reconnect.
    cli.set_connection_lost_callback(|_| {
        info!("Connection lost. Attempting reconnect...");
    });

    // Attach a closure to the client to receive callback
    // on incoming messages.
    cli.set_message_callback(on_message);

    // Define the set of options for the connection
    let lwt = mqtt::Message::new("test/lwt", "[LWT] Async subscriber lost connection", 1);

    let conn_opts = mqtt::ConnectOptionsBuilder::new_v3()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .automatic_reconnect(MIN_RECONNECT, MAX_RECONNECT)
        .user_name(USER_NAME.to_string())
        .password(PASSWORD.to_string())
        .will_message(lwt)
        .finalize();

    // Make the connection to the broker. The returned token is deliberately
    // not awaited here; the callbacks report the outcome.
    info!("Connecting to the MQTT server...");
    cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);
    cli
}

fpagliughi commented 2 months ago

This appears answered. I will close.