Pauan / rust-signals

Zero-cost functional reactive Signals for Rust
MIT License
675 stars 37 forks source link

Add a way to combine signals and streams? #36

Closed werner291 closed 2 years ago

werner291 commented 3 years ago

Greetings! I have a feature request:

would it be possible to have a method where, if you have a Stream<Item=A> and a Signal<Item=B>, some method sample could be used to obtain a Stream<Item=(A,B)> that "samples" the signal whenever the first stream fires?

The application could be, for instance, where you have a Signal that represents the current position of the mouse cursor, and a Stream representing mouse clicks, you could effectively get a Stream of mouse clicks annotated with position.

Pauan commented 3 years ago

I believe this should already be possible... something like this:

signal.throttle(move || {
    async move {
        let _ = stream.next().await;
    }
})

But I haven't investigated whether that works correctly in every situation, so having a manual sample does sound like a good idea.

Pauan commented 3 years ago

(A big question is whether sample should return a Signal<Item = A> or a Signal<Item = Option<A>>, both seem useful)

werner291 commented 3 years ago

My opinion: I think it should return a Stream, not a Signal.

A "sample" is a distinctly discrete concept, of which there are usually some finite amount (if dealing with finite time), so a Stream makes more sense in that case, since a Signal represents something continuous.

You can perhaps use some kind of latest method that'll turn a Signal into a Stream, where the item is None if nothing has been received yet.

werner291 commented 3 years ago

Perhaps my (somewhat unprompted) opinion is that the Signal and Stream type actually have quite a bit of redundancy between them. If I was to design a library as such, I'd have a method Signal::changes() -> Stream or Signal::updates() -> Stream, to make the separation between continuous-time-changing values and discrete events a bit clearer.

