dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License
6.66k stars 747 forks source link

Implement a real throttle (not debounce, like the existing throttle) #395

Open cmeeren opened 7 years ago

cmeeren commented 7 years ago

Currently there is no operator to ignore elements from an observable sequence that follow a non-ignored element within a specified relative time duration. This is what would normally be called Throttle, while the existing Throtte operator is really a debounce operator (see e.g. The Difference Between Throttling and Debouncing).

Here is a sample implementation of the "real" throttle operator both with and without scheduler:

public static IObservable<TSource> AtMostOnceEvery<TSource>(
    this IObservable<TSource> source,
    TimeSpan dueTime)
{
    IObservable<TSource> delay = Observable.Empty<TSource>().Delay(dueTime);
    return source.Take(1).Concat(delay).Repeat();
}

public static IObservable<TSource> AtMostOnceEvery<TSource>(
    this IObservable<TSource> source,
    TimeSpan dueTime,
    IScheduler scheduler)
{
    IObservable<TSource> delay = Observable.Empty<TSource>().Delay(dueTime, scheduler);
    return source.Take(1).Concat(delay).Repeat();
}
akarnokd commented 7 years ago

That won't work if the source is cold because repeat will resubscribe and you get the very first element all over.

This should work for both cold and hot sources :

    class Program
    {
        static void Main(string[] args)
        {
            new int[] { 100, 200, 300, 400, 500, 600, 1000 }.ToObservable()
                .SelectMany(v => Observable.Timer(TimeSpan.FromMilliseconds(v)).Select(w => v))
                .ThrottleFirst(TimeSpan.FromMilliseconds(210), Scheduler.Default)
                .Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));

            Console.ReadLine();
        }
    }

    static class OX
    {
        internal static IObservable<T> ThrottleFirst<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
        {
            return source.Publish(o =>
            {
                return o.Take(1)
                .Concat(o.IgnoreElements().TakeUntil(Observable.Return(default(T)).Delay(delay, scheduler)))
                .Repeat()
                .TakeUntil(o.IgnoreElements().Concat(Observable.Return(default(T))));
            });
        }
    }
cmeeren commented 7 years ago

Thanks. I don't understand what's happening there because I haven't used Rx all that much, but I'd just like to add that if this is implemented, perhaps the word "debounce" should appear in the Throttle documentation.

akarnokd commented 7 years ago

See the ReactiveX page of Sample which lists other throttle-like implementations in other languages with marble diagrams: http://reactivex.io/documentation/operators/sample.html

cmeeren commented 7 years ago

Not sure why you refer me to that page. Sample seems different than what I describe: With Sample, an item is picked each X seconds (if a new item exists). If you sample every 10 seconds and an item is pushed 1 second after the previous sample, you'd have to wait 9 seconds for Sample to pick it up. I am talking about immediately acting on an item and then skipping all other items until the delay has completed.

akarnokd commented 7 years ago

You need to browse a little bit on that page:

Image

cmeeren commented 7 years ago

That's exactly what I suggested, isn't it? Possible clarification: When I said

I don't understand what's happening there

I was thinking about the specific implementation you posted above, but no matter - I assume we're on the same page as regards the desired behavior. :)

Matungos commented 7 years ago

I know is offtopic, but could you please share an implementation of throttleWithTimeout?? Because I'm trying to implement the debounce behavior without success.

sglienke commented 7 years ago

