dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License

Could we have Throttle by task? #1270

Open Thaina opened 4 years ago

Thaina commented 4 years ago

Feature request

I don't know whether this operator can be built from existing ones, but I want an operator that throttles the input stream while the current handler has not finished, rather than throttling by time.

Suppose there is an HTTP POST function that listens to UI input changes and sends that data to the server:

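// Proposed usage: a Throttle overload that takes an async handler (not available in Rx.NET).
ISubject<string> nameTextField = new Subject<string>();
nameTextField.Throttle(async (newName) => {
    await HTTPPost(newName);
    return newName;
}).Subscribe((newName) => {
    Console.WriteLine("Updated name to : " + newName);
});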

What I would like is for this function to act like Select, but buffer the data stream so that only the latest value is processed at a time:

1 - If data is sent to this function for the first time, it triggers the function immediately.
2 - If data is sent to this function while the previous execution has not finished, it keeps that data, waits until the previous execution finishes, and then triggers the function again.
3 - If yet another value is sent in the meantime, it discards the previously cached value and keeps the new one in the waiting slot instead.

So, in the example, any change in the user input sends an HTTP POST to the server immediately. But if the connection is lagging while the user changes the text field several times, I don't want HTTPPost to be called in parallel; once the previous call finishes, I would like to post the latest value the user entered.

Could we have this kind of function, or is there already a way to do this?
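
The three rules above can be sketched on top of Observable.Create, assuming the System.Reactive package is referenced. The names SelectLatestWhileBusy and LatestWhileBusyExtensions are hypothetical and not part of Rx.NET. This is a minimal illustration rather than a hardened operator: it does not cancel a handler that is already running when the subscription is disposed.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical helper, not part of Rx.NET.
public static class LatestWhileBusyExtensions
{
    public static IObservable<TResult> SelectLatestWhileBusy<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, Task<TResult>> handler)
    {
        return Observable.Create<TResult>(observer =>
        {
            var gate = new object();
            var busy = false;               // a handler call is in flight
            var hasPending = false;         // a value is waiting for the handler
            var pending = default(TSource); // only the newest waiting value is kept
            var sourceDone = false;         // the source has completed
            var stopped = false;            // terminated or unsubscribed

            async void Run(TSource value)
            {
                while (true)
                {
                    TResult result = default;
                    Exception error = null;
                    try { result = await handler(value).ConfigureAwait(false); }
                    catch (Exception ex) { error = ex; }

                    lock (gate)
                    {
                        if (stopped) return;
                        if (error != null) { stopped = true; observer.OnError(error); return; }
                        observer.OnNext(result);
                        if (hasPending)
                        {
                            // Rule 2: the previous call finished, run the cached value next.
                            value = pending;
                            hasPending = false;
                            continue;
                        }
                        busy = false;
                        if (sourceDone) { stopped = true; observer.OnCompleted(); }
                        return;
                    }
                }
            }

            var subscription = source.Subscribe(
                next =>
                {
                    lock (gate)
                    {
                        if (stopped) return;
                        if (busy)
                        {
                            // Rule 3: overwrite any older cached value.
                            pending = next;
                            hasPending = true;
                            return;
                        }
                        busy = true;
                    }
                    Run(next); // Rule 1: nothing in flight, start immediately.
                },
                ex =>
                {
                    lock (gate)
                    {
                        if (stopped) return;
                        stopped = true;
                        observer.OnError(ex);
                    }
                },
                () =>
                {
                    lock (gate)
                    {
                        if (stopped) return;
                        sourceDone = true;
                        // If a call is still running, Run completes the sequence afterwards.
                        if (!busy) { stopped = true; observer.OnCompleted(); }
                    }
                });

            return Disposable.Create(() =>
            {
                lock (gate) { stopped = true; }
                subscription.Dispose();
            });
        });
    }
}
```

With this sketch, the example from the request would call nameTextField.SelectLatestWhileBusy(...) with the same async lambda in place of Throttle. Holding the lock while calling the observer keeps notifications serialized, at the price of blocking the source briefly while a result is being delivered.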

quinmars commented 4 years ago

You are looking for SwitchFirst; see for example #118. Unfortunately, there is no implementation of this operator in Rx.NET. In my opinion, this is one of the "missing" operators that would be worth adding to Rx.NET.

Hong-Xiang commented 3 years ago

Is this related to rxjs's exhaustMap? There seems to be an implementation in FSharp.Control.Reactive, the F# wrapper for Rx.NET. The implementation is here.

It is similar to the implementation mentioned in #118, and likewise comes at the cost of introducing mutable values.
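
For comparison, rxjs documents exhaustMap as ignoring source values that arrive while the previous inner observable is still running, so nothing is cached for later. A rough C# sketch of that "drop while busy" behaviour for Task-returning handlers, built only from existing Rx.NET operators, might look like the following. The names SelectDropWhileBusy and DropWhileBusyExtensions are hypothetical, and the mutable busy flag is exactly the kind of mutable state mentioned above.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Rx.NET.
public static class DropWhileBusyExtensions
{
    public static IObservable<TResult> SelectDropWhileBusy<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, Task<TResult>> handler)
    {
        // Defer gives every subscription its own busy flag.
        return Observable.Defer(() =>
        {
            var busy = 0; // 0 = idle, 1 = a handler call is in flight

            return source
                // Let a value through only if no call is running; otherwise drop it.
                .Where(_ => Interlocked.CompareExchange(ref busy, 1, 0) == 0)
                .SelectMany(async value =>
                {
                    try { return await handler(value).ConfigureAwait(false); }
                    finally { Volatile.Write(ref busy, 0); } // idle again, even on failure
                });
        });
    }
}
```

Unlike the behaviour asked for in this issue, the last value typed during a slow request would be lost here, which is why a "keep the latest" variant needs an extra pending slot.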

quinmars commented 2 years ago

I just published a NuGet package, MoreRx, which provides an implementation of SkipFirst. It subscribes to the first incoming observable; only when that one has completed does it subscribe to the last incoming observable. Every other observable in between is dropped without being subscribed to.