dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io

Window and Buffer operators omit items which are released immediately #2091

Closed adamjones2 closed 2 months ago

adamjones2 commented 6 months ago

I'm trying to partition a stream into windows according to a predicate on the elements. That is, implement a function like

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> isStartOfNewWindow);

(Incidentally, I feel like this is a common use case that should be part of the library.) I looked in the IntroToRx docs and found the recommended approach is this:

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> isStartOfNewWindow)
{
    var shared = source.Publish().RefCount();
    var windowEdge = shared.Where(isStartOfNewWindow).Publish().RefCount();
    return shared.Window(windowEdge, _ => windowEdge);
}

A simple test reveals this does appear to work well:

var source = Observable.Interval(TimeSpan.FromSeconds(1));
var windowed = source.Window(x => x == 0 || x % 5 == 2).SelectMany(o => o.ToList());
windowed.Subscribe(xs => Console.WriteLine(string.Join(", ", xs)));

This prints 0, 1 / 2, 3, 4, 5, 6 / 7, 8, 9, 10, 11 (one window per line), etc., as expected.

However, if I now prepend some items to the source sequence, it does not work correctly:

var source = Observable.Interval(TimeSpan.FromSeconds(1)).Prepend(-1); // Prepend -1
var windowed = source.Window(x => x == -1 || x % 5 == 2).SelectMany(o => o.ToList()); // Change x == 0 condition to x == -1, as that's now the first item
windowed.Subscribe(xs => Console.WriteLine(string.Join(", ", xs)));

This prints the same as the first example, i.e. it ignores the added -1, even though that should now be part of the first window and the first line should be -1, 0, 1. I observe the same behaviour with the analogous Buffer operator. I also notice that defining source instead as

var source = Observable.Defer(async () => { return Observable.Return(-1L); }).Concat(Observable.Interval(TimeSpan.FromSeconds(1)));

has the same bad behaviour, but

var source = Observable.Defer(async () => { await Task.Delay(1); return Observable.Return(-1L); }).Concat(Observable.Interval(TimeSpan.FromSeconds(1)));

does not, and includes the -1 correctly. Obviously, though, I don't want to introduce artificial delays into my streams as a solution.

What is the correct way to implement the function I need, regardless of the timing of the events in the input sequence? If it's what I already did, can a fix be implemented in Window and Buffer for this behaviour?

adamjones2 commented 6 months ago

I also note

var source = Observable.Return(-1L).ObserveOn(ThreadPoolScheduler.Instance).Concat(Observable.Interval(TimeSpan.FromSeconds(1)));

works, but the windowing function can't modify the source that way without putting the entire thing on that scheduler, so this is only a hack on the input to work around the function's deficiency, not a true solution. It may be useful for diagnosis, though.

EDIT: I've also realised this causes the subscribe action of the original sequence (Observable.Interval(TimeSpan.FromSeconds(1))) to be offloaded onto another thread as well, which is causing issues for my use case, so I can't use it even as a workaround.

adamjones2 commented 6 months ago

In the end I had to hand-spin the implementation of the function a different way, but I'd still appreciate some insight into this and would suggest the below as a candidate for inclusion into the library.

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> isWindowStart) => 
    Observable.Create<IObservable<T>>(observer =>
    {
        Subject<T>? currentWindow = null;
        return source.Subscribe(Observer.Create<T>(
            next =>
            {
                if (currentWindow == null || isWindowStart(next))
                {
                    currentWindow?.OnCompleted();
                    currentWindow = new Subject<T>();
                    observer.OnNext(currentWindow);
                }
                currentWindow.OnNext(next);
            },
            ex => { currentWindow?.OnError(ex); observer.OnError(ex); },
            () => { currentWindow?.OnCompleted(); observer.OnCompleted(); }
        ));
    });

idg10 commented 3 months ago

We can make a relatively simple change to your original example, i.e. this:

public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> isStartOfNewWindow)
{
    var shared = source.Publish().RefCount();
    var windowEdge = shared.Where(isStartOfNewWindow).Publish().RefCount();
    return shared.Window(windowEdge, _ => windowEdge);
}

We can replace the last line with this:

return shared.Window(_ => windowEdge);

The significance of this single-argument Window overload is that it guarantees to partition the input elements. I.e., each input element will appear in exactly one window.
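To illustrate the partitioning guarantee, here is a minimal sketch using the boundaries form, Window(IObservable<TWindowBoundary>), which takes the boundary observable directly. The subjects and the printed labels are hypothetical and hand-driven so that the ordering is deterministic; it assumes the System.Reactive package and C# top-level statements.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical hand-driven inputs (not from the issue) so timing is explicit.
var source = new Subject<int>();
var boundaries = new Subject<Unit>();

using var subscription = source
    .Window(boundaries)                       // first window opens on subscription
    .Select((window, index) => window.ToList()
        .Select(items => $"window {index}: {string.Join(", ", items)}"))
    .Merge()
    .Subscribe(Console.WriteLine);

source.OnNext(1);
source.OnNext(2);
boundaries.OnNext(Unit.Default);              // closes window 0, opens window 1
source.OnNext(3);
boundaries.OnNext(Unit.Default);              // closes window 1, opens window 2
source.OnNext(4);
source.OnNext(5);
source.OnCompleted();                         // completes the last window

// Prints:
// window 0: 1, 2
// window 1: 3
// window 2: 4, 5
```

Each boundary closes the current window and opens the next in a single step, so every element lands in exactly one window.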

There is an inherent limitation with the openings/closing overload, which is what your example (and also the IntroToRx sample on which that was based) uses:

public static IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(
    this IObservable<TSource> source,
    IObservable<TWindowOpening> windowOpenings,
    Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector)

It's impossible for this overload to make that promise of always delivering each item into exactly one window. This turns out to be a necessary (although perhaps not entirely obvious) upshot of the fact that this overload is designed specifically to allow gaps or overlaps.

The only reason you would specify openings and closings separately is if you didn't necessarily want strict partitioning. This overload allows gaps between windows (in which case any items falling between windows will be dropped, by design), or overlaps (in which case the same element will appear in multiple windows if those windows overlap).
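The gap and overlap behaviour can be sketched with hand-driven subjects. Everything named here (the subjects, the "A"/"B"/"C" opening labels, the closings dictionary) is hypothetical and exists only to make the ordering deterministic; it assumes the System.Reactive package and C# top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical inputs: one closing subject per opening label.
var source = new Subject<int>();
var openings = new Subject<string>();
var closings = new Dictionary<string, Subject<Unit>>
{
    ["A"] = new Subject<Unit>(),
    ["B"] = new Subject<Unit>(),
    ["C"] = new Subject<Unit>(),
};

using var subscription = source
    .Window(openings, name => closings[name])
    .Select((window, index) => window.ToList()
        .Select(items => $"window {index}: {string.Join(", ", items)}"))
    .Merge()
    .Subscribe(Console.WriteLine);

openings.OnNext("A");                 // window 0 opens
source.OnNext(1);                     // in window 0 only
openings.OnNext("B");                 // window 1 opens while window 0 is still open
source.OnNext(2);                     // overlap: delivered to both windows
closings["A"].OnNext(Unit.Default);   // prints "window 0: 1, 2"
source.OnNext(3);                     // in window 1 only
closings["B"].OnNext(Unit.Default);   // prints "window 1: 2, 3"
source.OnNext(4);                     // gap: no window is open, so 4 is dropped
openings.OnNext("C");                 // window 2 opens
source.OnNext(5);
closings["C"].OnNext(Unit.Default);   // prints "window 2: 5"
```

Element 2 appears twice and element 4 never appears, which is the by-design behaviour described above rather than a partition.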

Since this openings/closings overload accepts two distinct observables, and since Rx does not offer any way for two distinct observables to emit items at exactly the same time, it's not actually possible to use this openings/closings overload as a more general version of the closings-only overload shown above. You might expect that when the closings callback returns the same observable as was passed as the openings source, Rx could detect that this is the same thing, and therefore recognise when two events emerging from these two sources are "the same" event. But in general that can't work, because Window subscribes separately to each observable returned by the closing selector, and in Rx multiple subscriptions to the same source are allowed to produce different sequences. (Rx has no way of detecting when something is in fact a 'hot' source, where all subscribers receive exactly the same events at logically the same time.)
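As a small illustration of why two subscriptions cannot be assumed to see the same events, here is a sketch of a cold source whose output depends on which subscription is observing it. The counter and the source are hypothetical; it assumes the System.Reactive package and C# top-level statements.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical cold source: Defer runs the factory once per subscription.
var subscriptionCount = 0;
var source = Observable.Defer(() => Observable.Return(++subscriptionCount));

source.Subscribe(x => Console.WriteLine($"first subscription saw {x}"));
source.Subscribe(x => Console.WriteLine($"second subscription saw {x}"));

// Prints:
// first subscription saw 1
// second subscription saw 2
```

Both subscriptions target the same IObservable<int> instance, yet each receives a different sequence, so an operator cannot infer from reference equality alone that two subscriptions will deliver the same events.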

If events in Rx were timestamped, then it would be possible to say definitively whether two events occurred at exactly the same time, but since they are not, there is no concept of precisely simultaneous events.

So this openings/closings overload is inherently imprecise (because when what we might think of as 'the same' event emerges through multiple subscriptions, these multiple deliveries happen at slightly different times). Fundamentally, because there's no way to represent the idea that two distinct events happened at precisely the same time, this openings/closings operator is always going to be a bit fuzzy around the edges.

So why does introtorx.com suggest the openings/closings form? I can't provide a definitive answer to that because that example dates back to the original edition of the book, which I did not write.

When I made updates to the book to produce the 2nd (current) edition, I'm afraid I did not notice that the example you found can go wrong in this way.

I've created a new issue identifying that example as a doc bug: #2130

If you were able to try this change out and see if it works for you, that would be great. If it turns out that there are other reasons you can't do it this way, it would be good to know.

adamjones2 commented 2 months ago

Hi @idg10, many thanks for your detailed and thought-out response, it's appreciated! That context all makes sense. I assume by

return shared.Window(_ => windowEdge);

you mean to say

return shared.Window(windowEdge);

as I can't find an overload like the former. I've tested it and it does appear to work, thanks! Simple solution in the end, don't know how I missed that one.

idg10 commented 2 months ago

Sorry, yes, that is what I meant.

Since the original issue you reported is an unavoidable aspect of the overload accepting separate open/close observables, and since using the non-overlapping overload sounds like it's working for you, I'm going to close this issue.