@Matungos That is what Throttle already does (it's a bit confusing I have to say that RX.NET calls Debounce Throttle and Throttle Sample)

Bartolomeus-649 commented 6 years ago

What's the status of this?

And does all agree on the definitions: Throttle: Maximum number of calls during a specified time period. Debounce: Minimum time between calls.

DavidTangWei commented 6 years ago

@akarnokd hello~ Your implementation not work for me. My need is to find or create a rx operator like throttleFirst in java.Rx.

akarnokd commented 6 years ago

@DavidTangWei Here is a native implementation:

    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");

            new int[] { 100, 200, 300, 400, 500, 600, 1000 }.ToObservable()
                .SelectMany(v => Observable.Timer(TimeSpan.FromMilliseconds(v)).Select(w => v))
                .ThrottleFirst(TimeSpan.FromMilliseconds(210), Scheduler.Default)
                .Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));

            Console.ReadLine();
        }
    }

    static class ObservablesEx
    {
        public static IObservable<T> ThrottleFirst<T>(this IObservable<T> source, 
                TimeSpan timespan, IScheduler timeSource)
        {
            return new ThrottleFirstObservable<T>(source, timeSource, timespan);
        }
    }

    sealed class ThrottleFirstObservable<T> : IObservable<T>
    {
        readonly IObservable<T> source;

        readonly IScheduler timeSource;

        readonly TimeSpan timespan;

        internal ThrottleFirstObservable(IObservable<T> source, 
                  IScheduler timeSource, TimeSpan timespan)
        {
            this.source = source;
            this.timeSource = timeSource;
            this.timespan = timespan;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var parent = new ThrottleFirstObserver(observer, timeSource, timespan);
            var d = source.Subscribe(parent);
            parent.OnSubscribe(d);
            return d;
        }

        sealed class ThrottleFirstObserver : IDisposable, IObserver<T>
        {
            readonly IObserver<T> downstream;

            readonly IScheduler timeSource;

            readonly TimeSpan timespan;

            IDisposable upstream;

            bool once;

            double due;

            internal ThrottleFirstObserver(IObserver<T> downstream, 
                    IScheduler timeSource, TimeSpan timespan)
            {
                this.downstream = downstream;
                this.timeSource = timeSource;
                this.timespan = timespan;
            }

            public void OnSubscribe(IDisposable d)
            {
                if (Interlocked.CompareExchange(ref upstream, d, null) != null)
                {
                    d.Dispose();
                }
            }

            public void Dispose()
            {
                var d = Interlocked.Exchange(ref upstream, this);
                if (d != null && d != this)
                {
                    d.Dispose();
                }
            }

            public void OnCompleted()
            {
                downstream.OnCompleted();
            }

            public void OnError(Exception error)
            {
                downstream.OnError(error);
            }

            public void OnNext(T value)
            {
                var now = timeSource.Now.ToUnixTimeMilliseconds();
                if (!once)
                {
                    once = true;
                    due = now + timespan.TotalMilliseconds;
                    downstream.OnNext(value);
                } else if (now >= due)
                {
                    due = now + timespan.TotalMilliseconds;
                    downstream.OnNext(value);
                }

            }
        }
    }
DavidTangWei commented 6 years ago

@akarnokd Thanks~ It works. I need to use Rx.net Rx-Main -Version 2.2.5 in order to support .net4.0. So my only approach is to copy this implement to my source code.

obiben commented 6 years ago

@akarnokd 's implementation almost worked for me but not quite. What I'd expect from a throttle would be that if now >= due, a value be emitted immediately, and if not, the latest emitted value would be emitted at due time, rather than waiting until a value is emitted past due time.

I've added T queued; to ThrottleFirstObserver

and changed OnNext to:

            public void OnNext(T value)
            {
                var now = timeSource.Now.ToUnixTimeMilliseconds();
                if (!once)
                {
                    queued = default(T);
                    once = true;
                    due = now + timespan.TotalMilliseconds;
                    downstream.OnNext(value);
                }
                else if (now >= due)
                {
                    queued = default(T);
                    due = now + timespan.TotalMilliseconds;
                    downstream.OnNext(value);
                }
                else
                {
                    bool firstQueue = queued == null;
                    queued = value;
                    if (firstQueue)
                    {
                        timeSource.Schedule(due - now, (IScheduler s, double d) =>
                        {
                            OnNext(queued);
                            return null;
                        });
                    }
                }
            }
badre429 commented 5 years ago

any update

huybuidac commented 5 years ago

