dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License
6.68k stars 748 forks source link

CancellationToken in ToAsyncEnumerable can cause NullReferenceException #1726

Open slavashar opened 2 years ago

slavashar commented 2 years ago

Bug

It look like there is a race condition in ToAsyncEnumerable which can cause NullReferenceException when the cancellation token is requested.

Which subcomponent library (Ix, Async.Ix)?

Ix

Which library version?

System.Linq.Async 6.0.1

What are the platform(s), environment(s) and related component version(s)?

Windows, .NET6

What is the use case or problem?

Cancellation of ObservableAsyncEnumerable.

What is the expected outcome?

OperationCanceledException is thrown.

What is the actual outcome?

NullReferenceException is thrown intermittently.

What is the stacktrace of the exception(s) if any?

 ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at System.Linq.AsyncEnumerable.ObservableAsyncEnumerable`1.MoveNextCore() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.Observable.cs:line 88
   at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 70
   at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 75
   at Program.<<Main>$>g__ConsumeAsync|0_2(Subject`1 subject, CancellationToken ct) in C:\Users\shara\source\repos\ToAsyncEnumerableTest\Program.cs:line 30

Do you have a code snippet or project that reproduces the problem?

using System.Reactive.Subjects;

Parallel.For(1, 10, _ => RunAsync().Wait());

async Task RunAsync()
{
    // Set custom context to avoid sychronous execution of enumerable continuation
    SynchronizationContext.SetSynchronizationContext(new CustomSynchronizationContext());

    using var subject = new Subject<int>();
    using var cts = new CancellationTokenSource();

    var task = ConsumeAsync(subject, cts.Token);

    subject.OnNext(1);
    cts.Cancel();

    try
    {
        await task;
    }
    catch (OperationCanceledException)
    {
        // ignore
    }
}

async Task ConsumeAsync(Subject<int> subject, CancellationToken ct)
{
    await foreach (var _ in subject.ToAsyncEnumerable().WithCancellation(ct))
    {
    }
}

internal class CustomSynchronizationContext : SynchronizationContext
{
}
theodorzoulias commented 2 years ago

I can reproduce it. This is the line where the NRE is thrown:

if (_values!.TryDequeue(out _current!))
omaskery commented 10 months ago

Hi, is there any known workaround for this, or plans to merge the linked PR in the near future? This is currently affecting me with .NET 7, System.Interactive 6.0.1.