dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License

Observable.Interval().Take() hangs with ImmediateScheduler #1983

Open idg10 opened 1 year ago

idg10 commented 1 year ago

Library version: 6.0.0

This is arguably just an unsupported scenario, since Observable.Interval is designed to work with schedulers suitable for time-based operation, and ImmediateScheduler is not, in general, suitable for that.

However, there's a scenario that people might reasonably expect to work:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1), ImmediateScheduler.Instance)
                       .Take(5);
source.Subscribe(i => Console.WriteLine($"OnNext({i}): {DateTime.Now}"));
Console.WriteLine($"Finished: {DateTime.Now}");

This initially seems to work, producing this output:

OnNext(0): 02/08/2023 08:56:22
OnNext(1): 02/08/2023 08:56:23
OnNext(2): 02/08/2023 08:56:24
OnNext(3): 02/08/2023 08:56:25
OnNext(4): 02/08/2023 08:56:26

But it then hangs. The call to Subscribe never returns. And internally, the interval continues merrily ticking away, delivering notifications into a Take operator that is no longer listening.

Although the ImmediateScheduler is not designed for timed operations, it does still implement the time-based IScheduler methods, and as the output above shows, they do work. They block, not returning until the specified time elapses, because that's what ImmediateScheduler does. (Well, DateTimeOffset-based scheduling more than 10 seconds into the future is not handled immediately, but that's not relevant to this issue.) But they do invoke your callback when you asked them to.

It's not totally unreasonable to expect this code to work. Take normally unsubscribes from its source once it has received the specified number of elements. Its source here is the Interval, so you'd expect that to stop trying to schedule any further work, meaning the ImmediateScheduler would be able to return, since all scheduled work has completed.

But it turns out that in this scenario, Take does not unsubscribe from its source once it has received the 5th element. And that's because of this code here:

https://github.com/dotnet/reactive/blob/5903ac6ace7956fd550189af2d7620a479733c02/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs#L134-L147

These calls to SetUpstream are how the Timer (which is what Interval is using internally) arranges to get shut down when its subscriber unsubscribes. Logically, it is setting itself as the upstream source for Take in this example. Normally, when Take hits its count, it calls OnCompleted on its subscriber, which then triggers a teardown of the subscription, at which point Take disposes its upstream, which is what's supposed to notify Take's source that it can stop.

That normally all works fine, but it doesn't work with ImmediateScheduler, because those calls to SetUpstream can't happen until the call to Schedule returns. In each case here, the argument to SetUpstream is the result of a call on parent._scheduler. But if that's an ImmediateScheduler, the call won't return until all the work is done. So those calls to SetUpstream can't occur until after the scheduler determines that it has no more work to do, but it can't discover that there's no more work to do until the calls to SetUpstream have happened.
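To make the circular dependency concrete, here is a minimal, single-threaded C# model of it. Everything in it (CancelHandle, SyncPeriodicScheduler, TakeModel, HangModel) is hypothetical and is not Rx code. Time is left out entirely, and a maxTicks cap stands in for the real, unbounded loop so that the model terminates.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical handle that a periodic loop polls before each tick.
public sealed class CancelHandle : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

// Hypothetical stand-in for periodic scheduling on an immediate scheduler:
// the loop runs synchronously, and the handle is only returned afterwards.
public static class SyncPeriodicScheduler
{
    // maxTicks is a safety cap so this model terminates; the real scenario has none.
    public static IDisposable SchedulePeriodic(Action tick, int maxTicks)
    {
        var handle = new CancelHandle();
        for (int i = 0; i < maxTicks && !handle.IsDisposed; i++)
        {
            tick();
        }
        return handle; // the caller sees the handle only now, after every tick
    }
}

// Hypothetical, simplified model of Take's sink (not the Rx source).
public sealed class TakeModel
{
    private readonly int _count;
    private IDisposable _upstream; // stays null until SetUpstream is called

    public TakeModel(int count) { _count = count; }

    public List<long> Emitted { get; } = new List<long>();
    public int TicksDelivered { get; private set; }

    public void SetUpstream(IDisposable upstream) { _upstream = upstream; }

    public void OnNext(long value)
    {
        TicksDelivered++;
        if (Emitted.Count >= _count) return; // already complete: drop the value
        Emitted.Add(value);
        if (Emitted.Count == _count)
        {
            // The real Take would signal OnCompleted and dispose its upstream here.
            // _upstream is still null because SchedulePeriodic has not returned yet.
            _upstream?.Dispose();
        }
    }
}

public static class HangModel
{
    public static TakeModel Run()
    {
        var take = new TakeModel(5);
        long next = 0;
        // SchedulePeriodic must return before SetUpstream can run,
        // but here it only returns after every tick has been delivered.
        take.SetUpstream(SyncPeriodicScheduler.SchedulePeriodic(() => take.OnNext(next++), maxTicks: 100));
        return take;
    }
}
```

Calling HangModel.Run() should yield five emitted values but 100 delivered ticks: Take asks to stop after the fifth value, but it has nothing to dispose yet, so the loop only ends when it hits the artificial cap.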

On the one hand, the basic problem here is that we're using a scheduler that doesn't work in the way Interval requires, so this could be dismissed as a non-bug.

On the other hand, this raises the question of whether a race condition exists in supported scenarios. Could a downstream sink try to shut down the timer before the SetUpstream call has occurred, and fail to do so because the call to Schedule hasn't yet returned? For example, if we're using a timer-friendly scheduler, might it determine that it can actually execute the timed work immediately? Or, more subtly, what if it queues the work, then the system bogs down, and the thread that called Schedule doesn't return up the stack fast enough? The scheduled work might then run and try to shut down the subscription, but the timer would run forever because SetUpstream was called after the sink it was called on had already stopped. We might be OK, because the sink might be using a disposable that handles this race correctly, but we should at least check.
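As a rough sketch of what "a disposable that handles this race correctly" could look like, assuming a single-assignment slot is what sits behind SetUpstream: if Dispose arrives first, a late Set must dispose the incoming handle immediately rather than store it. SetOnceDisposable and FlagDisposable are hypothetical names. Rx's public SingleAssignmentDisposable documents similar behaviour, but this is not its implementation.

```csharp
using System;
using System.Threading;

// Hypothetical single-assignment slot for an upstream handle. If Dispose has
// already happened, a late Set disposes the incoming handle straight away.
public sealed class SetOnceDisposable : IDisposable
{
    // Sentinel stored in _current once Dispose has been called.
    private static readonly IDisposable DisposedSentinel = new NoopDisposable();
    private IDisposable _current;

    public bool IsDisposed => ReferenceEquals(Volatile.Read(ref _current), DisposedSentinel);

    public void Set(IDisposable value)
    {
        // Store value only if nothing has been stored yet.
        IDisposable previous = Interlocked.CompareExchange(ref _current, value, null);
        if (previous == null)
        {
            return; // normal case: the handle arrived before any Dispose
        }
        if (ReferenceEquals(previous, DisposedSentinel))
        {
            value.Dispose(); // late arrival: the owner already stopped, so cancel now
            return;
        }
        throw new InvalidOperationException("Set may only be called once.");
    }

    public void Dispose()
    {
        IDisposable previous = Interlocked.Exchange(ref _current, DisposedSentinel);
        if (!ReferenceEquals(previous, DisposedSentinel))
        {
            previous?.Dispose(); // null if no handle had arrived yet
        }
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical handle that records whether it was disposed.
public sealed class FlagDisposable : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}
```

With this shape, a late SetUpstream in the scenario described above would still shut the timer down. With a plain field assignment instead, the handle would be stored after the sink had stopped, and nothing would ever dispose it.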

So it would be worth verifying that the possible race is handled correctly with non-immediate schedulers. It's also worth reviewing the rationale for the way the SchedulePeriodic extension method emulates periodic scheduling on schedulers that don't inherently support it: it was changed at some point in the past, and there's a lengthy and complex explanation of the thinking behind that change. But I think that fixing this for the immediate scheduler would be tricky. In the current design for periodic scheduling, the only way to cancel a periodically scheduled work item is to Dispose the object returned by SchedulePeriodic, and that's fundamentally incompatible with immediate scheduling. This is why the ImmediateScheduler is, in general, considered unsuitable for use in timer-based sources, so this is probably just going to be resolved as "by design".

akarnokd commented 1 year ago

This is the limitation of the IObservable - IObserver contract. Synchronous cancellation is not possible in general.

idg10 commented 1 year ago

@akarnokd wrote:

This is the limitation of the IObservable - IObserver contract

I've got two reasons for thinking that this is an inaccurate diagnosis of this problem:

  1. It's totally possible to fix this (see below)
  2. This works with other built-in schedulers. (If the problem were inherent to the IObservable - IObserver contract it would occur no matter which scheduler you use.)

So this isn't a case of "Can we fix this?". (The answer is: "Yes we can.") The question is "should we?"

I'm inclined to say that no, we should not fix this. ImmediateScheduler looks like a daft choice here, so the only purpose of this work would be to support something that I don't think people should be attempting. And it would entail modifying a code path that a lot of existing code uses, so there are risks to such a change.

I will now explain why I believe the fundamental issue here is the way in which Timer uses schedulers (and not in fact anything to do with the nature of the IObservable - IObserver contract).

The problem occurs because Timer calls the SchedulePeriodic extension method in a way that results in it running synchronously when the caller has specified the ImmediateScheduler. It's a problem if SchedulePeriodic runs synchronously, because the code paths involved make the assumption that the scheduler is going to return an IDisposable representing the work item before attempting to run the work item, providing an opportunity to set up all the cancellation logic necessary to enable auto-shutdown in cases like this.

It might seem like that's an insurmountable problem with the ImmediateScheduler because, by definition, it does things immediately. But in fact, there are cases where it is obliged to use a trampoline. It guarantees "Immediate at the outermost point of use", and it will also run immediately where possible when nested scheduling occurs, but it will use a trampoline if necessary to avoid re-entrancy.

And we can exploit that to make the Interval example shown above work.
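The re-entrancy behaviour described above can be modelled with a simple run-now-or-queue trampoline. This is a hypothetical, single-threaded sketch (ReentrancyTrampoline and TrampolineDemo are illustrative names) of the behaviour attributed here to ImmediateScheduler.AsyncLockScheduler and AsyncLock, not their source; the real AsyncLock also has to cope with multiple threads.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-threaded trampoline: run an action now, unless one is
// already running, in which case queue it and return immediately.
public sealed class ReentrancyTrampoline
{
    private readonly Queue<Action> _queue = new Queue<Action>();
    private bool _running;

    public void Run(Action action)
    {
        if (_running)
        {
            // Re-entrant call: defer the work instead of recursing.
            _queue.Enqueue(action);
            return;
        }

        _running = true;
        try
        {
            action();
            // Drain anything queued by nested calls before returning to the caller.
            while (_queue.Count > 0)
            {
                _queue.Dequeue()();
            }
        }
        finally
        {
            _running = false;
        }
    }
}

public static class TrampolineDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var trampoline = new ReentrancyTrampoline();
        trampoline.Run(() =>
        {
            log.Add("outer start");
            trampoline.Run(() => log.Add("nested item"));
            log.Add("nested Run returned");
            log.Add("outer end");
        });
        log.Add("outer Run returned");
        return log;
    }
}
```

TrampolineDemo.Run() should log "outer start", "nested Run returned", "outer end", "nested item", "outer Run returned": the nested call returns before its work runs, yet everything still completes before the outermost Run returns. That early return is what makes it possible to get hold of an IDisposable before the deferred work starts.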

Interval is a wrapper for Timer, and the following example shows a modification to the Timer.Periodic._ class's Run method (the same method linked to at the top of this issue). (There are some issues with this, so if we were to fix it, I wouldn't do it exactly this way. This is just to demonstrate that nothing in the IObservable - IObserver contract prevents us from fixing this.)

And just to be clear, this continues to provide immediate semantics if you choose the ImmediateScheduler: Subscribe won't return until it's done. All this does is enable auto-shutdown to work, so that if you do something like Timer(...).Take(5), the timer will correctly shut down when it's no longer needed, and the immediate call to Subscribe will return at that point.

public void Run(Periodic parent, TimeSpan dueTime)
{
    //
    // Optimize for the case of Observable.Interval.
    //
    if (dueTime == _period)
    {
        // Experimental hack to ensure the following:
        //  1)  that we are able to return an IDisposable before doing anything
        //      (even if the caller told us to use the ImmediateScheduler)
        //      so that in cases where this is nested inside subscription
        //      via some Producer-based downstream operator (e.g. Take),
        //      the downstream has a chance to call SetUpstream before we
        //      start our periodic timer.
        //  2)  that when we kick off the SchedulePeriodic against the
        //      ImmediateScheduler, we force it into a position where it
        //      has to defer the start of that timer, meaning that we're
        //      able to get hold of the IDisposable that will enable us
        //      to cancel it.

        CurrentThreadScheduler.Instance.Schedule(
            (This: this, parent, dueTime),
            static (innerCtScheduler, p) => {
                // In cases where subscription has come in through a downstream Producer-based sink
                // this callback will effectively run inside Producer<TTarget, TSink>.SubscribeRaw via the
                // CurrentThreadScheduler trampoline. (And if there was no trampoline already above
                // us on the stack, CurrentThreadScheduler.Instance.Schedule will just invoke
                // us synchronously.)
                //
                return p.parent._scheduler.Schedule(
                    p.This,
                    p.dueTime,
                    static (IScheduler innerScheduler, _ @this) =>
                    {
                        // This callback is invoked by whatever scheduler the caller specified.
                        // If that's the ImmediateScheduler, we don't want to call SchedulePeriodic
                        // from here, because it won't return until it's done, meaning we will have
                        // no way of obtaining an IDisposable through which to cancel it.
                        //
                        // But we can force it to defer execution of the SchedulePeriodic work item
                        // by doubly-nesting through the innerScheduler.
                        //
                        // If the caller specified ImmediateScheduler, the innerScheduler here will
                        // be an ImmediateScheduler.AsyncLockScheduler. If we schedule a work item
                        // through that inner scheduler from this context, it will just invoke it
                        // immediately (because ImmediateScheduler always tries to do that). However,
                        // it detects re-entrancy, and if you're already in a work item scheduled
                        // by that inner scheduler and you try to schedule another work item, THAT
                        // work item does not run immediately (because if it did, there would be
                        // a high risk of stack overflow in recursive scheduling scenarios).
                        // So in those cases, the ImmediateScheduler.AsyncLockScheduler queues up
                        // the work (the queuing actually happens inside AsyncLock, which also
                        // supplies its own trampoline to drain the queue), meaning it returns
                        // immediately, enabling us to get hold of an IDisposable representing
                        // the periodic work item.
                        //
                        // So that way, we can set it as our Upstream. And since going through
                        // CurrentThreadScheduler.Instance.Schedule above means that any Producer-based
                        // downstream will have us as an upstream, this enables auto-shutdown
                        // (e.g., when a Take determines it has reached the end) to work, because
                        // we had a chance to set up all of the necessary upstreams before beginning
                        // to run the periodic work item.
                        //
                        // We still get immediate semantics when using the ImmediateScheduler -
                        // a call to the downstream operator's Subscribe won't return until
                        // completion. But because of the deferred kick-off here, all upstreams
                        // are in place, meaning the auto-teardown works, and Subscribe will
                        // return once we are done.
                        return innerScheduler.Schedule(@this, static (IScheduler innerScheduler, _ @this) =>
                        {
                            // Let's just recap how we got here.
                            // We used CurrentThreadScheduler.Instance.Schedule to kick off the
                            // outermost work item, meaning that if subscription occurred
                            // via a Producer-based downstream, the initial Subscribe call will
                            // have returned all the way up to the most downstream call to
                            // Producer.SubscribeRaw, and the outer work item will have been
                            // invoked by the CurrentThreadScheduler trampoline from inside
                            // that SubscribeRaw. Critically, this means that the corresponding
                            // Sink.Run that ultimately called this Run method will have completed
                            // its call to SetUpstream, because this Run method has already
                            // returned.
                            // Then we used the parent._scheduler to run a work item. If the
                            // caller specified ImmediateScheduler, that will have executed
                            // our work item synchronously. That work item then called
                            // innerScheduler.Schedule, and again if the caller specified
                            // ImmediateScheduler, that will also have run synchronously,
                            // and that's how we end up here. Here's a simplified version
                            // of the call stack that led us here:
                            //  Here
                            //   innerScheduler.Schedule (ImmediateScheduler.AsyncLockScheduler)
                            //    parent._scheduler.Schedule (ImmediateScheduler)
                            //     CurrentThreadScheduler Trampoline in outermost Producer-based downstream
                            //      Application call to Subscribe
                            //  
                            // Two important points:
                            //  1) We're still inside the app's call to Subscribe, so this still looks
                            //      'immediate' to the app, which is what it asked for
                            //  2) The ImmediateScheduler.AsyncLockScheduler's AsyncLock is busy, so
                            //      any further nested scheduling will be queued (but will be executed
                            //      as soon as we return from this callback, so it still looks
                            //      immediate to the app)
                            //
                            // The consequence of 2 is that this call to SchedulePeriodic will queue
                            // up its work item, enabling us to stash the IDisposable representing
                            // that work item as our upstream before that work item begins, making
                            // it possible for us to cancel the periodic work item.
                            // (With most schedulers, SchedulePeriodic would have returned us an
                            // IDisposable without these contortions. It's only the ImmediateScheduler
                            // that requires this level of persuasion to make a periodic work item
                            // cancellable.)
                            IDisposable d = innerScheduler.SchedulePeriodic(@this, @this._period, static @this => @this.Tick());
                            @this.SetUpstream(d);
                            return d;
                        });
                    });
            });
    }
    else
    {
        SetUpstream(parent._scheduler.Schedule(this, dueTime, static (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
    }
}

And just to prove that it really does work, I ran the example shown at the top of this issue after making this change:

OnNext(0): 04/08/2023 08:00:24
OnNext(1): 04/08/2023 08:00:25
OnNext(2): 04/08/2023 08:00:26
OnNext(3): 04/08/2023 08:00:27
OnNext(4): 04/08/2023 08:00:28
Finished: 04/08/2023 08:00:28

That illustrates that we still get the "immediate" semantics (the position of the Finished message shows that the call to Subscribe doesn't return until all work is complete) and that it does now return once it has finished its work.
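The effect of the modified Run method can be reduced to a small model built from the same ideas: a run-now-or-queue trampoline, a periodic loop that polls a handle, and a Take-like sink. All of these types (Trampoline, PeriodicHandle, TakeModel, DeferredStartModel) are hypothetical and single-threaded, time is left out, and maxTicks is only a safety cap. Because the periodic loop is scheduled from inside a trampoline work item, it is queued, SchedulePeriodic returns its handle first, and SetUpstream runs before the first tick.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-threaded trampoline: run now, or queue if already running.
public sealed class Trampoline
{
    private readonly Queue<Action> _queue = new Queue<Action>();
    private bool _running;

    public void Run(Action action)
    {
        if (_running)
        {
            _queue.Enqueue(action); // nested call: defer and return at once
            return;
        }
        _running = true;
        try
        {
            action();
            while (_queue.Count > 0)
            {
                _queue.Dequeue()(); // drain deferred work before returning
            }
        }
        finally
        {
            _running = false;
        }
    }
}

// Hypothetical handle that the periodic loop polls before each tick.
public sealed class PeriodicHandle : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

// Hypothetical, simplified Take-like sink (not the Rx source).
public sealed class TakeModel
{
    private readonly int _count;
    private IDisposable _upstream;

    public TakeModel(int count) { _count = count; }

    public List<long> Emitted { get; } = new List<long>();
    public int TicksDelivered { get; private set; }

    public void SetUpstream(IDisposable upstream) { _upstream = upstream; }

    public void OnNext(long value)
    {
        TicksDelivered++;
        if (Emitted.Count >= _count) return;
        Emitted.Add(value);
        if (Emitted.Count == _count)
        {
            _upstream?.Dispose(); // count reached: cancel the source
        }
    }
}

public static class DeferredStartModel
{
    // When called from inside a trampoline work item, the loop is queued,
    // so the handle is returned before the first tick runs.
    public static IDisposable SchedulePeriodic(Trampoline trampoline, Action tick, int maxTicks)
    {
        var handle = new PeriodicHandle();
        trampoline.Run(() =>
        {
            for (int i = 0; i < maxTicks && !handle.IsDisposed; i++)
            {
                tick();
            }
        });
        return handle;
    }

    public static TakeModel Run()
    {
        var take = new TakeModel(5);
        var trampoline = new Trampoline();
        long next = 0;
        trampoline.Run(() =>
        {
            // Deferred loop: SetUpstream happens before any tick is delivered.
            take.SetUpstream(SchedulePeriodic(trampoline, () => take.OnNext(next++), maxTicks: 100));
        });
        // The outer Run drained the queue, so the loop has already stopped here.
        return take;
    }
}
```

DeferredStartModel.Run() should report exactly five delivered ticks, and the outer trampoline.Run call still does not return until the loop has stopped, mirroring the way the Finished line appears only after OnNext(4) in the output above.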