dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License

Issue with CancellationToken lifetime in Observable.Using (async version) #2089

Open DenomikoN opened 3 months ago

DenomikoN commented 3 months ago

Bug

In the async version of Observable.Using, in observable factory delegate cancellationToken gets canceled not at the moment of destroying subscription. The synchronous version looks ok as there are no cancellation tokens.

Reproduced in System.Reactive.Linq 6.0.0, x64, Win11, dotnet version 8.0.100

using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using static System.Reactive.Linq.Observable;

public class Program
{
    public static async Task Main()
    {
        var enumeration = Observable.Using<int, IDisposable>(
                async (cancellation) => {
                    cancellation.Register(() => Console.WriteLine("Resource factory cancellation called"));
                    await Task.Yield();
                    Console.WriteLine("Created resource");
                    return Disposable.Create(() => Console.WriteLine("Disposed resource"));
                },

                async (resource, cancellation) => {
                    cancellation.Register(() => Console.WriteLine("Resource observable cancellation called")); // why is this called at some point other than when the subscription is disposed?

                    Console.WriteLine("Init resource observable");
                    await Task.Yield();
                    // return -1 immediately and then tick with 1 second interval
                    var underlyingObservable = Observable.Return(-1)
                                                        .Concat(
                                                            Observable
                                                            .Interval(TimeSpan.FromSeconds(1))
                                                            .Select((t,i) => i).Take(10)
                                                        );
                    return new AnonymousObservable<int>((observer) => {
                        Console.WriteLine("Subscribed");
                        return new CompositeDisposable(
                            underlyingObservable.Subscribe(observer),
                            Disposable.Create(() => Console.WriteLine("Unsubscribed"))
                        );
                    });
                }
            );

        enumeration = enumeration.Publish().RefCount();

        var subscription = enumeration.Subscribe(
            num => Console.WriteLine("Next:{0}", num),
            ex => Console.WriteLine("Error:{0}", ex),
            () => Console.WriteLine("Completed")
        );

        await Task.Delay(5000);

        subscription.Dispose();
    }
}

Actual output:

Created resource
Init resource observable
Resource factory cancellation called
Subscribed
Next:-1
Resource observable cancellation called  << here is the problem
Next:0
Next:1
Next:2
Next:3
Unsubscribed
Disposed resource

Expected output:

Created resource
Init resource observable
Resource factory cancellation called
Subscribed
Next:-1
Next:0
Next:1
Next:2
Next:3
Resource observable cancellation called
Unsubscribed
Disposed resource

idg10 commented 2 weeks ago

in observable factory delegate cancellationToken gets canceled not at the moment of destroying subscription

The cancellation token's purpose is not to tell you that the subscription has ended (although under certain circumstances, unsubscription will trigger the thing that it's actually telling you about). Its purpose is to tell the callback that the result the callback was meant to produce is no longer required. This is related to the subscription only insofar as if someone unsubscribes before your callback returns, cancellation may occur. But there are no guarantees at all about what that token does after your callback returns.

Arguably we have a documentation bug here, because where the docs say that these tokens allow:

cancellation at any stage of the resource acquisition or usage

it is at best unclear, and possibly misleading, to say "usage" here. You could reasonably interpret the resource as being "used" for the entire lifetime of the subscription. I think this should really have said:

cancellation at any stage of resource acquisition, or of creating a new IObservable<TResult> from that resource

So we use the resource to create an observable. (That's the cancellable part of "using" the resource.) And then we go on to use the resulting IObservable<TResult>. (And for that stage, it's the call to Dispose that tells us when it's all over, not the cancellation token.)

The design intent is that the cancellation token's state is only meaningful for as long as the callback is running. Once the callback has returned, there's no longer any work left to cancel.
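To make that concrete, here is a minimal sketch of the scenario the token is meant for: the subscriber goes away while the resource factory callback is still running. It is not taken from the Rx source or its tests; the class name CancellationDuringCallbackSketch and the 5-second delay are made up for illustration, and since cancellation is only best-effort, treat the result as what I'd expect rather than a guarantee.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical sketch: unsubscribe while the resource factory is still in progress.
public static class CancellationDuringCallbackSketch
{
    // Returns true if the factory observed cancellation while it was still running.
    public static async Task<bool> RunAsync()
    {
        var factoryCancelled = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var source = Observable.Using<int, IDisposable>(
            async cancellation =>
            {
                try
                {
                    // Stand-in for slow resource acquisition that honours the token.
                    await Task.Delay(TimeSpan.FromSeconds(5), cancellation);
                    factoryCancelled.TrySetResult(false); // ran to completion
                    return Disposable.Empty;
                }
                catch (OperationCanceledException)
                {
                    factoryCancelled.TrySetResult(true); // result no longer required
                    throw;
                }
            },
            // Only reached if the factory completes; never emits anything.
            (resource, cancellation) => Task.FromResult(Observable.Never<int>()));

        var subscription = source.Subscribe(_ => { });

        // The factory is still awaiting Task.Delay at this point, so nobody
        // needs its result once we unsubscribe.
        subscription.Dispose();

        return await factoryCancelled.Task;
    }
}
```

Calling `await CancellationDuringCallbackSketch.RunAsync()` should report that the factory was cancelled. This is the case where the token's state actually means something: the callback has not returned yet, and its result is no longer wanted.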

Why am I saying that's the design intent?

  1. there's already a separate mechanism for dealing with this longer type of "usage" (the call to Dispose)
  2. looking at the code, it's clear that these cancellation tokens are only intended to have any meaning for as long as the callbacks are in progress
  3. the unit tests are consistent with this interpretation

So in your example, I wouldn't have expected the cancellation tokens to be signalled at all. However, as the documentation says, this method supports:

best-effort cancellation

So that means two things:

  1. no guarantees about when cancellation will occur
  2. no guarantees about whether or not the token will be cancelled

Since cancellation is described as 'best effort' here, the guarantees are very weak. The only thing that is guaranteed is that each cancellation token won't be signalled while the callback is running if the result of the callback is still required.

There's definitely something a bit peculiar about the behaviour this example produces. I would not have expected the cancellation token to get signalled when nothing needed to be cancelled. (And both of your callbacks run to completion, and the subscription continues to run for some time after that. It's definitely too late to cancel a callback if that callback already returned!) But since the behaviour of the token after it has served its useful purpose is, strictly speaking, undefined, technically it is not a bug: the token may be signalled long after it's useful, and that's what you're actually seeing here.
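If late signals like this are a nuisance in your own code, one option is to scope each registration to the callback itself, so that nothing is left listening once the callback has returned. The following is only a sketch of that idea under my own assumptions (the class name ScopedRegistrationSketch and the counter are illustrative, not part of Rx). It relies on the fact that disposing a CancellationTokenRegistration removes the callback from the token.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: registrations that live only as long as the callback runs.
public static class ScopedRegistrationSketch
{
    // Returns how many registered cancellation callbacks fired.
    public static async Task<int> RunAsync()
    {
        int callbacksFired = 0;

        var source = Observable.Using<int, IDisposable>(
            async cancellation =>
            {
                // Unregistered automatically when this callback returns.
                using var registration = cancellation.Register(
                    () => Interlocked.Increment(ref callbacksFired));
                await Task.Yield();
                return Disposable.Empty;
            },
            async (resource, cancellation) =>
            {
                // Same here: any signal after we return goes unobserved.
                using var registration = cancellation.Register(
                    () => Interlocked.Increment(ref callbacksFired));
                await Task.Yield();
                return Observable.Return(42);
            });

        // Awaiting an observable yields its last element once it completes.
        int last = await source;

        return callbacksFired;
    }
}
```

Because both callbacks run to completion before anything unsubscribes, neither registration is still attached by the time a token could be signalled, so the counter should stay at zero however late the token ends up being cancelled.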

I just want to clarify why I'm saying it's occurring after it's useful (i.e., later than we might reasonably expect), when you're reporting that it's coming earlier than you think it should.

The behaviour you describe as expected behaviour:

Created resource
Init resource observable
Resource factory cancellation called
Subscribed
Next:-1
Next:0
Next:1
Next:2
Next:3
Resource observable cancellation called
Unsubscribed
Disposed resource

is also not what I would have expected here.

Here's the output I was expecting:

Created resource
Init resource observable
Subscribed
Next:-1
Next:0
Next:1
Next:2
Next:3
Unsubscribed
Disposed resource

I.e., I wasn't expecting either of the cancellation tokens to get cancelled, because both of the callbacks ran to completion (meaning that there was no work left to cancel).

If you interpret the documentation ("any stage of the resource acquisition or usage") to extend to the full lifetime of the subscription, your interpretation makes sense. But this has never been what the implementation was actually trying to achieve. So that's why I'm classifying this as a documentation bug.

If you want to know when the resource is no longer in use, the call to Dispose tells you that.
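As a rough illustration of relying on Dispose rather than the token, the sketch below puts the end-of-use notification in the resource's own Dispose. The class name DisposeSignalsEndOfUseSketch and the timings are assumptions made for the example, not anything from the Rx codebase.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical sketch: the resource's Dispose marks the end of its use.
public static class DisposeSignalsEndOfUseSketch
{
    // Returns true if the resource was released only after unsubscription.
    public static async Task<bool> RunAsync()
    {
        var resourceReleased = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var source = Observable.Using<long, IDisposable>(
            // The resource reports its own disposal; the token is ignored here.
            cancellation => Task.FromResult(
                Disposable.Create(() => resourceReleased.TrySetResult(true))),
            (resource, cancellation) => Task.FromResult(
                Observable.Interval(TimeSpan.FromMilliseconds(100))));

        var subscription = source.Subscribe(_ => { });

        // Let the subscription run for a while; the resource stays in use.
        await Task.Delay(TimeSpan.FromMilliseconds(350));
        bool releasedWhileSubscribed = resourceReleased.Task.IsCompleted;

        // Unsubscribing is what ends the resource's lifetime.
        subscription.Dispose();

        // Allow a generous timeout in case disposal is not synchronous.
        var finished = await Task.WhenAny(
            resourceReleased.Task, Task.Delay(TimeSpan.FromSeconds(5)));

        return !releasedWhileSubscribed && finished == resourceReleased.Task;
    }
}
```

This matches the "Disposed resource" line in your output: it shows up when the subscription is torn down, which is the signal you were hoping to get from the token.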

I think it would be cleaner if the tokens never got cancelled in this situation. But there's every chance that code out there somewhere is relying on the fact that they do get cancelled after the work they were meant to govern completes, so I think we're kind of stuck with that behaviour.

So in conclusion:

  1. if I were to modify the behaviour here, it would be to prevent these cancellation tokens from getting cancelled at all in this particular scenario (i.e., neither the behaviour you observed, nor the behaviour you said you expected)
  2. the current behaviour is odd but not outright wrong, and it's possible that some code relies on the current behaviour, so changing it could be counterproductive
  3. the documentation for this method needs to be clearer about what these cancellation tokens are actually for