(Please forgive my nitpicking, I'm kinda obsessed with FRP.)

I remember an FRP library the Purescript ecosystem where every Signal(-equivalent) was effectively just a wrapper around a Stream(-equivalent), and the Signal was effectively treated as a function over time (that you couldn't necessarily evaluate without some preconditions). IIRC, their implementation was O(n) in the size of the FRP signal, though, regardless how much actually got updated.

Pauan commented 3 years ago

Oh, well if it returns a Stream then it doesn't belong in the futures-signals crate, instead it needs to go into the futures crate.

There is currently a zip method which behaves similarly to what you want, except it waits for both Streams to have a value. And there has been some discussion about adding in a sample method.

So you should ask them about adding in a combinator, probably with a signature similar to this:

fn sample<St>(self, other: St) -> impl Stream<Item = (Self::Item, St::Item)> where St: Stream;

You can then easily use to_stream to combine that with Signals.

You can perhaps use some kind of latest method that'll turn a Signal into a Stream, where the item is None if nothing has been received yet.

That already exists, it's the to_stream method. If you only want the updates... that's as easy as doing signal.to_stream().skip(1)

Note that to_stream is lossy (because all Signal methods are lossy, it's a part of the Signal contract, and is one of the many fundamental differences between Signal and Stream).

Similarly there is from_stream which converts from a Stream into a Signal. Naturally it returns an Optionbecause the Stream might not have a value yet.

Perhaps my (somewhat unprompted) opinion is that the Signal and Stream type actually have quite a bit of redundancy between them.

That's not the case, they have a completely different contract, mathematical concept, behavior, and method APIs. Just compare SignalExt and StreamExt.

It was very much so an intentional choice to separate them, they cannot be combined, that would cause fundamental breakages (at the mathematical level, not just the implementation level).

There are some FRP libraries that try to combine them (such as Rx and old Elm), but they are fundamentally broken and have many major bugs because it just doesn't work.

werner291 commented 3 years ago

Can you link me to a paper or some other publication that shows that both are fundamentally incompatible concepts? The fact that the old Purescript library I was talking about has O(n) implementation seems to suggest that you're right.

That they would be separate, or that signals are "lossy" is something that makes sense.

I'll go have a look at that zip method, then. Combined with to_stream it should do exactly what I'm looking for.

Pauan commented 3 years ago

Can you link me to a paper or some other publication that shows that both are fundamentally incompatible concepts?

I don't know of any papers, but this video explains some (not all) of the issues:

https://www.youtube.com/watch?v=Agu6jipKfYw

In particular, foldp and merge are really problematic for Signals (but work perfectly for Streams). Various other FRP systems try really hard to fix the foldp problem, but I realized that foldp is just fundamentally incompatible with Signals, and so the solution is to just not have foldp (or merge) at all.

Here are some examples of the fundamental conceptual differences between Signal and Stream:

Mathematically a Signal is like a function Time -> a, and a Stream is a lazy List (Time, a).

The best way to think of it is like this: a Signal is an async RwLock<A>, and a Stream is an async Vec<A>.

This is why things like foldp and merge don't work for Signal: they must be run on every change, but Signal is a single value, so it doesn't output every change. In the case of continuous time it's not even possible to run on every change, since the time slices can be arbitrarily small (and run in any order, even backwards).

And similarly, flatten has completely different behavior for Signal and Stream: stream.flatten() will concatenate the Streams, whereas signal.flatten() will dynamically switch from one Signal to another Signal.

And filter has different behavior, since a Signal must always have a value, so it returns Option<A> instead of A.

There are a bunch of other Stream methods which don't make any sense on Signal:

These differences are fundamental to the design, they're not implementation details.

I'll go have a look at that zip method, then. Combined with to_stream it should do exactly what I'm looking for.

Not quite, since zip will wait for both the Stream and the Signal to update. So if the Stream updates but the Signal hasn't updated, then it will wait for the Signal to change.

Instead what you want is for it to wait for the Stream to update, but ignore the Signal updates (that's what sample would do).

werner291 commented 3 years ago

Ok... I think you're misunderstanding what I mean when sample should return a Stream. What I mean is that sample should return a Stream, which outputs exactly one value for every value on the input Stream, somehow paired up with the value of the Signal at the time at which it receives the value from the Stream. Doesn't matter if the Signal is lossy: just use the latest value.

Yes, I understand the semantic difference between a Signal and a Stream, with a Signal being a (continuous) time-varying function and a Stream a discrete series of value, and that any kind of state machine that receives updates is going to be inconsistent if it relies on non-loss of updates. Honestly, a for_each already kinda doesn't make sense, since it adds the implicit assumption that changes to the Signal are discrete in nature.

From a "mathematical" point of view: I do believe you can define a Stream<Item=A> as a Signal<Item=Option<A>>, by saying that the Signal briefly flashes to Some<A>, and whatever thing observes the signal is handed the exact time value at which that happens, with it being None when sampled without knowing the exact time of the event. Also, since sending two events on a Signal "simultaneously" technically happens sequentially (they have an order in the Steam, it semantically would make sense to just have it flash to Some twice in a row at times that are more or less as far apart as the set commands, without it being lossy. (Though I totally understand simply using the latest value, you've got a Stream anyway if you want to be lossless.)

Pauan commented 3 years ago

@werner291 Yes, I understood you perfectly, and zip does not do what you want, but sample does. So you need to ask them to add in a sample method to the futures crate.

If you don't believe me, please do try using zip, and you will quickly find out why it doesn't work the way you want.

Honestly, a for_each already kinda doesn't make sense, since it adds the implicit assumption that changes to the Signal are discrete in nature.

Yes, that's a good point, but I can't think of a better name.

I do believe you can define a Stream as a Signal<Item=Option>, by saying that the Signal briefly flashes to Some, and whatever thing observes the signal is handed the exact time value at which that happens,

You cannot, because it will skip values, but a Stream is not allowed to skip values, which is why it must be a lazy list (or similar).

In particular, there is no guarantee that the observer will notice exactly when it changes to Some, and so it will "miss" some of the updates.

What you are describing might be useful, but it's distinct from a Stream.

Also, since sending two events on a Signal "simultaneously" technically happens sequentially (they have an order in the Steam, it semantically would make sense to just have it flash to Some twice in a row at times that are more or less as far apart as the set commands, without it being lossy.

It's not possible to send two events on a Signal simultaneously, but even if it were possible then one of the events would be lost, because that is inherent to how Signals behave.

werner291 commented 3 years ago

In this particular implementation, such a Signal that blinks on/off, it would skip values, yes. I guess I was just saying that the underlying mathematics don't really require a Signal to be lossy, since Stream values never really occur exactly simultaneously; it makes sense that in the semantics of your implementation, you'd consider poll to be instantaneous, though, and say that all values produced during a poll execution have the same time.

As for for_each, I'd probably have taken the pedantic approach of simply not providing a way to observe the Signal. Perhaps there could be some DiscreteSignal trait, that can be converted into a Stream, which you can then observe with a callback. That way, not all signals (such as ones that represent a continuousky-changing value) can be observed.

Also, I really think it makes more sense to have sample be in futures-signals, not in futures. Semantically, I think that looking up the "current value" of a Stream makes no sense, which is what "sampling" is. I guess you could say you could sample the "lates"value of the stream, but that sounds like it would work better if you'd decompose that into alatestmethod (effectivelyto_signal`) and sample that.

Pauan commented 3 years ago

I guess I was just saying that the underlying mathematics don't really require a Signal to be lossy

That's incorrect, since mathematically a Signal is a function from Time -> a, and Time is infinitely granular. So how would you determine the right time to call the function? If you call the function too early or too late then you miss the event. And how long would the "blip" last? 1ms? 5ms? 0.00000001ms?

As for for_each, I'd probably have taken the pedantic approach of simply not providing a way to observe the Signal.

That literally defeats the entire point. Even pure mathematical representations of Signals allow you to observe the Signal (by calling the function repeatedly with ever-increasing Time values).

Conceptually for_each could be implemented like this:

async fn for_each<F>(self, f: F) {
    // Keep polling the Signal every 100ms
    loop {
        f(self.poll()).await;
        sleep(100).await;
    }
}

Of course it's not actually implemented like that (it's drastically more efficient), but theoretically it could be.

Perhaps there could be some DiscreteSignal trait, that can be converted into a Stream, which you can then observe with a callback.

In the current implementation Signals are already discrete (for efficiency), but that's an implementation detail. Even though it's discrete, that's not enough to make to_stream lossless, it's still lossy, because the lossiness is conceptual, not based on the implementation.

From a conceptual standpoint it makes no sense to talk about a "discrete Signal". How would a "discrete Signal" be different from a Stream?

Semantically, I think that looking up the "current value" of a Stream makes no sense, which is what "sampling" is.

The "current value" would simply be the most recent value of the Stream, it is perfectly well defined and makes sense, because Streams are already strictly ordered based on time.

I guess you could say you could sample the "latest" value of the stream, but that sounds like it would work better if you'd decompose that into a latest method (effectively to_signal) and sample that.

Yes... and that's exactly what would happen. You would call stream.sample(signal.to_stream()) and it will behave exactly as you want.

I already linked to a thread where they were willing to implement sample, because it's an operator that makes sense on Stream. And they were already thinking about implementing sample years before futures-signals existed.

werner291 commented 3 years ago

That's incorrect, since mathematically a Signal is a function from Time -> a, and Time is infinitely granular. So how would you determine the right time to call the function? If you call the function too early or too late then you miss the event. And how long would the "blip" last? 1ms? 5ms? 0.00000001ms?

The "blip" would be instantaneous, taking no time at all from a semantic point of view, whereby parties interested in the event would be notified with the correct value to use as the time parameter. In this sense a Signal is a function over time on a conceptual level, but an interested party would indeed always just get None if they just called it out of the blue.

That literally defeats the entire point. Even pure mathematical representations of Signals allow you to observe the Signal (by calling the function repeatedly with ever-increasing Time values).

Depends on what you mean by "observe". Here, I meant it as in the "observer pattern", whereby an interested party is actively notified of changes in the value in a push-based fashion, not a party that repeatedly polls for changes. Without some underlying knowledge that the signal represents a value that changes in clearly-defined, discrete steps, requesting updates on the value (without specifying some interval) would indeed be meaningless.

From a conceptual standpoint it makes no sense to talk about a "discrete Signal". How would a "discrete Signal" be different from a Stream?

A "discrete signal" would be a signal that is always defined from the moment it is created, and remains constant over time, except for when it changes instantaneously when something happens. Perhaps a "step function" is more standard terminology.

Yes... and that's exactly what would happen. You would call stream.sample(signal.to_stream()) and it will behave exactly as you want

Streams only have a meaningful value at discrete points in time (conceptually, they're a list of time-value pairs [(t1,x1),(t2,x2),...]), so to "sample" their value at an arbitrary point in time (other than t1,t2,...) sounds wrong to me.

If defined on two Streams, the implementation of that function would probably memorize the last value received from the Stream that represents the value that we wish to sample. In other words: internally, there's a memory value that changes over time, that will be requested (conceptually: the time-varying function is evaluated, using the reception time of the new value as the argument). Sounds an awful lot like a Signal, not a Stream.

werner291 commented 3 years ago

Also, note how in Signal's contract it says this:

a Signal must always return Poll::Ready(Some(...)) the first time it is polled, no exceptions

That's an essential feature that distinctly lacks from Stream. I can't imagine implementing a sample method on two Streams without that guarantee. Otherwise, we'd either return Stream<Option<..>>, or we'd have to have some awkward logic to deal with the value-being-sampled Stream not being ready yet.

Once you have that above guarantee, the implementation becomes greatly simplified.

Pauan commented 3 years ago

The "blip" would be instantaneous, taking no time at all from a semantic point of view, whereby parties interested in the event would be notified with the correct value to use as the time parameter.

That's not how Signals work from a mathematical (or implementation) perspective... what you're describing is a Stream, which is a List (Time, a)

Here, I meant it as in the "observer pattern", whereby an interested party is actively notified of changes in the value in a push-based fashion, not a party that repeatedly polls for changes.

That's purely an implementation detail done for performance, it doesn't affect the semantics. Even with futures-signals the notification does not include the time or value, it has to be polled (it's a hybrid push-pull system, which is the optimal system). Pure push systems are broken and incorrect.

A "discrete signal" would be a signal that is always defined from the moment it is created, and remains constant over time, except for when it changes instantaneously when something happens.

Yes, that's what a Signal is, but that's not what you were describing before. A Signal does not keep track of individual changes, it simply knows what its value is at any given time. A Stream on the other hand does keep track of every discrete change.

If defined on two Streams, the implementation of that function would probably memorize the last value received from the Stream that represents the value that we wish to sample.

Yes, and that's quite normal (for example, fold also stores internal state which is kept in between updates).

Sounds an awful lot like a Signal, not a Stream.

You could use Signals for it, yes, but there's also nothing wrong with using Streams for it either.

Otherwise, we'd either return Stream<Option<..>>, or we'd have to have some awkward logic to deal with the value-being-sampled Stream not being ready yet.

Indeed, things do get a bit complicated if the sampled Stream doesn't have a value yet: it could either return None, or it could behave like zip and wait for the sampled Stream to have a value. Both options seem viable to me.

Once you have that above guarantee, the implementation becomes greatly simplified.

No, the implementation has to handle it either way, because a Signal can return Poll::Pending (meaning that it didn't change), so it has to store the old value for it (exactly the same as for a Stream). There shouldn't be much difference implementation-wise.

werner291 commented 3 years ago

That's not how Signals work from a mathematical (or implementation) perspective... what you're describing is a Stream, which is a List (Time, a)

I was describing how, from a mathematical perspective, you could implement a stream by using a signal. One that would not have to be lossy, assuming that values cannot be created simultaneously, which they cannot be if we assume that they originate from some sequential process.

That's purely an implementation detail done for performance, it doesn't affect the semantics. Even with futures-signals the notification does not include the time or value, it has to be polled (it's a hybrid push-pull system, which is the optimal system). Pure push systems are broken and incorrect.

You're defining a callback that gets called every time there's a "new" value; be it that, or every time there's a clock tick. Either way, if you allow the updates/callbacks/etc to come from the Signal, you're requiring the Signal to have at least some notion of discrete updates; it effectively rules out continuous change on a semantic level. Obviously, you're never going to get something like actual continuous semantics through the Signal::poll_change method, but that's on the implementation side; it's something I'd hide from the user if I were to write a similar library (or make explicit with some StepFunction trait).

Indeed, things do get a bit complicated if the sampled Stream doesn't have a value yet: it could either return None, or it could behave like zip and wait for the sampled Stream to have a value. Both options seem viable to me.

That problem goes away completely if you sample a Signal, since it always has a value. It just seems to fit far more naturally than a Stream.

No, the implementation has to handle it either way, because a Signal can return Poll::Pending (meaning that it didn't change), so it has to store the old value for it (exactly the same as for a Stream). There shouldn't be much difference implementation-wise.

The simplification comes from the fact that you don't have to handle the case where the thing you're sampling doesn't have a value. Of course, you do have to keep the latest value in memory, but that's an invisible implementation detail that is not reflected at all in the interface, and which could (presumably) be implemented in some different way without breaking user code.

werner291 commented 3 years ago

I took a stab at a possible implementation. I'm relatively new to async in Rust, but this seems to work (at least in the little test case below):


use futures_signals::signal::{Mutable, Signal, SignalExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::stream::{Stream, StreamExt};
use pin_project_lite::pin_project;
use futures::stream::Fuse;

pin_project! {
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    struct Sample<Sig: Signal,Str: Stream> {
        #[pin]
        signal: Sig,
        #[pin]
        stream: Fuse<Str>,
        // This technically has to be an option since the value can't be known at construction without the Context.
        signal_latest: Option<Sig::Item>,
        stream_queued: Option<Str::Item>
    }
}

impl<Sig: Signal, Str: Stream> Stream for Sample<Sig,Str> where Sig::Item : Clone {
    type Item = (Sig::Item, Str::Item);

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

        let mut this = self.project();

        if let Poll::Ready(Some(a)) = this.signal.poll_change(cx) {
            // This is guaranteed to be true the first time poll_next is called.
            *this.signal_latest = Some(a);
        }

        // We're guaranteed to have a signal value at this point,
        // so we can just pair them up and return the poll result.
        if let Poll::Ready(a) = this.stream.poll_next(cx) {
            if let Some(a) = a {
                Poll::Ready(Some((this.signal_latest.as_ref().expect("A signal should always return Poll::Ready(Some(a)) upon first poll.").clone(), a)))
            } else {
                Poll::Ready(None)
            }
        } else {
            Poll::Pending
        }

    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}

fn sample<A,B,Si,St>(signal:Si, stream:St) -> Sample<Si,St> where Si: Signal<Item=A>, St: Stream<Item=B>  {
    Sample {
        signal,
        stream: stream.fuse(),
        signal_latest: None,
        stream_queued: None
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use futures::stream;

    #[test]
    fn sample_test() {

        let (tx, rx) = tokio::sync::mpsc::channel(20);
        let mt = Mutable::new(1);

        let mut samples = sample(mt.signal(), tokio_stream::wrappers::ReceiverStream::new(rx));

        futures::executor::block_on(async {

            for i in 0i32..10 {
                tx.send(i).await.unwrap();
            }

            for i in 0i32..10 {
                assert_eq!(Some((1,i)),samples.next().await);
            }

            mt.set(2);
            mt.set(3);

            for i in 0i32..10 {
                tx.send(i).await.unwrap();
            }

            for i in 0i32..10 {
                assert_eq!(Some((3,i)),samples.next().await);
            }
        });
    }
}
Pauan commented 3 years ago

I was describing how, from a mathematical perspective, you could implement a stream by using a signal. One that would not have to be lossy, assuming that values cannot be created simultaneously, which they cannot be if we assume that they originate from some sequential process.

And I am explaining to you how it is impossible. It does not matter how the values are created, what matters is how they are consumed, and the type of Signal makes it impossible to consume them in the way that you're describing.

Either way, if you allow the updates/callbacks/etc to come from the Signal, you're requiring the Signal to have at least some notion of discrete updates; it effectively rules out continuous change on a semantic level.

No, it doesn't. Of course the polling is done discretely (because it must, on conventional hardware), but the Signal is still continuous.

The notification is simply an optimization to avoid polling too much, it does not affect the semantics at all.

I'm really suspecting that you are just not understanding how Signals work (and are implemented).

Obviously, you're never going to get something like actual continuous semantics through the Signal::poll_change method, but that's on the implementation side; it's something I'd hide from the user if I were to write a similar library (or make explicit with some StepFunction trait).

There isn't any fundamental difference between poll_change and calling a Time -> a function (except for optimization reasons the poll_change can return Poll::Pending so it can avoid unnecessary processing, but that does not change the semantics, it's just an optimization).

That problem goes away completely if you sample a Signal, since it always has a value. It just seems to fit far more naturally than a Stream.

That is very true, however you are making other tradeoffs in exchange for that, because now you are stuck with a Signal only, but Streams can do things that Signals cannot. For example, you might want to use fold or merge with the sampled Stream.

There are valid reasons for any of the combinations of sample: two Streams, two Signals, a Stream and Signal, a Stream and closure, returning a Signal, returning a Stream, etc. etc.

They're all useful, just in different situations. In your specific situation, you would probably want the zip behavior of having it wait for the second Stream to have a value, so that way stream.sample(signal.to_stream()) will work correctly.

The simplification comes from the fact that you don't have to handle the case where the thing you're sampling doesn't have a value.

It's not really a simplification, the implementation is quite simple either way.

werner291 commented 3 years ago

Ok... I think it's starting to click how Signals work.

So, basically, poll_change simply asks: "Did the value of this signal change since the last time I called it?", whereby "pending" means that it may change in the future, whereas "Ready(Some())" means a new value is available., and "Ready(None)" means that, not only has no change occurred, but the value won't change anymore.

Am I understanding that right?

If that's the case, a continuous function would just... Always return Ready(Some()), with whatever the value happens to be at the time of calling?

Dangit, that's actually quite clever.

Ok... I'm sorry for being so stubborn. I guess I saw Signals as mostly a thin wrapper around Streams, but I suppose that's just by how incredibly similar their implementations are, even if they're slightly different.

Pauan commented 3 years ago

Am I understanding that right?

Yes.

If that's the case, a continuous function would just... Always return Ready(Some()), with whatever the value happens to be at the time of calling?

That's correct.

But even a continuous function might choose to return Poll::Pending if it knows that the new value is equal to the old value (e.g. using PartialEq, or using some sort of external knowledge).

For example, the dedupe method returnsPoll::Pending if the new value (at the time of calling poll_change) is == to the old value (from the previous time poll_change was called).

But a simple continuous function (such as returning the current time) would just always return Poll::Ready(Some(value)).

For optimization purposes, the function should only notify when the value has probably changed (it can notify more often, but I work really hard to make futures-signals as efficient as possible).

I guess I saw Signals as mostly a thin wrapper around Streams, but I suppose that's just by how incredibly similar their implementations are, even if they're slightly different.

The trait definitions are indeed very similar, but the contract, behavior, and implementations are completely different.

werner291 commented 3 years ago

From reading the implementations, I'm noticing two major principles in how Stream and Signal differ:

Are there other major differences?

Pauan commented 3 years ago

If a downstream entity doesn't poll, nothing gets recomputed even if values change at the source, preventing a lot wasted effort.

That is also true for Stream, since Stream (and Future) are also poll-based. So if you don't poll a Stream, then nothing happens.

Are there other major differences?

Stream will return Poll::Ready(Some(value)) multiple times (once per value), whereas with Signal it will only return it once (because it always returns the most recent value). Signal never keeps past values. This is why Signal is always lossy, and there is no way to make it lossless.

This is a major difference in the trait contract. Just like how Haskell has "typeclass laws", Rust has "trait contracts". These contracts describe the behavior of the trait. Because these contracts go beyond the actual type, even two traits with the same type definition can have different contracts (and thus different behavior).


You have to look beyond the definition of the traits themself, because their different contracts causes every other method and implementation to be different.

For example, the implementation of flatten is completely different (Stream will concatenate the Streams, but Signal will switch from one Signal to another). This is because of the fundamental conceptual difference between Stream and Signal.

So even when a Signal and Stream have the same methods (like flatten or filter) they are often implemented quite differently, because of the different semantics.

And like I explained before, there are a lot of methods which Stream has that don't make sense on Signal (and vice versa).


The mpsc implementations of Signal and Stream vary a lot:


Also, in the next version of futures-signals (0.4) the definition of the Signal trait will change:

With Stream, it will keep polling repeatedly until it returns either Poll::Pending or Poll::Ready(None). But with Signal it will always only poll once, and will then wait for it to be notified before it polls again.

justinlovinger commented 2 years ago

The inability to sample Signals and produce a Stream paired with the value of a Signal at the time of a Stream event is an issue I've been struggling with while trying to use futures-signals.

Reflex FRP, a very mathematically rigorous FRP architecture, can do this, in multiple flavors. See everything under -- Transform Event to Event by sampling Behavior or Dynamic, https://github.com/reflex-frp/reflex/blob/develop/Quickref.md#functions-producing-event. Note that what Reflex calls Behavior is like a more mathematically pure Signal, and what they call Event is called Stream here, as far as I can tell. Dynamic is a combination of a Behavior and an Event, not unlike Signal in that it produces an event when the Behavior changes value.

Note, Reflex also has a sample function to directly get the value of a Behavior at a point in time, but that isn't feasible with Rust's lack of monadic side effect control. The best we can do is attach.

Pauan commented 2 years ago

@JustinLovinger This thread has discussed several different implementations of sample. Which one do you need? Can you say the type?

Also, I'm curious about your use case.

Note, Reflex also has a sample function to directly get the value of a Behavior at a point in time, but that isn't feasible with Rust's lack of monadic side effect control. The best we can do is attach.

As far as I can tell, sample just gives you the current value of a continuous function. I see no reason why futures-signals can't do that.

justinlovinger commented 2 years ago

Currently, I need tag, attach, and sample. Although, tag and sample can be implemented in terms of attach or attachWith. We may be able to implement attach in terms of sample, but I would have to double check. I actually need the PromptlyDyn variants, but I don't think there would be a distinction in futures-signals.

You are correct about sample. If futures-signals can implement sample for Signal, that would be great.

My use case is simply to write FRP applications, emphasis on the Functional, like I could with Reflex FRP in Haskell. As currently implemented, Mutable is more like Behavior or Dynamic than Signal is. You can sample the current value of a Mutable with .get, but you can't sample a Signal. You can only be notified of a Signal change like an Event. The problem with Mutable is you can't define it declaratively, you can't define it functionally. For example, I want to define a variable like,

let points = map_ref! {
    let x = points_earned.signal(),
    let y = points_spent =>
    *x - *y
}

However, I currently have to jump through hoops if I want a button press, a Steam, or Event in Haskell terms, to have different behavior depending on the value of points at the time of the button press.

Pauan commented 2 years ago

Currently, I need tag, attach, and sample. Although, tag and sample can be implemented in terms of attach or attachWith.

So it seems like attach just gives you the value of the Behavior at the time that an event happens, correct?

You can only be notified of a Signal change like an Event.

It's important to understand that a Signal is not at all like an Event.

A Signal does (eventually) let you know when it changes, but it doesn't give you the value. You must always poll a Signal to retrieve its value, because Signals behave like continuous functions (i.e. like a Behavior).

The notification is solely used as an optimization, it does not affect the semantics at all. You can skip the notification entirely and do a poll-based system instead (e.g. poll the Signal once every 5 seconds or whatever).

Signals are essentially Behaviors, they are treated semantically like continuous functions, and it's quite easy to convert a continuous function into a Signal.

The closest thing to an Event is a Stream, which is an ordered asynchronous collection of discrete values over time.

The problem with Mutable is you can't define it declaratively, you can't define it functionally.

Mutable is just one way of defining a Signal. If you want to convert an external source (like a button click) into a Signal then you probably want to use channel instead:

pub fn clicks() -> impl Signal<Item = u32> {
    let (sender, receiver) = channel();

    let mut number_of_clicks = 0;

    subscribe_to_button_clicks_somehow(move || {
        number_of_clicks += 1;
        let _ = sender.send(number_of_clicks);
    });

    receiver
}

Now you can just call clicks() to get a Signal. Unlike Mutable, this is "pure" (all side effects are hidden, so it's essentially referentially transparent).

If futures-signals can implement sample for Signal, that would be great.

If I did implement it, it would probably be a method on Broadcaster, which is something I had been planning for a while now.

But I think that's separate from attach.

justinlovinger commented 2 years ago

So it seems like attach just gives you the value of the Behavior at the time that an event happens, correct?

attach gives you a tuple containing the value of the Behavior and the value of the Event when the Event fires.

It's important to understand that a Signal is not at all like an Event.

I'm referring to a Signal from the user facing perspective, not how it's implemented internally. A user can be notified of a new value in a Signal, but a user cannot sample the value of a Signal. The ability to sample is the defining characteristic of a Behavior. A Signal is isomorphic to an Event from the user perspective.

Signals are essentially Behaviors, they are treated semantically like continuous functions, and it's quite easy to convert a continuous function into a Signal.

A Behavior cannot notify when it changes value, by definition, only a Dynamic can do that. If a Signal can notify when it changes, it is not a Behavior.

The closest thing to an Event is a Stream, which is an ordered asynchronous collection of discrete values over time.

You are correct that a Stream is a representation of an Event. However, a Signal is also a representation of an Event for the reasons stated above. The difference is a Signal discards some values to achieve a more efficient implementation.

Mutable is just one way of defining a Signal. If you want to convert an external source (like a button click) into a Signal then you probably want to use channel instead:

I am aware of channel, and I'm using it where appropriate. However, the example you gave still can't sample because you can't sample a Signal.

P.S. For clarity:

Type Features
Behavior Can sample
Event Can notify
Dynamic Can sample and notify
Pauan commented 2 years ago

attach gives you a tuple containing the value of the Behavior and the value of the Event when the Event fires.

Right, that's what I thought.

I'm referring to a Signal from the user facing perspective, not how it's implemented internally. The ability to sample is the defining characteristic of a Behavior.

I am also talking about it from the user's perspective, not the implementation. From a user-facing semantic perspective, Signals are Behaviors.

A user can be notified of a new value in a Signal, but a user cannot sample the value of a Signal.

That's not true, users can call poll_changed (though it's a little fiddly right now).

But even if Signals couldn't be sampled, that doesn't automatically make it an Event, it just makes it a less-useful Behavior.

A Signal is isomorphic to an Event from the user perspective.

No, it is not, Event has completely different user semantics than a Signal. That's why I mentioned it, because you seem to have a misunderstanding.

A Behavior cannot notify when it changes value, by definition, only a Dynamic can do that. If a Signal can notify when it changes, it is not a Behavior.

futures-signals (and most FRP libraries) don't make that distinction, a continuous function (i.e. a Behavior) in futures-signals is simply a Signal that always notifies.

I'm talking about the general definition of Behavior (i.e. Conal Elliot), not Reflex specifically. Every FRP system does things slightly differently, but there is a lot of overlap in the concepts.

The difference is a Signal discards some values to achieve a more efficient implementation.

It is not, you are fundamentally misunderstanding how Signals work both at the implementation level and the user semantic level.

A Signal is not a "Stream that discards some values to be more efficient", it is much deeper than that.

justinlovinger commented 2 years ago

I'm just referring to the definitions in Reflex FRP, what I'm most familiar with.

Regardless, we need to be able to sample Signals, like you can with Reflex Behaviors and Dynamics, to implement a lot of useful FRP patterns.

If I did implement it, it would probably be a method on Broadcaster, which is something I had been planning for a while now.

That's acceptable, given you can easily turn a Signal into a Broadcaster. However, we have to be careful the Broadcaster still works even if it never spawns a Signal that gets hooked into the larger Futures engine. If Broadcaster can be used to sample, there will be use cases where a user wants to only sample a Broadcaster without, say, passing broadcaster.signal() into dominator.

But I think that's separate from attach.

You can implement attach in terms of sample, like event.map(|x| (sample(signal), x)).

Pauan commented 2 years ago

However, we have to be careful the Broadcaster still works even if it never spawns a Signal that gets hooked into the larger Futures engine.

That's the plan, yeah. Broadcaster has its own Waker system which is independent of the executor. That's why I was planning to put it into Broadcaster.

You can implement attach in terms of sample, like event.map(|x| (sample(signal), x)).

Yes, but that's less efficient, and won't work with Rust's executor system.

justinlovinger commented 2 years ago

and won't work with Rust's executor system.

Can you elaborate? I'm not sure how sample would work if event.map(|x| (sample(signal), x)) wouldn't work. In that case, it may be better to only implement attach and sample like signal.attach(event).map(|(sampled_value, event_value)| ...).

P.S. in case it wasn't clear, attach only fires when event fires. You can tell this from the Reflex definition of attach because a Reflex Behavior can't fire.

Also, there is the question of whether event should be a Stream or a Signal. Ideally, there would be a variant for each.

Pauan commented 2 years ago

Can you elaborate?

Polling a Future / Stream / Signal in Rust requires a Context. But there is no Context in your code example.

The way to get a Context is to create a new combinator, which is why we can create a new attach combinator which can access the Context, but we can't create a free-floating function (like sample) which accesses the Context.

Perhaps one way of looking at it is to say that Future/Stream/Signal are Monads, and the Context is hidden state which is passed through the Monad.

So because sample is just a plain function, not a Monad, it can't access the hidden Context state. Instead you need to create a new Stream Monad (like attach) which can access the hidden Context state.

sample like signal.attach(event).map(|(sampled_value, event_value)| ...).

That code returns a Stream, not a value like you probably intended. Once a value is inside of the Future/Stream/Signal Monad, the only way to get it out again is with something like for_each (or block_on).

Also, there is the question of whether event should be a Stream or a Signal. Ideally, there would be a variant for each.

It can't be a Signal, because a Signal is a continuous function. So how would it decide when to sample?

justinlovinger commented 2 years ago

Polling a Future / Stream / Signal in Rust requires a Context. But there is no Context in your code example.

Yes, this is why I originally said, "[sample] isn't feasible with Rust's lack of monadic side effect control. The best we can do is attach".

That code returns a Stream, not a value like you probably intended.

I intended it to return a Stream.

It can't be a Signal, because a Signal is a continuous function. So how would it decide when to sample?

Stream was actually my preference as well.

From a practical standpoint, you could sample event of type Signal when it changes value. However, you can always turn a Signal into a Stream, so the point is moot.

Pauan commented 2 years ago

Yes, this is why I originally said, "[sample] isn't feasible with Rust's lack of monadic side effect control. The best we can do is attach".

I'm not really sure what you mean. Your code is equivalent to this:

fmap (\x -> (sample signal, x)) event

That also doesn't work in Reflex, because the fmap lambda must return a regular value, not a Monad, but sample returns a Monad.

I don't see any functions in Reflex which would allow you to define attach like that.

I intended it to return a Stream.

Oh, okay, well that isn't how sample works, but if it's acceptable for you.

From a practical standpoint, you could sample event of type Signal when it changes value.

No, you can't, because it's a continuous function. It can notify even if the value didn't change, it can notify multiple times for 1 change, or it might not notify at all even if it did change. And then there are Signals that continuously notify.

As I have said repeatedly, Signals are not Events, they are Behaviors. They do not have the behavior or guarantees of a Stream.

Pauan commented 2 years ago

Ah, I see, it's the push function that makes it possible. That is equivalent to Stream::then.

So if there was an fn sample(self) -> impl Future<Item = Self::Item> then it would work:

event.then(move |event_value| {
    async move {
        let sampled_value = sample(signal).await;
        (sampled_value, event_value)
    }
})

This works fine, since sample is returning the Future monad.

justinlovinger commented 2 years ago

I'm not really sure what you mean. Your code is equivalent to this:

The Reflex sample function is here, https://github.com/reflex-frp/reflex/blob/develop/Quickref.md#functions-producing-behavior. It takes a Behavior and return a value wrapped in a monad. Everything in Reflex happens inside a monadic context, hence why it can call sample. Haskell doesn't even have a concept of values that can change outside of monadic contexts, so it wouldn't make sense to sample a value without returning a monad.

P.S. monads are regular values in Haskell, and you can return them with fmap. There are multiple ways to flatten or combine monads of monads or functors of monads.

Oh, okay, well that isn't how sample works, but if it's acceptable for you.

No, but that is how attach works. You can get behavior equivalent to sample using attach, and vice versa, so the implementation of either works for me.

So if there was an fn sample(self) -> impl Future<Item = Self::Item> then it would work:

That looks good for sample. It's semantically strange to "await" the "value at the current time", but I understand why it has to be implemented like that.

Pauan commented 2 years ago

Everything in Reflex happens inside a monadic context

But the monad for Event is different than the monad for sample, which is why you can't combine them that way. You need something like push which will evaluate the sample monad inside the Event monad.

You can get behavior equivalent to sample using attach, and vice versa, so the implementation of either works for me.

No, because you also need push in addition to those, just sample + fmap is not enough. It is push that is the key.

Thankfully Rust Streams already have push, so it isn't an issue.

P.S. monads are regular values in Haskell

I am aware, they are also regular values in Rust. I actually learned Haskell before I learned Rust. But in order to evaluate them they need to be in the correct monad context. fmap does not do that, but push does.

It's semantically strange to "await" the "value at the current time", but I understand why it has to be implemented like that.

Why is it semantically strange? A Future returns a single value which resolves at a particular time, it's basically like a one-shot Event. Since Events are all about discrete values at a particular time, it makes sense to me.

.await is just how Rust does monadic >>= for Future, it doesn't mean "in the future", because the Future can potentially return a value immediately.

async {
    let x = foo().await;
    let y = bar().await;
    x + y
}

That is equivalent to this Haskell code:

do x <- foo
   y <- bar
   return x + y
justinlovinger commented 2 years ago

Why is it semantically strange?

It's semantically strange because it sounds weird in English to say "wait for the current time". I only mention it because it could trip up new users.

Pauan commented 2 years ago

Well, because of Rust's multi-threading, there could actually be a noticeable delay between calling the function and receiving the value, and there can also be OS-level delays, so it's not too far off, semantically speaking.

Especially because a Signal can run arbitrary code, so retrieving the current value can block for a potentially infinite amount of time.

Though I agree it's a little weird to use Future for that, normally Future is used for things that are substantially longer in time-frame.

Pauan commented 2 years ago

So after giving it a bit of thought, I think the API should be like this:

fn sample_stream_cloned<A: Stream>(self, stream: A) -> impl Stream<Item = (Self::Item, A::Item)>
justinlovinger commented 2 years ago

Would it make more sense to call it attach_to_stream_cloned? That API fits the definition of Reflex attach, and sample_stream_cloned makes it sound like you're sampling the Stream. Other than the name, the API looks good to me.

Pauan commented 2 years ago

It's a method on Signal, so it's sampling the Signal based on the Stream. A more accurate name would also be longer.

I'm not going to use Reflex's naming conventions, this is a Rust library so I use Rust naming conventions, futures-signals was never intended as a clone of any other FRP library.

justinlovinger commented 2 years ago

It's worth noting, sample means something very different in the Rust Futures ecosystem. In Rust, sample is a type of throttle, https://docs.rs/futures-time/latest/futures_time/stream/trait.StreamExt.html#method.sample, https://github.com/rust-lang/futures-rs/issues/210.

Pauan commented 2 years ago

Oh, nice, so there's already a sample method, with very similar behavior.

signal.sample_stream_cloned(stream) is almost exactly the same as signal.to_stream().sample(stream) except that sample_stream_cloned also gives you the Stream's value.

"Sample" is a commonly used word in many languages and FRP systems, it is a type of throttling and it has the correct semantic meaning.

Pauan commented 2 years ago

Version 0.3.31 has been released, it has a sample_stream_cloned method on SignalExt, so I'm going to consider this issue as solved.