rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

Debounce / sample stream combinators #210

Open NeoLegends opened 7 years ago

NeoLegends commented 7 years ago

I don't know whether this suggestion is within reach (since the implementations will probably require a timer) or in the spirit (since this is more of an FRP suggestion) of this library, but having sample and debounce stream combinators would be really nice, especially in scenarios with user interaction.

Debounce works by holding back the values of a stream until a certain amount of time has passed without any new events, and then passing on the latest one. It is used by e.g. web developers when processing user input, to ignore the changes while the user is still typing. Sample works in a similar fashion: instead of waiting for a quiet period, it passes on the most recent value from the input stream at specified intervals. It can be used to ignore most intermediate values, yet provide the user with some feedback from time to time.
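To make the two behaviours concrete, here is a minimal sketch that applies them to a finished, timestamped list of events instead of a live stream. The names `Timed`, `debounce` and `sample` are hypothetical and not part of futures-rs. It also makes two assumptions the description above does not settle: an event arriving exactly at a deadline does not supersede the pending value, and sampling ticks fall on multiples of the period.

```rust
use std::time::Duration;

/// An event paired with the time it occurred, measured from the start of the stream.
pub type Timed<T> = (Duration, T);

/// Debounce: a value is emitted only once `quiet` has elapsed without a newer event.
/// Each output is stamped with the time at which it would be emitted.
/// Assumes `events` is sorted by time.
pub fn debounce<T: Clone>(events: &[Timed<T>], quiet: Duration) -> Vec<Timed<T>> {
    let mut out = Vec::new();
    for (i, (at, value)) in events.iter().enumerate() {
        let deadline = *at + quiet;
        // The value is dropped if a later event arrives before its deadline.
        let superseded = events.get(i + 1).map_or(false, |(next, _)| *next < deadline);
        if !superseded {
            out.push((deadline, value.clone()));
        }
    }
    out
}

/// Sample: at every multiple of `period`, emit the most recent value seen since
/// the previous tick, if there was one. Assumes `events` is sorted by time.
pub fn sample<T: Clone>(events: &[Timed<T>], period: Duration) -> Vec<Timed<T>> {
    assert!(!period.is_zero(), "period must be non-zero");
    let mut out = Vec::new();
    let mut pending: Option<T> = None;
    let mut tick = period;
    for (at, value) in events {
        // Flush every tick that elapsed before this event arrived.
        while *at > tick {
            if let Some(v) = pending.take() {
                out.push((tick, v));
            }
            tick += period;
        }
        pending = Some(value.clone());
    }
    // The last buffered value goes out on the next tick.
    if let Some(v) = pending {
        out.push((tick, v));
    }
    out
}
```

For events 'a', 'b', 'c', 'd' at 0 ms, 100 ms, 150 ms and 1000 ms, `debounce` with a 300 ms quiet period yields 'c' at 450 ms and 'd' at 1300 ms, while `sample` with a 500 ms period yields 'c' at 500 ms and 'd' at 1000 ms.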

alexcrichton commented 7 years ago

Sounds plausible to me! If it can't be implemented here, then these may be best implemented at the tokio-core layer, which has more access to scheduling, timers and such.

NeoLegends commented 7 years ago

Great! Most FRP libraries feature a ton of combinators (way more than futures-rs), and I'm sure some of them are not applicable in Rust or don't fit well into the zero-cost model. I suggest taking a look at http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html or https://msdn.microsoft.com/en-us/library/system.reactive.linq.observable(v=vs.103).aspx to get an idea of what is possible and which combinators could be ported.

NeoLegends commented 7 years ago

Once the combinators to be ported are figured out, I could definitely start implementing some of them.

alexcrichton commented 7 years ago

Oh I at least personally don't mind implementing some more heavyweight combinators. Just because the core Future trait is zero-cost doesn't mean that everything around it needs to be as well!

To that end I'd be down for basically implementing any useful combinator you've seen from other libraries, in my mind they're all welcome!

NeoLegends commented 7 years ago

Alright! How would we go about implementing the combinators at the tokio-core level? Using a FutureExt-trait seems ugly but is the only real way that comes to my mind.

alexcrichton commented 7 years ago

Yeah, if we want to implement these at the tokio-core layer we'd want an extension trait (or perhaps just free function constructors). I'd be fine exploring, though, to see if we can implement them at the futures layer as well. With std::time::Instant, going all the way to tokio-core may not be necessary? Unsure though.
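For reference, the two options mentioned here (free function constructors versus an extension trait) look roughly like this. This is only a sketch of the pattern: it uses `Iterator` as a stand-in for `Stream` so that it needs nothing beyond std, and `EveryNth`, `every_nth` and `IteratorExt` are hypothetical names. The adapter is count-based, not time-based, so it is not a real `sample`.

```rust
/// Hypothetical adapter: yields every `n`-th item of the wrapped iterator.
pub struct EveryNth<I> {
    inner: I,
    n: usize,
}

impl<I: Iterator> Iterator for EveryNth<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // `nth(k)` skips `k` items and returns the one after them.
        self.inner.nth(self.n - 1)
    }
}

/// Option 1: a free function constructor.
pub fn every_nth<I: Iterator>(inner: I, n: usize) -> EveryNth<I> {
    assert!(n > 0, "n must be at least 1");
    EveryNth { inner, n }
}

/// Option 2: an extension trait with a blanket impl, which enables method syntax
/// on every iterator once the trait is in scope.
pub trait IteratorExt: Iterator + Sized {
    fn every_nth(self, n: usize) -> EveryNth<Self> {
        every_nth(self, n)
    }
}

impl<I: Iterator> IteratorExt for I {}
```

With both in place, `every_nth(1..=6, 2)` and `(1..=6).every_nth(2)` build the same adapter; the extension trait only adds the method-call form, at the price of one more trait that users have to import.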

NeoLegends commented 7 years ago

I already thought about implementing the combinators only with std::time primitives, but the problem with that is that e.g. debounce needs to fire its last result after the timeout has passed (asynchronously). This means we cannot rely on the underlying stream to notify the task of new items, which is why we're going to need a timer that unpark()s the task at the right time.
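A small sketch of that problem, using only std::time. `Debouncer` is a hypothetical type written for this illustration and the caller passes in the current time explicitly. The state machine itself is easy to write; what std::time cannot provide is something that calls `poll` once the deadline has passed when the upstream has gone quiet.

```rust
use std::time::{Duration, Instant};

/// Hypothetical debounce state machine driven only by `std::time`.
pub struct Debouncer<T> {
    quiet: Duration,
    /// The latest value together with the instant at which it becomes due.
    pending: Option<(Instant, T)>,
}

impl<T> Debouncer<T> {
    pub fn new(quiet: Duration) -> Self {
        Self { quiet, pending: None }
    }

    /// Record a new upstream item. This replaces any pending value and
    /// restarts the quiet period.
    pub fn push(&mut self, now: Instant, value: T) {
        self.pending = Some((now + self.quiet, value));
    }

    /// The instant at which the pending value becomes due, if there is one.
    /// A real combinator would have to arrange a task wakeup for this instant.
    pub fn deadline(&self) -> Option<Instant> {
        self.pending.as_ref().map(|(deadline, _)| *deadline)
    }

    /// Emit the pending value if its quiet period has elapsed.
    pub fn poll(&mut self, now: Instant) -> Option<T> {
        let due = self.deadline().map_or(false, |deadline| now >= deadline);
        if due {
            self.pending.take().map(|(_, value)| value)
        } else {
            None
        }
    }
}
```

If the upstream produces "a", then "ab" 100 ms later, and then nothing more, "ab" stays buffered until somebody calls `poll` at or after `deadline()`. Inside a stream combinator that somebody has to be a timer that wakes the task, since the upstream will not do it.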

siriux commented 7 years ago

Wouldn't it be a good idea to have Timeout and Interval from tokio-core directly in futures-rs? They can be useful for more than just IO, for example in GUIs.

NeoLegends commented 7 years ago

I guess the problem with that is that the timers themselves depend on a specific event loop, while futures-rs is agnostic to which one you use. Unsure though.

siriux commented 7 years ago

I see. Maybe it's a stupid idea, but could futures-rs include an abstraction that can be implemented on top of cpupool, tokio-core, ...? This way it would be usable with or without IO with the same interface.

alexcrichton commented 7 years ago

@siriux that's partly the topic of https://github.com/alexcrichton/futures-rs/issues/198 if we could add park_timeout generically (requiring event loops to implement it)

carllerche commented 7 years ago

There is also tokio-timer, which is a free-standing futures timer and is not bound to an event loop.

carllerche commented 7 years ago

Personally, I wonder if there should be a futures-ext crate that can absorb additional dependencies and act as a staging area for combinators and other utilities as the landscape settles and it becomes clearer where everything goes.

tailhook commented 7 years ago

> There also is tokio-timer which is a free standing futures timer and is not bound to an event loop

I strongly believe that timeouts should work on the main loop. If there is a Future::timeout() combinator that sends the timer to another thread, it will be very tempting to use it in networking code, which is usually a bad idea, I think.

(I'm not sure whether that was your point, though.)

NeoLegends commented 7 years ago

Is there a rough ETA for the 0.2 release? https://github.com/tokio-rs/tokio-core/issues/69 has the unified timer story set for the 0.2 milestone, and we could just wait with the debounce / sample / buffer-for-time combinators until that issue is resolved? Then we wouldn't need an additional futures-ext crate, which would lead to re-doing work as the underlying APIs change anyway.

alexcrichton commented 7 years ago

@NeoLegends unfortunately not yet, but hopefully soon!

NeoLegends commented 5 years ago

Throttle has been merged in https://github.com/tokio-rs/tokio/commit/e166c4d91231c5af00df903b16b91c5c364f9b27.

Review is pending on debounce / sample: https://github.com/tokio-rs/tokio/pull/747

Nemo157 commented 5 years ago

One idea to decouple this from the underlying timers implementation: use an impl FnMut() -> impl Future<Output = ()> to get the timer (alternatively an impl Stream<Item = ()> with the additional constraint that it will buffer at most a single item if not polled faster than its production interval). You could then provide the generic implementation in futures:

trait StreamExt {
    fn throttle(self, timer: impl FnMut() -> impl Future<Output = ()>) -> impl Stream<Item = Self::Item> {
        ...
    }
}

(with some massaging of the actual generic constraints, since we can't yet use impl Fn() -> impl Something), and specific frameworks can simply wrap this with a function that uses their native timer functionality:

trait tokio::StreamExt {
    fn throttle(self, duration: Duration) -> impl Stream<Item = Self::Item> {
        futures::StreamExt::throttle(self, move || Delay::new(clock::now() + duration))
    }
}

This would avoid duplicating the underlying functionality across multiple frameworks.
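One way the "massaged" generic constraints could look, as a rough sketch and not the actual futures or tokio implementation. To stay std-only it declares a local `Stream` trait as a stand-in for the one in futures, and `Iter`, `Throttle` and `throttle` are hypothetical names. It assumes throttle semantics where items are delayed and never dropped: after yielding an item, the upstream is not polled again until a freshly created timer future completes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Local stand-in for `futures::Stream`, so that this sketch depends only on std.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Wraps an iterator as an always-ready stream, for demonstration purposes.
pub struct Iter<I>(pub I);

impl<I: Iterator + Unpin> Stream for Iter<I> {
    type Item = I::Item;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.0.next())
    }
}

/// Throttled stream that is generic over the timer: `F` creates a new delay
/// future `Fut` whenever one is needed.
pub struct Throttle<S, F, Fut> {
    stream: S,
    make_timer: F,
    /// The delay started after the most recently yielded item, while it is running.
    delay: Option<Pin<Box<Fut>>>,
}

pub fn throttle<S, F, Fut>(stream: S, make_timer: F) -> Throttle<S, F, Fut>
where
    S: Stream + Unpin,
    F: FnMut() -> Fut + Unpin,
    Fut: Future<Output = ()>,
{
    Throttle { stream, make_timer, delay: None }
}

impl<S, F, Fut> Stream for Throttle<S, F, Fut>
where
    S: Stream + Unpin,
    F: FnMut() -> Fut + Unpin,
    Fut: Future<Output = ()>,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = self.get_mut();
        // While a delay is running, let the timer decide when the task is woken.
        if let Some(delay) = this.delay.as_mut() {
            if delay.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }
            this.delay = None;
        }
        match Pin::new(&mut this.stream).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                // Start a new delay before the next item may pass.
                this.delay = Some(Box::pin((this.make_timer)()));
                Poll::Ready(Some(item))
            }
            other => other,
        }
    }
}
```

A framework-specific wrapper would then only supply the closure, for example one that returns the framework's own delay future for a fixed duration, which is the role `Delay::new(clock::now() + duration)` plays in the snippet above.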

NeoLegends commented 5 years ago

Sounds like a valid point. Could you crosspost this to the relevant PR in the tokio repo?

Nemo157 commented 5 years ago

@NeoLegends thinking about it a little more, I think it's likely that if this is implemented in futures it would be as part of futures 0.3, so tokio would probably want to have its own 0.1 implementation in the meantime anyway. When tokio is later updated to futures 0.3, it could be migrated to use the futures implementation. It should even be a non-breaking change to migrate tokio to wrap the futures implementation, provided it offers the same functionality.

NeoLegends commented 5 years ago

I agree. Ideally you'd want to have this in futures, not in tokio.

I'm not really familiar with futures 0.3; does it require executors to provide a timer now?