I tried to create a DebounceOperator, is it OK?

    sealed class DebounceOperator<T> : IObservable<T>
    {
        readonly IObservable<T> _source;
        readonly TimeSpan _timespan;
        internal DebounceOperator(IObservable<T> source, TimeSpan timespan)
        {
            _source = source;
            _timespan = timespan;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var parent = new DebounceHandler(observer, _timespan);
            var d = _source.Subscribe(parent);
            parent.OnSubscribe(d);
            return d;
        }

        sealed class DebounceHandler : IDisposable, IObserver<T>
        {
            readonly IObserver<T> _downstream;
            readonly TimeSpan _timespan;
            IDisposable upstream;

            double due;

            private TaskCompletionSource<bool> _tcs;

            internal DebounceHandler(IObserver<T> downstream, TimeSpan timespan)
            {
                _downstream = downstream;
                _timespan = timespan;
            }

            public void OnSubscribe(IDisposable d)
            {
                if (Interlocked.CompareExchange(ref upstream, d, null) != null)
                {
                    d.Dispose();
                }
            }

            public void Dispose()
            {
                var d = Interlocked.Exchange(ref upstream, this);
                if (d != null && d != this)
                {
                    //Console.WriteLine("Audit Dispose <------");
                    d.Dispose();
                }
                if (_tcs != null && !_tcs.Task.IsCompleted)
                {
                    _tcs.SetResult(false);
                }
            }

            public void OnCompleted()
            {
                if (_tcs != null && !_tcs.Task.IsCompleted)
                {
                    _tcs.SetResult(false);
                }
                _downstream.OnCompleted();
            }

            public void OnError(Exception error)
            {
                if (_tcs != null && !_tcs.Task.IsCompleted)
                {
                    _tcs.SetResult(false);
                }
                _downstream.OnError(error);
            }

            public void OnNext(T value)
            {
                if (_tcs != null && !_tcs.Task.IsCompleted)
                {
                    _tcs.SetResult(false);
                }
                Audit(value).ConfigureAwait(false);
            }

            private async Task Audit(T value)
            {
                var localTcs = _tcs = new TaskCompletionSource<bool>();
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
                Task.Run(async () =>
                {
                    await Task.Delay(_timespan).ConfigureAwait(false);
                    if (_tcs == localTcs && !localTcs.Task.IsCompleted)
                    {
                        localTcs.SetResult(true);
                    }
                    else
                    {
                        localTcs.SetResult(false);
                    }
                }).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
                try
                {
                    var valid = await localTcs.Task.ConfigureAwait(false);
                    if (valid)
                    {
                        //Console.WriteLine("Audit OK???");
                        _downstream.OnNext(value);
                        var now = DateTime.Now.Ticks;
                        due = now + _timespan.Ticks;
                    }
                    else
                    {
                        //Console.WriteLine("Audit false???");
                    }
                }
#pragma warning disable RECS0022 // A catch clause that catches System.Exception and has an empty body
                catch
                {
                    //Console.WriteLine("Audit exception???");
                }
#pragma warning restore RECS0022 // A catch clause that catches System.Exception and has an empty body
                if (localTcs == _tcs)
                {
                    _tcs = null;
                }
            }
        }
    }
akarnokd commented 4 years ago

Closing due to inactivity.

badre429 commented 4 years ago

@akarnokd
we still need this behavior a real throttle (not debounce, like the existing throttle)

akarnokd commented 4 years ago

@badre429 Please refresh my memory. What exact emission pattern do you need? Does any of the example work for you?

In general, I'm not convinced the original operator requested has to be part of Rx.NET proper but you are welcome to implement any custom operator in your project.

mikhail-khalizev commented 4 years ago

Currently dotnet Throttle looks like js debounce. And as I understand it, there is no analogue of js throttle in detnet now.

obiben commented 4 years ago

Turns out that Sample does what you'd expect from Throttle

mikhail-khalizev commented 4 years ago

No, their behavior is different.

C# Sample:

Samples the observable sequence at each interval. Upon each sampling tick, the latest element (if any) in the source sequence during the last sampling interval is sent to the resulting sequence.

So, if source observable sequence has a large gap, then Sample will continue to generate elements.

But js Throttle:

Emits a value from the source Observable, then ignores subsequent source values for a duration > determined by another Observable, then repeats this process.

So, when source observable sequence has a large gap, js Throttle does not generate elements.

obiben commented 4 years ago

Oh OK there might be a slight difference then.

Worth noting that Sample will not continue to generate elements plural. It will generate the latest element from source once, then nothing on further intervals. It's also worth noting that after a large gap it'll emit instantly on the first new item generated like Throttle should, and start sampling again. If nothing else is generated during the next gap, nothing else will be emitted.

mikhail-khalizev commented 4 years ago

Yes, you right.

PS. It would be nice to add in the comments to the Sample method:

Worth noting that Sample will not continue to generate elements plural. It will generate the latest element from source once, then nothing on further intervals. It's also worth noting that after a large gap it'll emit instantly on the first new item generated. If nothing else is generated during the next gap, nothing else will be emitted.

