Open nothingIsSomething opened 2 years ago
What's the issue that you're having? That code seems reasonable to me, though it usually doesn't make sense to convert a Stream into a Signal: the resulting Signal only contains the most recent message, and all other messages are dropped.
As the tutorial says, it doesn't make sense if you need the intermediate values. In this case I'm only interested in the last message, not in buffering all the incoming messages, so I guess I gain in performance with the signal. I need to learn more about streams and signals. I'm trying to find the differences and similarities with rxjs to gain understanding faster.
this is the error that I get:
the method `signal` exists for struct `futures_signals::signal::Broadcaster<futures_signals::signal::FromStream<futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>`, but its trait bounds were not satisfied
method cannot be called on `futures_signals::signal::Broadcaster<futures_signals::signal::FromStream<futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>` due to unsatisfied trait bounds
note: the following trait bounds were not satisfied:
`std::option::Option<std::result::Result<tokio_tungstenite::tungstenite::Message, tokio_tungstenite::tungstenite::Error>>: std::marker::Copy`rustc(E0599)
client.rs(22, 33): method cannot be called on `futures_signals::signal::Broadcaster<futures_signals::signal::FromStream<futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>` due to unsatisfied trait bounds
option.rs(515, 1): doesn't satisfy `_: std::marker::Copy`
> I guess I gain in performance with the signal
That depends on what you're trying to do. When you convert a Stream into a Signal, it's still pulling all of the Stream values like usual, so the performance is the same, it's just dropping the intermediate values.
The buffer comes from the Stream itself, so the buffer still exists either way. And when you use `StreamExt` methods like `map`, it doesn't create a new buffer, so the performance should be the same in most situations.
In addition to that, `Broadcaster` is pretty costly in performance, so in your code snippet it might actually be faster to use Streams.
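To make the point about performance concrete, here is a minimal std-only sketch of what "still pulling every value, but dropping the intermediate ones" means. `LatestOnly` is a hypothetical stand-in written for this illustration; it is not how futures-signals implements `from_stream`, and it uses a plain iterator in place of an async Stream.

```rust
/// Hypothetical model of a signal built from a stream: it consumes every
/// item the source produces, but only remembers the most recent one.
pub struct LatestOnly<T> {
    latest: Option<T>,
    pulled: usize,
}

impl<T> LatestOnly<T> {
    /// Starts empty, with nothing pulled yet.
    pub fn new() -> Self {
        LatestOnly { latest: None, pulled: 0 }
    }

    /// Pull every item from the source, overwriting the stored value each time.
    pub fn drain<I: IntoIterator<Item = T>>(&mut self, source: I) {
        for item in source {
            self.pulled += 1; // the work of receiving the item still happens
            self.latest = Some(item); // the previous value is dropped here
        }
    }

    /// The most recent item, if any has arrived.
    pub fn latest(&self) -> Option<&T> {
        self.latest.as_ref()
    }

    /// How many items were pulled from the source in total.
    pub fn pulled(&self) -> usize {
        self.pulled
    }
}
```

Draining three items leaves `pulled()` at 3 even though only the last item is kept, which is why the conversion alone does not save any work.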
> this is the error that I get
That error is saying that the message type doesn't implement `Copy`, and so you can't use it with `signal()`.
If the message type implements `Clone` then you can use `signal_cloned()` instead.
If the message type doesn't implement `Copy` or `Clone` then you have to use `signal_ref(|x| { ... })` which gives you a reference to the message.
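The three options differ only in the trait bound they place on the value type. The sketch below models that with a hypothetical `Current<T>` holder using std only. The method names `get`, `get_cloned`, and `with_ref` are invented for this illustration and merely mirror the roles of `signal()`, `signal_cloned()`, and `signal_ref()`; `Opaque` is a made-up type that is neither `Copy` nor `Clone`.

```rust
/// Hypothetical holder of a "current value", modelling the three access styles.
pub struct Current<T> {
    value: T,
}

impl<T> Current<T> {
    pub fn new(value: T) -> Self {
        Current { value }
    }

    /// Plays the role of `signal()`: returns a copy, so it requires `T: Copy`.
    pub fn get(&self) -> T
    where
        T: Copy,
    {
        self.value
    }

    /// Plays the role of `signal_cloned()`: returns a clone, so it requires `T: Clone`.
    pub fn get_cloned(&self) -> T
    where
        T: Clone,
    {
        self.value.clone()
    }

    /// Plays the role of `signal_ref(|x| ...)`: no bound on `T`, because the
    /// closure only ever sees a reference to the value.
    pub fn with_ref<R>(&self, f: impl FnOnce(&T) -> R) -> R {
        f(&self.value)
    }
}

/// A made-up type that implements neither `Copy` nor `Clone`.
pub struct Opaque(pub String);
```

With this model, `Current<Option<u32>>` can use `get`, `Current<Option<String>>` needs `get_cloned`, and `Current<Option<Opaque>>` can only be read through `with_ref`. Calling `get` on the latter two fails to compile with an unsatisfied `Copy` bound, much like the error in the question.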
All of this is only needed because you're using `Broadcaster`. If you didn't use `Broadcaster` then it becomes much simpler and faster:
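    use futures_signals::signal::{from_stream, SignalExt};

    // `read` is the read half of the WebSocket stream from the original snippet.
    let future = from_stream(read).for_each(move |message| {
        // `message` is `None` until the stream has produced its first value.
        if let Some(message) = message {
            // ...
        }
        async {}
    });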
If you want to continue to use `Broadcaster`, then I suggest doing something like this:
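    let broadcaster = Broadcaster::new(from_stream(read).map(|message| {
        // `message` is `Option<Result<Message, Error>>`; errors are dropped here for brevity.
        message
            .and_then(|message| message.ok())
            .map(|message| message.to_string())
    }));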
This converts the message into a `String`, so now you can use `broadcaster.signal_cloned()`, because `String` implements `Clone`.
> I'm trying to find the differences and similarities with rxjs to gain understanding faster.
RxJS is closer to Streams than it is to Signals. In RxJS they combine the concept of Streams and Signals together, which causes tons of bugs and problems. In Rust the two concepts are very cleanly separated:
A Stream is an ordered sequence of 0 or more values over time.
A Stream will emit every value, without missing anything.
A Stream can be empty.
A Signal is a single value which changes over time.
A Signal automatically drops intermediate values, it only keeps the most recent value.
A Signal can never be empty, it must always have a value.
You can think of it as being like the difference between having a mutable `Vec` and a mutable variable:
`Vec` can be empty, but a variable must always have a value.
`Vec` can contain multiple values, but a variable only contains one value (the most recent value).
You can combine `Vec`s together in various ways (chain, zip, concat), but you can't do that with a variable.
You can use operators on variables (`a + b`), but you can't do that with a `Vec`.
In this analogy, a Stream is like an asynchronous `Vec`, whereas a Signal is like an asynchronous variable. They have very different behavior and APIs because they do different things and serve different purposes. Signals do not replace Streams; instead they complement Streams, because both are useful.
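The analogy can be spelled out in plain synchronous Rust. This is only a sketch of the `Vec` versus variable comparison using std; the helper names `chained`, `zipped`, and `last_assigned` are made up for the illustration and nothing here touches the futures or futures-signals crates.

```rust
/// Combine two sequences end to end, as `chain` does for iterators.
pub fn chained(a: Vec<i32>, b: Vec<i32>) -> Vec<i32> {
    a.into_iter().chain(b).collect()
}

/// Pair up two sequences element by element, as `zip` does for iterators.
pub fn zipped(a: Vec<i32>, b: Vec<i32>) -> Vec<(i32, i32)> {
    a.into_iter().zip(b).collect()
}

/// A variable always holds exactly one value: it starts with `initial`,
/// and each assignment replaces whatever was there before.
pub fn last_assigned(initial: i32, updates: &[i32]) -> i32 {
    let mut current = initial;
    for &update in updates {
        current = update; // the previous value is gone after this line
    }
    current
}
```

A `Vec` can be empty and keeps every element, so `chained` and `zipped` see all of them. The variable in `last_assigned` is never empty, and with no updates at all it still returns `initial`.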
it's all very clear to me now! thank you so much! :)
Hello, I'm new to Rust and very used to rxjs. Could you help me with a simple example, please? I have already read the crate tutorial but I'm still a bit lost. I'm trying to create a signal from a TcpStream.