Closed mickdekkers closed 4 years ago
Thanks for checking out the crate!
Yes, Client is not Sync. I would've thought it should be Send, but now I think about it I'm not sure how I can even offer that.
The easiest way to solve your problem would probably be to have two Clients, and so two MQTT connections: one for publishing and one for subscribing. The reason is that Client::read_subscriptions deliberately takes &mut self (or else what would simultaneous read_subscriptions calls return?) and so returns a Future that is mutually exclusive with all other async fn Futures on Client, including Client::publish.
Thanks again for your help! I've been tinkering with this on and off for the past few weekends, and I've mostly got it working now.
Using tokio::task::LocalSet worked around the issue for the most part. However, it occurred to me that I would need to break the subscription reading loop at some point based on an outside signal (e.g. CTRL+C or program shutdown).
To accomplish this, I figured a tokio::sync::oneshot would do nicely. Wrapping the Client::read_subscriptions call in a Stream allows you to use futures::stream::select_all to combine it with the oneshot so you can have a single loop that consumes the combined stream, handling subscription messages and the oneshot message as they come in.
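The "one loop over a combined stream of tagged events" idea can be shown without tokio or futures. The following is only a minimal sketch of that pattern using std threads and std::sync::mpsc; it is not the select_all code from this thread, and the Event enum and run_loop function are hypothetical names made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Every event source is wrapped in one enum, so a single loop can handle all of them.
#[derive(Debug, PartialEq)]
enum Event {
    Message(String),
    Shutdown,
}

// Consume events until a Shutdown arrives (or all senders are dropped),
// returning the messages seen so far.
fn run_loop(rx: mpsc::Receiver<Event>) -> Vec<String> {
    let mut seen = Vec::new();
    for event in rx {
        match event {
            Event::Message(text) => seen.push(text),
            Event::Shutdown => break,
        }
    }
    seen
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Source 1: a producer thread standing in for the subscription reader.
    let msg_tx = tx.clone();
    let producer = thread::spawn(move || {
        for text in vec!["hello", "there"] {
            msg_tx.send(Event::Message(text.to_string())).unwrap();
        }
    });
    producer.join().unwrap();

    // Source 2: the outside shutdown signal, sent on the same channel.
    tx.send(Event::Shutdown).unwrap();

    println!("{:?}", run_loop(rx));
}
```

The async version in this thread has the same shape: each source is mapped into a shared enum, the sources are merged, and the loop matches on the variant to decide whether to keep going.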
Unfortunately, I couldn't get futures::stream::select_all to accept the subscription stream. The compilation failed with an error similar to the one in my original post:
error[E0277]: `std::cell::RefCell<mqtt_async_client::util::free_pid_list::FreePidList>` cannot be shared between threads safely
--> src\main.rs:51:67
|
51 | sub_stream.map(|result| StreamEvent::Sub(result)).boxed(),
| ^^^^^ `std::cell::RefCell<mqtt_async_client::util::free_pid_list::FreePidList>` cannot be shared between threads safely
|
= help: within `mqtt_async_client::client::client::Client`, the trait `std::marker::Sync` is not implemented for `std::cell::RefCell<mqtt_async_client::util::free_pid_list::FreePidList>`
= note: required because it appears within the type `mqtt_async_client::client::client::Client`
= note: required because of the requirements on the impl of `std::marker::Send` for `&mqtt_async_client::client::client::Client`
= note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11> {&'r mut mqtt_async_client::client::client::Client, &'s mut mqtt_async_client::client::client::IoTaskHandle, mqtt_async_client::client::client::IoTaskHandle, &'t0 mut mqtt_async_client::client::client::IoTaskHandle, &'t1 mut tokio::sync::mpsc::bounded::Receiver<mqttrs::packet::Packet>, tokio::sync::mpsc::bounded::Receiver<mqttrs::packet::Packet>, impl core::future::future::Future, impl core::future::future::Future, (), mqttrs::packet::Packet, mqttrs::packet::Packet, mqttrs::publish::Publish, mqttrs::publish::Publish, mqttrs::utils::QosPid, mqttrs::utils::Pid, &'t4 mqtt_async_client::client::client::Client, &'t5 mut mqtt_async_client::client::client::Client, mqttrs::utils::Pid, mqttrs::packet::Packet, &'t6 mqttrs::packet::Packet, &'t7 mqttrs::packet::Packet, impl core::future::future::Future, impl core::future::future::Future, ()}`
= note: required because it appears within the type `[static generator@DefId(28:157 ~ mqtt_async_client[de98]::client[0]::client[0]::{{impl}}[0]::read_subscriptions[0]::{{closure}}[0]) 0:&mut mqtt_async_client::client::client::Client for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11> {&'r mut mqtt_async_client::client::client::Client, &'s mut mqtt_async_client::client::client::IoTaskHandle, mqtt_async_client::client::client::IoTaskHandle, &'t0 mut mqtt_async_client::client::client::IoTaskHandle, &'t1 mut tokio::sync::mpsc::bounded::Receiver<mqttrs::packet::Packet>, tokio::sync::mpsc::bounded::Receiver<mqttrs::packet::Packet>, impl core::future::future::Future, impl core::future::future::Future, (), mqttrs::packet::Packet, mqttrs::packet::Packet, mqttrs::publish::Publish, mqttrs::publish::Publish, mqttrs::utils::QosPid, mqttrs::utils::Pid, &'t4 mqtt_async_client::client::client::Client, &'t5 mut mqtt_async_client::client::client::Client, mqttrs::utils::Pid, mqttrs::packet::Packet, &'t6 mqttrs::packet::Packet, &'t7 mqttrs::packet::Packet, impl core::future::future::Future, impl core::future::future::Future, ()}]`
= note: required because it appears within the type `std::future::GenFuture<[static generator@DefId(28:157 ~ mqtt_async_client[de98]::client[0]::client[0]::{{impl}}[0]::read_subscriptions[0]::{{closure}}[0]) 0:&mut mqtt_async_client::client::client::Client for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11> {&'r mut mqtt_async_client::client::client::Client, &'s mut mqtt_async_client::client::client::IoTaskHandle, mqtt_async_client::client::client::IoTaskHandle, &'t0 mut mqtt_async_client::client::client::IoTaskHandle, &'t1 mut tokio::sync::mpsc::bounded::Receiver<mqttrs::packet::Packet>, tokio::sync::mpsc::bounded::Receiver<mqttrs::packet::Packet>, impl core::future::future::Future, impl core::future::future::Future, (), mqttrs::packet::Packet, mqttrs::packet::Packet, mqttrs::publish::Publish, mqttrs::publish::Publish, mqttrs::utils::QosPid, mqttrs::utils::Pid, &'t4 mqtt_async_client::client::client::Client, &'t5 mut mqtt_async_client::client::client::Client, mqttrs::utils::Pid, mqttrs::packet::Packet, &'t6 mqttrs::packet::Packet, &'t7 mqttrs::packet::Packet, impl core::future::future::Future, impl core::future::future::Future, ()}]>`
= note: required because it appears within the type `impl core::future::future::Future`
= note: required because it appears within the type `impl core::future::future::Future`
= note: required because it appears within the type `for<'r, 's, 't0> {&'r mut mqtt_async_client::client::client::Client, mqtt_async_client::client::client::Client, impl core::future::future::Future, impl core::future::future::Future, ()}`
= note: required because it appears within the type `[static generator@src\main.rs:42:23: 45:18 client:&mut mqtt_async_client::client::client::Client for<'r, 's, 't0> {&'r mut mqtt_async_client::client::client::Client, mqtt_async_client::client::client::Client, impl core::future::future::Future, impl core::future::future::Future, ()}]`
= note: required because it appears within the type `std::future::GenFuture<[static generator@src\main.rs:42:23: 45:18 client:&mut mqtt_async_client::client::client::Client for<'r, 's, 't0> {&'r mut mqtt_async_client::client::client::Client, mqtt_async_client::client::client::Client, impl core::future::future::Future, impl core::future::future::Future, ()}]>`
= note: required because it appears within the type `impl core::future::future::Future`
= note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
= note: required because it appears within the type `futures_util::stream::unfold::Unfold<&mut mqtt_async_client::client::client::Client, [closure@src\main.rs:41:62: 46:14], impl core::future::future::Future>`
= note: required because it appears within the type `futures_util::stream::stream::map::Map<futures_util::stream::unfold::Unfold<&mut mqtt_async_client::client::client::Client, [closure@src\main.rs:41:62: 46:14], impl core::future::future::Future>, [closure@src\main.rs:51:32: 51:65]>`
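The key lines of that error are the first few notes: a RefCell field makes Client not Sync, and a `&Client` held across an await is Send only if Client is Sync. Here is a minimal sketch of that rule using std only. CellClient and MutexClient are hypothetical stand-ins, not the crate's real types, and swapping RefCell for Mutex is just one possible way to get a Sync type; I don't know whether the crate's fix does this.

```rust
use std::cell::RefCell;
use std::sync::Mutex;

// Hypothetical stand-ins; neither is the crate's real `Client`.
struct CellClient {
    free_pids: RefCell<Vec<u16>>,
}

struct MutexClient {
    free_pids: Mutex<Vec<u16>>,
}

impl CellClient {
    fn take_pid(&self) -> Option<u16> {
        self.free_pids.borrow_mut().pop()
    }
}

impl MutexClient {
    fn take_pid(&self) -> Option<u16> {
        self.free_pids.lock().unwrap().pop()
    }
}

// Compile-time checks: a call only compiles if the bound holds.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn main() {
    // RefCell<T> is Send when T is Send, but it is never Sync.
    assert_send::<CellClient>();
    // assert_sync::<CellClient>();  // rejected with E0277
    // assert_send::<&CellClient>(); // rejected: &T is Send only if T is Sync

    // Mutex<T> is both Send and Sync when T is Send.
    assert_send::<MutexClient>();
    assert_sync::<MutexClient>();
    assert_send::<&MutexClient>();

    let cell = CellClient { free_pids: RefCell::new(vec![1, 2]) };
    let mutex = MutexClient { free_pids: Mutex::new(vec![1, 2]) };
    println!("{:?} {:?}", cell.take_pid(), mutex.take_pid());
}
```

Uncommenting either rejected line should reproduce the same E0277 family of error as above, just with a much shorter type in the message.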
I've been watching the repo and saw you pushed https://github.com/fluffysquirrels/mqtt-async-client-rs/commit/6010ac6c5c87c10a418994a5c353f387b9adfd54 yesterday, which makes Client Send. I just tried it and this fixed the above compilation error.
I was also able to get rid of the use of LocalSet and go back to using tokio::spawn again.
This is what I've got so far:
use futures::{
    future,
    stream::{self, StreamExt},
}; // 0.3.4
use mqtt_async_client::{
    client::{Client, Publish, QoS, ReadResult, Subscribe, SubscribeTopic},
    Error,
}; // rev = "6010ac6c5c87c10a418994a5c353f387b9adfd54"
use std::time::Duration;
use tokio::sync; // 0.2.11

const HOST: &str = "test.mosquitto.org";
const TOPIC: &str = "dee90669-79bd-458f-bead-3a11afbc2354";
const PORT: u16 = 1883;

#[derive(Debug)]
enum StreamEvent {
    Sub(Result<ReadResult, Error>),
    Shutdown,
}

// For debugging purposes, to display the payload as text
#[derive(Debug)]
struct Utf8Message<'a> {
    topic: &'a str,
    payload: std::borrow::Cow<'a, str>,
}

impl<'a> Utf8Message<'a> {
    fn from_read_result(read_result: &ReadResult) -> Utf8Message {
        Utf8Message {
            topic: read_result.topic(),
            payload: String::from_utf8_lossy(read_result.payload()),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (sub_stop_tx, sub_stop_rx) = sync::oneshot::channel();

    let sub_handle = tokio::spawn(async {
        let mut sub_client = Client::builder()
            .set_host(String::from(HOST))
            .set_port(PORT)
            .build()?;
        sub_client.connect().await?;
        sub_client
            .subscribe(Subscribe::new(vec![SubscribeTopic {
                topic_path: String::from(TOPIC),
                qos: QoS::AtLeastOnce,
            }]))
            .await?;

        let sub_stream = stream::unfold(&mut sub_client, |client| {
            async {
                let result = client.read_subscriptions().await;
                Some((result, client))
            }
        });
        let shutdown_stream = stream::once(sub_stop_rx);
        let mut combined_stream = stream::select_all(vec![
            sub_stream.map(StreamEvent::Sub).boxed(),
            shutdown_stream.map(|_| StreamEvent::Shutdown).boxed(),
        ]);

        while let Some(event) = combined_stream.next().await {
            match event {
                StreamEvent::Sub(read_result) => {
                    match read_result {
                        Ok(read_result) => println!(
                            "Subscription client received message: {:?}",
                            Utf8Message::from_read_result(&read_result)
                        ),
                        Err(Error::Disconnected) => {
                            println!("Subscription client was disconnected.")
                        }
                        Err(err) => println!("Subscription client error: {:?}", err),
                    };
                }
                StreamEvent::Shutdown => {
                    println!("Shutting down.");
                    // FIXME: clean shutdown, need sub_client here
                    // sub_client.disconnect().await?;
                    break;
                }
            }
        }

        Ok::<(), Error>(())
    });

    let pub_handle = tokio::spawn(async {
        let mut pub_client = Client::builder()
            .set_host(String::from(HOST))
            .set_port(PORT)
            .build()?;
        pub_client.connect().await?;

        // Wait a bit, then publish a message
        tokio::time::delay_for(Duration::from_secs(2)).await;
        println!(
            "Publish client publishing message \"hello\" to topic {}",
            TOPIC
        );
        pub_client
            .publish(&Publish::new(
                String::from(TOPIC),
                String::from("hello").into_bytes(),
            ))
            .await?;

        // Wait a bit, then publish another message
        tokio::time::delay_for(Duration::from_secs(2)).await;
        println!(
            "Publish client publishing message \"there\" to topic {}",
            TOPIC
        );
        pub_client
            .publish(&Publish::new(
                String::from(TOPIC),
                String::from("there").into_bytes(),
            ))
            .await?;

        // Disconnect when done
        pub_client.disconnect().await?;
        Ok::<(), Error>(())
    });

    let shutdown_handle = tokio::spawn(async {
        // Send shutdown command to subscription read loop after 8 seconds
        tokio::time::delay_for(Duration::from_secs(8)).await;
        sub_stop_tx.send(()).unwrap();
        Ok::<(), Error>(())
    });

    future::try_join_all(vec![sub_handle, pub_handle, shutdown_handle])
        .await
        .unwrap()
        .into_iter()
        .collect::<Result<Vec<_>, Error>>()?;

    Ok(())
}
There's one thing I haven't figured out yet: how I might cleanly disconnect() the sub_client when StreamEvent::Shutdown is received in the loop. The problem is that the stream::unfold block takes an &mut Client and holds onto it for as long as combined_stream is alive, so I can't call Client::disconnect() in the loop that reads combined_stream because that also takes &mut Client. Do you know how I might be able to solve this?
That's a good question.
I wrapped everything from sub_stream down to the while loop in braces to scope the streams, then added a disconnect() call after that.
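To show why the braces help, here is a minimal std-only sketch of the same borrow pattern. ToyClient, Reader and boxed_reader are hypothetical names made up for illustration; the boxed iterator stands in for the boxed stream, which also holds the &mut borrow until it is dropped.

```rust
// Hypothetical stand-in for a client whose methods take `&mut self`.
struct ToyClient {
    inbox: Vec<String>,
    connected: bool,
}

impl ToyClient {
    fn read_next(&mut self) -> Option<String> {
        if self.inbox.is_empty() {
            None
        } else {
            Some(self.inbox.remove(0))
        }
    }

    fn disconnect(&mut self) {
        self.connected = false;
    }
}

// Holds `&mut ToyClient` for as long as it lives, like the unfold stream.
struct Reader<'a> {
    client: &'a mut ToyClient,
}

impl<'a> Iterator for Reader<'a> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        self.client.read_next()
    }
}

// Boxing as a trait object mirrors `.boxed()` on the stream.
fn boxed_reader(client: &mut ToyClient) -> Box<dyn Iterator<Item = String> + '_> {
    Box::new(Reader { client })
}

fn drain_then_disconnect(client: &mut ToyClient) -> Vec<String> {
    let mut seen = Vec::new();
    {
        let mut reader = boxed_reader(client);
        while let Some(msg) = reader.next() {
            seen.push(msg);
        }
    } // `reader` is dropped here, which ends the mutable borrow of `client`.
    client.disconnect();
    seen
}

fn main() {
    let mut client = ToyClient {
        inbox: vec!["hello".to_string(), "there".to_string()],
        connected: true,
    };
    let seen = drain_then_disconnect(&mut client);
    println!("{:?}, connected: {}", seen, client.connected);
}
```

As far as I can tell, removing the inner braces makes the compiler reject the disconnect() call, because the boxed trait object is only dropped at the end of the function and so still counts as using the borrow. An explicit drop(reader) before disconnect() should work as an alternative to the braces.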
use futures::{
    future,
    stream::{self, StreamExt},
}; // 0.3.4
use mqtt_async_client::{
    client::{Client, Publish, QoS, ReadResult, Subscribe, SubscribeTopic},
    Error,
}; // rev = "6010ac6c5c87c10a418994a5c353f387b9adfd54"
use std::time::Duration;
use tokio::sync; // 0.2.11

const HOST: &str = "test.mosquitto.org";
const TOPIC: &str = "dee90669-79bd-458f-bead-3a11afbc2354";
const PORT: u16 = 1883;

#[derive(Debug)]
enum StreamEvent {
    Sub(Result<ReadResult, Error>),
    Shutdown,
}

// For debugging purposes, to display the payload as text
#[derive(Debug)]
struct Utf8Message<'a> {
    topic: &'a str,
    payload: std::borrow::Cow<'a, str>,
}

impl<'a> Utf8Message<'a> {
    fn from_read_result(read_result: &ReadResult) -> Utf8Message {
        Utf8Message {
            topic: read_result.topic(),
            payload: String::from_utf8_lossy(read_result.payload()),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (sub_stop_tx, sub_stop_rx) = sync::oneshot::channel();

    let sub_handle = tokio::spawn(async {
        let mut sub_client = Client::builder()
            .set_host(String::from(HOST))
            .set_port(PORT)
            .build()?;
        sub_client.connect().await?;
        sub_client
            .subscribe(Subscribe::new(vec![SubscribeTopic {
                topic_path: String::from(TOPIC),
                qos: QoS::AtLeastOnce,
            }]))
            .await?;

        // Scope the streams so their &mut borrow of sub_client ends at the closing brace
        {
            let sub_stream = stream::unfold(&mut sub_client, |client| {
                async {
                    let result = client.read_subscriptions().await;
                    Some((result, client))
                }
            });
            let shutdown_stream = stream::once(sub_stop_rx);
            let mut combined_stream = stream::select_all(vec![
                sub_stream.map(StreamEvent::Sub).boxed(),
                shutdown_stream.map(|_| StreamEvent::Shutdown).boxed(),
            ]);

            while let Some(event) = combined_stream.next().await {
                match event {
                    StreamEvent::Sub(read_result) => {
                        match read_result {
                            Ok(read_result) => println!(
                                "Subscription client received message: {:?}",
                                Utf8Message::from_read_result(&read_result)
                            ),
                            Err(Error::Disconnected) => {
                                println!("Subscription client was disconnected.")
                            }
                            Err(err) => println!("Subscription client error: {:?}", err),
                        };
                    }
                    StreamEvent::Shutdown => {
                        println!("Shutting down.");
                        // Leave the loop; sub_client is disconnected after the scoped block
                        break;
                    }
                }
            }
        }

        // The streams have been dropped, so sub_client can be borrowed mutably again
        sub_client.disconnect().await?;
        Ok::<(), Error>(())
    });

    let pub_handle = tokio::spawn(async {
        let mut pub_client = Client::builder()
            .set_host(String::from(HOST))
            .set_port(PORT)
            .build()?;
        pub_client.connect().await?;

        // Wait a bit, then publish a message
        tokio::time::delay_for(Duration::from_secs(2)).await;
        println!(
            "Publish client publishing message \"hello\" to topic {}",
            TOPIC
        );
        pub_client
            .publish(&Publish::new(
                String::from(TOPIC),
                String::from("hello").into_bytes(),
            ))
            .await?;

        // Wait a bit, then publish another message
        tokio::time::delay_for(Duration::from_secs(2)).await;
        println!(
            "Publish client publishing message \"there\" to topic {}",
            TOPIC
        );
        pub_client
            .publish(&Publish::new(
                String::from(TOPIC),
                String::from("there").into_bytes(),
            ))
            .await?;

        // Disconnect when done
        pub_client.disconnect().await?;
        Ok::<(), Error>(())
    });

    let shutdown_handle = tokio::spawn(async {
        // Send shutdown command to subscription read loop after 8 seconds
        tokio::time::delay_for(Duration::from_secs(8)).await;
        sub_stop_tx.send(()).unwrap();
        Ok::<(), Error>(())
    });

    future::try_join_all(vec![sub_handle, pub_handle, shutdown_handle])
        .await
        .unwrap()
        .into_iter()
        .collect::<Result<Vec<_>, Error>>()?;

    Ok(())
}
Does that help?
Oh, of course! That's a clever solution, thanks a lot for your help!
Unfortunately, for me, this kind of problem still requires "clever" solutions rather than becoming straightforward.
For future reference I've found the Rust IRC (##rust on Freenode) and Rust sub-reddit very useful for this type of question.
Hello! I really like what you're building here, it looks very promising! I'm hoping to use this crate to connect my PC to my home automation system over MQTT. For my use case, I would like to both publish messages as needed and react to incoming messages on subscribed topics.
I figured I could use tokio::spawn to have a task that reads subscriptions in a loop running continuously, but the issue I'm running into is that Client is not Sync, resulting in a compilation error when I try to use it with tokio::spawn.
This is the code I'm using:
When I try to cargo build the code, I get this compilation error:
Do you know how I might be able to get this working? I'm still fairly new to Rust (I started learning about a month ago), so I'm probably missing something.