dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License

Add Window and Buffer overloads to allow specifying maximum time length and element count when window/buffer opening is controlled by another sequence #128

Open glopesdev opened 9 years ago

glopesdev commented 9 years ago

Basically, add the following overloads to Window (and the corresponding overloads to Buffer):

public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening>(
    this IObservable<TSource> source,
    IObservable<TWindowOpening> windowOpenings,
    TimeSpan timeSpan,
    IScheduler scheduler
)

public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening>(
    this IObservable<TSource> source,
    IObservable<TWindowOpening> windowOpenings,
    int count
)

public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening>(
    this IObservable<TSource> source,
    IObservable<TWindowOpening> windowOpenings,
    TimeSpan timeSpan,
    int count,
    IScheduler scheduler
)

This would allow using external events to specify the opening of overlapping windows while using the traditional element count and time length conditions to automatically close the windows.
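To make the intended semantics concrete, here is a minimal offline sketch that works on recorded data rather than a live IObservable. It is only an illustration under stated assumptions: EventWindows.Slice is a hypothetical helper (not an Rx.NET API), samples are assumed to be sorted by timestamp, each event opens a window over the samples timestamped at or after it, and a window stops at maxCount samples or at maxDuration after the event, whichever comes first. Windows opened by nearby events can overlap, which is exactly what the proposal asks for.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of Rx.NET): offline reference semantics for the
// proposed Window(windowOpenings, timeSpan, count) overload, applied to recorded data.
public static class EventWindows
{
    // Assumes `samples` is sorted by Time. Each event time t0 opens one window holding
    // the samples with t0 <= Time < t0 + maxDuration, truncated to the first maxCount.
    // Windows from nearby events may overlap, so a sample can appear in several windows.
    public static List<List<double>> Slice(
        IReadOnlyList<(double Time, double Value)> samples,
        IEnumerable<double> eventTimes,
        double maxDuration,
        int maxCount)
    {
        var windows = new List<List<double>>();
        foreach (var t0 in eventTimes)
        {
            windows.Add(samples
                .SkipWhile(s => s.Time < t0)                // skip samples before the event
                .TakeWhile(s => s.Time < t0 + maxDuration)  // time-length limit
                .Take(maxCount)                             // element-count limit
                .Select(s => s.Value)
                .ToList());
        }
        return windows;
    }
}
```

For example, with six samples whose values equal their timestamps 0 to 5, events at times 1 and 2, maxCount = 3 and a long maxDuration, the windows are [1, 2, 3] and [2, 3, 4]; the samples at times 2 and 3 land in both windows because the windows overlap.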

Note that, as far as I understand it, there is no trivial implementation of the _count_ overload, even if you try to build it on the existing overload that takes a window-closing selector. It would be extremely useful to have this built into the framework for completeness.

akarnokd commented 4 years ago

Sounds like an uncommon usage to me. Would an implementation in a 3rd party library work for you?

glopesdev commented 4 years ago

[Image: examples of windowing strategies]

Unless I am misunderstanding the purpose of the Window operators in general, this is not an uncommon usage at all.

Window operators are common and fundamental for all kinds of data analysis and signal processing. Above are some examples of the kind of stuff you can do with various windowing strategies. The last line is what is currently missing from the toolkit, and it's actually one of the most common applications of windowing in science and engineering.

Defining regular regions of interest to analyse patterns around known events is one of the most common data analysis techniques for time-series data. Basically, it allows you to ask questions of the sort: "what tends to happen to [seismograph, stock market, brain activity, insert your favorite time series here] data whenever this event happens?".

That this was considered important by the designers of the original Window API is evident from the design of the current overloads that take TWindowOpening and TWindowClosing sequences. Unfortunately, for analysis purposes you usually don't care about dynamic closing, since you want to average or visualize across comparable time intervals or comparable element counts, so those overloads are overkill. This proposal aims to rectify this by bringing the convenience of count and time-span boundaries to the dynamic slicing behaviour of the Window operator.
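As an illustration of the averaging step described above, here is a minimal offline sketch of event-locked averaging over count-based windows. EventLockedAverage.Compute is a hypothetical helper, not an Rx.NET API; it assumes the signal has already been recorded into a list, that each window starts at the sample where its event occurs, and that events too close to the end of the signal to fill a complete window are skipped, so every averaged window has the same element count.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of Rx.NET): element-wise average across
// count-based windows, each opened by an event.
public static class EventLockedAverage
{
    // For each event index e, the window is signal[e .. e + count - 1].
    // Events whose window would run past the end of the signal are skipped.
    public static double[] Compute(IReadOnlyList<double> signal, IEnumerable<int> eventIndices, int count)
    {
        var sums = new double[count];
        var used = 0;
        foreach (var e in eventIndices)
        {
            if (e < 0 || e + count > signal.Count) continue; // incomplete window
            for (var i = 0; i < count; i++) sums[i] += signal[e + i];
            used++;
        }
        if (used == 0) throw new InvalidOperationException("No complete windows to average.");
        // Mean at each offset from the event across all complete windows.
        return sums.Select(s => s / used).ToArray();
    }
}
```

For instance, with the signal 0, 1, ..., 6, events at indices 1, 3 and 6, and a count of 3, the windows are [1, 2, 3] and [3, 4, 5] (the event at index 6 is skipped), giving the average [2, 3, 4].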

If there is no time to implement this, feel free to leave the issue open and I might try to submit a pull request for future review.

akarnokd commented 4 years ago

I'm not questioning the existence of a use case; I'm questioning why this should be in Rx.NET proper. Just because there are other Window operators doesn't justify including more. Also, it is not just a question of whether we can write it, but of whether we are willing to keep supporting it once the original poster has left. Third, my experience in the ReactiveX world tells me that users tend to overestimate how widely certain operators would actually be needed.

akarnokd commented 4 years ago

Here is an example of how this could be implemented outside Rx.NET with the existing Window operator, by using a feedback subject:

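using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        // Emit each value t after a delay of t milliseconds.
        new int[] {
            100,
            600, 610, 620, 630, 640, 650,
            1100, 1700
        }.ToObservable()
        .SelectMany(t => Observable.Timer(TimeSpan.FromMilliseconds(t)).Select(_ => t))
        // No external boundary here: windows close only on size (5) or time (500 ms).
        .Window(Observable.Never<int>(), 5, TimeSpan.FromMilliseconds(500), Scheduler.Default)
        .Do(_ => Console.WriteLine("== New window =="))
        .Merge()
        .Subscribe(Console.WriteLine);

        // Keep the process alive while the timers fire.
        Console.ReadLine();
    }
}

static class Ops
{
    // Consecutive, non-overlapping windows that close on an external boundary signal,
    // after maxSize elements, or after maxTime, whichever comes first.
    public static IObservable<IObservable<T>> Window<T, U>(
        this IObservable<T> source,
        IObservable<U> boundary,
        int maxSize,
        TimeSpan maxTime, IScheduler scheduler)
    {
        return Observable.Defer(() =>
        {
            // Feedback channel: signalled when the current window reaches maxSize.
            var boundarySubject = new Subject<int>();
            // Number of elements emitted into the current window.
            var index = 0;

            var complexBoundary = boundarySubject.Synchronize()
                .Merge(boundary.Select(v => 3))
                .Publish(shared =>
                {
                    // Restart the time limit on every size or external boundary.
                    // Interval (instead of a one-shot Timer) keeps closing windows every
                    // maxTime after a time-triggered close, which never passes through `shared`.
                    return shared.Merge(
                        shared.Prepend(-1)
                        .Select(_ => Observable.Interval(maxTime, scheduler))
                        .Switch()
                        .Select(_ => 1)
                    );
                })
                // Every boundary starts a new window, so reset the element counter.
                .Do(_ => Interlocked.Exchange(ref index, 0));

            return Observable.Window(
                source,
                complexBoundary
            )
            .Select(window => {
                return window.Select(v => {
                    var obs = Observable.Return(v);
                    if (Interlocked.Increment(ref index) == maxSize)
                    {
                        // After emitting the maxSize-th element, signal a boundary
                        // so that this window closes.
                        obs = obs.Concat(Observable.Defer(() =>
                        {
                            boundarySubject.OnNext(0);
                            return Observable.Empty<T>();
                        }));
                    }
                    return obs;
                }).Concat();
            });
        });
    }
}

glopesdev commented 4 years ago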

The judgment of whether or not this is a common case is subjective, and since we clearly come from very different domains it's probably not going to be useful at this point to argue about it.

I take your point about maintenance, which to be honest is enough to leave the issue in the backlog.

However, I stand by my original point that this is not about adding extra cases to Window. It is about taking the existing overloads, which were already presented as a convenience, and rounding out the operator's semantics for common variations.

At some point someone thought having both a Window(count) and a Window(timeSpan) was useful, rather than forcing every consumer of Window to write custom sequences just to slice an input stream into fixed-size chunks. To me, this indicates that the original designers of the API had exactly the applications I presented above in mind. I may be mistaken, though, so if anyone can clarify why Window was created in the first place and what its intended applications were, that would help me understand whether this issue should stay or go.

Finally, for reference, here is a more concise implementation of all three overloads using existing combinators:

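using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class WindowExtensions
{
    // Each opening starts a window that closes after timeSpan.
    public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening>(
        this IObservable<TSource> source,
        IObservable<TWindowOpening> windowOpenings,
        TimeSpan timeSpan,
        IScheduler scheduler)
    {
        return source.Window(windowOpenings, _ => Observable.Timer(timeSpan, scheduler));
    }

    // Each opening starts a window that closes after count further elements
    // (or when the source completes). Publish shares a single subscription to source
    // between the windows and the per-window closing sequences.
    public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening>(
        this IObservable<TSource> source,
        IObservable<TWindowOpening> windowOpenings,
        int count)
    {
        return source.Publish(ps => ps.Window(windowOpenings, _ => ps.Take(count).Count()));
    }

    // Closes on whichever comes first: timeSpan elapsing or count elements arriving.
    // LongCount() makes both closing sequences IObservable<long> so they can be merged.
    public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening>(
        this IObservable<TSource> source,
        IObservable<TWindowOpening> windowOpenings,
        TimeSpan timeSpan,
        int count,
        IScheduler scheduler)
    {
        return source.Publish(ps => ps.Window(windowOpenings, _ => Observable.Merge(
            Observable.Timer(timeSpan, scheduler),
            ps.Take(count).LongCount())));
    }
}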