Closed thalesmaoa closed 5 months ago
Yes, apologies. This library pre-dates async/await, and although it initially came out with sync and async clients, the original async client was more like the style of async used in languages like C and Java which used callbacks to signal that an operation was completed. Newer features were added without completely breaking the old styles, and now there are three or four different ways to use the library, and it has gotten somewhat confusing.
Generally, the use of callbacks is not in the "Rusty" async/tokio style. Although they will likely remain in the library under the hood for a while, their use by applications will be phased out to reduce the confusion.
But, for your issue... keep in mind there are two separate clients: one blocking, client, and the other asynchronous, async_client. You should use async_client with Tokio or any other executor (smol, etc).
But, currently, none of the functions in async_client are marked as async. Many of them, however, return a Token or related object (DeliveryToken, SubscribeToken, etc). Those Token objects are actually Rust futures that need to be awaited.
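To illustrate the idea of a plain (non-async) function that returns an awaitable token, here is a minimal, dependency-free sketch. It does not use paho-mqtt or Tokio: DemoToken, start_operation, run_both, and the tiny block_on executor are all hypothetical names made up for this illustration, and the background thread only stands in for whatever completes a real operation.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

// State shared between the token and the thread that completes it.
#[derive(Default)]
struct Shared {
    result: Option<u32>,
    waker: Option<Waker>,
}

// Hypothetical stand-in for a token type: an object that is also a future.
pub struct DemoToken {
    shared: Arc<Mutex<Shared>>,
}

impl Future for DemoToken {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.shared.lock().unwrap();
        match shared.result.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Not finished yet: remember who to wake on completion.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Note: NOT an `async fn`. It starts the work and returns a token at once.
pub fn start_operation(value: u32) -> DemoToken {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let worker_state = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(20)); // simulated work
        let mut state = worker_state.lock().unwrap();
        state.result = Some(value * 2);
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    });
    DemoToken { shared }
}

// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Tiny single-future executor, standing in for Tokio, smol, etc.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Async code can await the tokens even though `start_operation` is not async.
pub async fn run_both() -> (u32, u32) {
    let first = start_operation(1);
    let second = start_operation(2);
    (first.await, second.await)
}
```

In this sketch both operations are already running before either token is awaited, so awaiting only waits for completion. With a real executor the call site would simply be an `.await` on the returned token inside an async function.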
So hopefully that helps...
Many thanks, I was really afraid of pursuing the wrong path.
Basically, I followed the example and came up with the following code. I needed to move the subscribe call into set_connected_callback, since it wasn't resubscribing after reconnection, but it now seems fine.
I also added an assert, since subscribe_many accepts topic and QoS vectors of different sizes, but then fails without an error.
Here is my code:
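use log::info;
use paho_mqtt::AsyncClient;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Initialize the logger from the environment
    env_logger::init();
    info!("Starting the program");
    // Keep the client alive for as long as the program runs.
    let _cli: AsyncClient = mqtt_func::connect_to_mqtt().await;
    // Just wait for incoming messages.
    let mut k: u32 = 0;
    loop {
        // Async sleep: thread::sleep here would block the executor thread.
        tokio::time::sleep(Duration::from_millis(1000)).await;
        k = heartbeat(k);
    }
}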
use log::info;
use paho_mqtt as mqtt;
use paho_mqtt::AsyncClient;
use rand::Rng;
use std::{process, time::Duration};

pub async fn connect_to_mqtt() -> mqtt::AsyncClient {
    let mut rng = rand::thread_rng();
    let random_number: u32 = rng.gen_range(0..=100);
    let host: String = "tcp://".to_owned() + BROKER_ADDRESS + ":1883";
    let create_options = mqtt::CreateOptionsBuilder::new()
        .server_uri(&host)
        .client_id(CLIENT_NAME.to_string() + &random_number.to_string())
        .finalize();
    // Create a client.
    let cli: AsyncClient = mqtt::AsyncClient::new(create_options).unwrap_or_else(|err| {
        info!("Error creating the client: {:?}", err);
        process::exit(1);
    });
    // Set a closure to be called whenever the client connection is established.
    // This is called after the initial connection and also after successful
    // reconnections, so subscribing here also covers resubscription.
    // The closure uses the client reference it is handed rather than
    // capturing the outer `cli`.
    cli.set_connected_callback(|cli: &AsyncClient| {
        info!("Connected.");
        // subscribe_many fails without an error if the lengths differ.
        assert_eq!(TOPICS.len(), QOS.len());
        cli.subscribe_many(TOPICS, QOS);
    });
    // Set a closure to be called whenever the client loses the connection.
    // It just reports the state to the user, and lets the library try to
    // reconnect.
    cli.set_connection_lost_callback(|_| {
        info!("Connection lost. Attempting reconnect...");
    });
    // Attach a closure to the client to receive callback
    // on incoming messages.
    cli.set_message_callback(on_message);
    // Define the set of options for the connection
    let lwt = mqtt::Message::new("test/lwt", "[LWT] Async subscriber lost connection", 1);
    let conn_opts = mqtt::ConnectOptionsBuilder::new_v3()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .automatic_reconnect(MIN_RECONNECT, MAX_RECONNECT)
        .user_name(USER_NAME.to_string())
        .password(PASSWORD.to_string())
        .will_message(lwt)
        .finalize();
    // Make the connection to the broker
    info!("Connecting to the MQTT server...");
    cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);
    cli
}
This appears answered. I will close.
Hi there, I'm trying to convert my Python code to Rust using this library. However, after reading all the examples, I'm confused about which approach I should use.
From what I could understand, using callbacks is not async, and async_subscribe must be followed instead. However, I can't understand how to connect asynchronously with auto-reconnect without blocking my code. In the examples there is always the await or the block flag.
Basically, I need to create two functions that run concurrently.
I'm trying to use Tokio, but I get stuck with
which is blocking my code.
Any suggestions on what I should read to understand this library's flow?