eclipse / paho.mqtt.rust

Client gets stuck when publishing a message inside the MQTT subscription callback function. #212

Closed ram-maker closed 9 months ago

ram-maker commented 9 months ago

My program needs to:

1. Create an MQTT client.
2. Subscribe to an MQTT topic.
3. Receive payloads from the subscribed topic via a callback function.
4. Publish to a specific topic inside the callback function.

However, when I try to publish something inside the callback function, it gets stuck.

Here is the full program:

use paho_mqtt::{
    AsyncClient, CreateOptionsBuilder, ConnectOptionsBuilder, Message, MessageBuilder,
};
use std::error::Error;
use std::time::Duration;
use tokio::time::sleep;
use tokio::task;

fn message_callback(client: &AsyncClient, message: Option<Message>) {
    if let Some(payload) = &message {
        // The program gets stuck here, in wait(). If I remove wait() it no longer
        // hangs, but the message is not published to the given topic.
        client.publish(message_for_new_user).wait();
    }
}

async fn subscribe_to_topic(client: &AsyncClient) -> Result<(), Box<dyn Error>> {
    let topic_to_subscribe = "some subscribing topic";
    client.subscribe(topic_to_subscribe, 0).await?;
    client.set_message_callback(message_callback);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mqtt_host = "some host";
    let mqtt_username = "some username";
    let mqtt_password = "some password";
    let client_id = get_client_id();
    let create_opts = CreateOptionsBuilder::new()
        .server_uri(mqtt_host)
        .client_id("some client id")
        .finalize();
    let client = AsyncClient::new(create_opts)?;
    let conn_opts = ConnectOptionsBuilder::new()
        .user_name(mqtt_username)
        .password(mqtt_password)
        .finalize();
    client.connect(conn_opts).await?;
    subscribe_to_topic(&client).await?;

    loop {}
}

fpagliughi commented 9 months ago

Yes. You cannot call a blocking function like .wait() from inside a callback. It will deadlock the thread, as you have seen.

The callbacks are fired from the thread of the Paho library that is processing the incoming packets. So when you block it, you essentially stop the library from working.

But removing the .wait() should work. This works fine for me, running on Ubuntu with a broker on localhost (but choose any other broker and it should still work):

use paho_mqtt::{
    AsyncClient, ConnectOptionsBuilder, CreateOptionsBuilder, Message, Result,
};

fn message_callback(client: &AsyncClient, message: Option<Message>) {
    if let Some(_msg) = &message {
        let new_msg = Message::new("echo", "Some payload", 1);
        let _ = client.publish(new_msg);
    }
}

async fn subscribe_to_topic(client: &AsyncClient) -> Result<()> {
    let topic_to_subscribe = "test";
    client.subscribe(topic_to_subscribe, 0).await?;
    client.set_message_callback(message_callback);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    let mqtt_host = "localhost";
    let create_opts = CreateOptionsBuilder::new()
        .server_uri(mqtt_host)
        .client_id("xyz")
        .finalize();
    let client = AsyncClient::new(create_opts)?;
    let conn_opts = ConnectOptionsBuilder::new()
        .finalize();
    client.connect(conn_opts).await?;
    subscribe_to_topic(&client).await?;

    loop {}
}

with my Cargo dependencies:

[dependencies]
paho-mqtt = "0.12.3"
tokio = { version = "1", features = ["full"] }
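
One way to keep blocking work out of the callback, offered as a sketch rather than as paho-mqtt API: have the callback forward each message to a channel and do the slow or blocking part on a separate thread. The model below uses only the Rust standard library. `Incoming`, `run_library_thread`, `forwarding_callback`, and `echo_all` are hypothetical names, and the spawned "library thread" merely stands in for the Paho thread that fires the callbacks.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Stand-in for an incoming MQTT message (hypothetical, not paho_mqtt::Message).
#[derive(Debug, Clone, PartialEq)]
struct Incoming {
    topic: String,
    payload: String,
}

/// Models the library's packet-processing thread: it invokes `callback` once
/// per message and cannot process anything else until the callback returns.
fn run_library_thread<F>(messages: Vec<Incoming>, callback: F) -> thread::JoinHandle<()>
where
    F: Fn(Incoming) + Send + 'static,
{
    thread::spawn(move || {
        for msg in messages {
            callback(msg);
        }
    })
}

/// Builds a callback that only forwards the message to `tx` and returns.
fn forwarding_callback(tx: Sender<Incoming>) -> impl Fn(Incoming) + Send + 'static {
    move |msg| {
        // Sending on an unbounded std channel does not block the caller.
        let _ = tx.send(msg);
    }
}

/// Runs the hand-off end to end and returns the replies built by the worker.
fn echo_all(messages: Vec<Incoming>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let library = run_library_thread(messages, forwarding_callback(tx));

    // The worker is free to block; the library thread is not affected.
    let worker = thread::spawn(move || {
        // The loop ends once the callback (and its Sender) has been dropped.
        let replies: Vec<String> = rx
            .iter()
            .map(|m| format!("echo/{}: {}", m.topic, m.payload))
            .collect();
        replies
    });

    library.join().expect("library thread panicked");
    worker.join().expect("worker thread panicked")
}
```

Calling `echo_all` with two messages on topic "test" returns one reply string per message, in arrival order. In a real client the worker would own the publishing step, so any waiting happens off the callback thread.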

fpagliughi commented 9 months ago

...but also note that callbacks are from the original version of this library, which predates async/await and modern futures. These days, you don't need to use callbacks. Using a stream of messages may be easier and less error-prone, and it is the more common approach with tokio.

The app could look like this:

use paho_mqtt::{AsyncClient, ConnectOptionsBuilder, CreateOptionsBuilder, Message, Result};
use futures_util::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let mqtt_host = "localhost";
    let create_opts = CreateOptionsBuilder::new()
        .server_uri(mqtt_host)
        .client_id("xyz")
        .finalize();

    let mut client = AsyncClient::new(create_opts)?;

    let mut strm = client.get_stream(None);

    let conn_opts = ConnectOptionsBuilder::new().finalize();
    client.connect(conn_opts).await?;

    let topic_to_subscribe = "test";
    client.subscribe(topic_to_subscribe, 0).await?;

    while let Some(msg_opt) = strm.next().await {
        if let Some(_msg) = msg_opt {
            let new_msg = Message::new("echo", "Some payload", 1);
            client.publish(new_msg).await?;
        }
    }

    Ok(())
}

with the additional dependency on futures_util:

futures-util = "0.3"

The async_subscribe.rs example is a slightly more complete example of this method.

ram-maker commented 9 months ago

Thank you. This asynchronous code seems to do what I need.

fpagliughi commented 9 months ago

Good. I should document the examples better. There are a lot of them now, so it's hard to find exactly what you're looking for.

Feel free to re-open this if there are any more problems.