Because with the comment that there is now, I got the impression that it would generate upon each sampling tick.

doctorpangloss commented 3 years ago

I'm not sure why this ticket is closed, the names for these things wound up being wrong.

softlion commented 1 year ago

Up. 2022 still no throttle.

softlion commented 1 year ago

@obiben

https://github.com/dotnet/reactive/issues/395#issuecomment-414795239 and if not, the latest emitted value would be emitted at due time, rather than waiting until a value is emitted past due time.

I don't agree. You are creating another operator that adjust the delay between emitted items. That's not what throttle is. Throttle really trash the other items emitted too fast.

softlion commented 1 year ago

A better debounce, also fixing the return bug in the above version.

using System.Reactive.Concurrency;

namespace Utilities;

    // public class TestProgram
    // {
    //     public void TestThrottleFirst(string[] args)
    //     {
    //         Console.WriteLine("Starting test");
    //
    //         new int[] { 100, 200, 300, 400, 500, 600, 1000 }.ToObservable()
    //             .SelectMany(v => Observable.Timer(TimeSpan.FromMilliseconds(v)).Select(w => v))
    //             .ThrottleFirst(TimeSpan.FromMilliseconds(210))
    //             .Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));
    //     }
    // }

public static class ReactiveExtensions
{
    /// <summary>
    /// Ignores all items following another item before the 'delay' window ends 
    /// </summary>
    public static IObservable<T> ThrottleFirst<T>(this IObservable<T> source, TimeSpan delay, IScheduler? timeSource = null) 
        => new ThrottleFirstObservable<T>(source, delay, timeSource ?? Scheduler.Default);

    sealed class ThrottleFirstObservable<T> : IObservable<T>
    {
        readonly IObservable<T> source;
        readonly IScheduler timeSource;
        readonly TimeSpan timespan;

        internal ThrottleFirstObservable(IObservable<T> source, TimeSpan timespan, IScheduler timeSource)
        {
            this.source = source;
            this.timeSource = timeSource;
            this.timespan = timespan;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var parent = new ThrottleFirstObserver<T>(observer, timespan, timeSource);
            source.Subscribe(parent, parent.DisposeCancel.Token);
            return parent;
        }
    }

    sealed class ThrottleFirstObserver<T> : IDisposable, IObserver<T>
    {
        readonly IObserver<T> downstream;
        readonly TimeSpan delay;
        readonly IScheduler timeSource;

        DateTimeOffset nextItemTime = DateTimeOffset.MinValue;

        internal CancellationTokenSource DisposeCancel { get; } = new();

        internal ThrottleFirstObserver(IObserver<T> downStream, TimeSpan delay, IScheduler timeSource)
        {
            downstream = downStream;
            this.timeSource = timeSource;
            this.delay = delay;
        }

        public void Dispose() => DisposeCancel.Cancel();
        public void OnCompleted() => downstream.OnCompleted();
        public void OnError(Exception error) => downstream.OnError(error);

        /// <summary>
        /// Always emit 1st value
        /// Wait 'delay' before emitting any new value
        /// Ignores all values in between
        /// </summary>
        public void OnNext(T value)
        {
            var now = timeSource.Now;
            if (now >= nextItemTime)
            {
                nextItemTime = now.Add(delay);
                downstream.OnNext(value);
            }
        }
    }
}
quinmars commented 1 year ago

@obiben

#395 (comment) and if not, the latest emitted value would be emitted at due time, rather than waiting until a value is emitted past due time.

I don't agree. You are creating another operator that adjust the delay between emitted items. That's not what throttle is. Throttle really trash the other items emitted too fast.

Emitting the last value is a very important feature of the Throttle and Sample operator. Imagine, a user is typing some input string, let say "Hello World" and your application is performing some heavy/time-consuming operation on that. So you want to reduce the elements of the input stream. Depending on your preferences, this can start with the first key stroke ("H") or after some characters (e.g. "Hello"). But in any case I can think of, you want to perform the operation on the last full string ("Hello World"). Out of curiosity, what is your use case here?

idg10 commented 1 year ago

I'm reopening this because it's one of the areas we want to deal with, as described in https://github.com/dotnet/reactive/discussions/1868