Async version of Observable.Create does not signal cancellation token when unsubscribed if subscribeAsync implementation has thread blocking operations #2052
System.Reactive 6.0.0 (latest stable; the issue also reproduces in preview 6.0.1)
Tested on Windows (will be able to test on Linux in a few days, but the issue is probably not platform-specific).
When using Observable.Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync), the CancellationToken passed to the subscribeAsync delegate should normally be signaled when the resulting sequence is unsubscribed from.
However, I've encountered a situation where this does not happen: it seems to break if subscribeAsync contains thread-blocking operations, such as blocking device reads or a simple Thread.Sleep.
Minimal example:
using System.Reactive.Concurrency;
using System.Reactive.Linq;
var o = Observable.Create<int>((observer, ct) =>
{
    Int32 i = 0;
    while (!ct.IsCancellationRequested)
    {
        Console.WriteLine("Still going");
        observer.OnNext(i++);
        Thread.Sleep(250); // This was SerialPort.ReadByte in real code
        // Will work with await Task.Delay(250);
    }
    Console.WriteLine($"Observable.Create loop exited (IsCancellationRequested = {ct.IsCancellationRequested})");
    return Task.CompletedTask;
});
Console.WriteLine("Subscribing");
var subscription = o
    .SubscribeOn(TaskPoolScheduler.Default) // Doesn't really matter
    .Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("Complete"));
await Task.Delay(1000);
Console.WriteLine("Disposing subscription");
subscription.Dispose();
Console.WriteLine("Disposed");
await Task.Delay(1500);
This will produce the following output:
Subscribing
Still going
0
Still going
1
Still going
2
Still going
3
Disposing subscription
Disposed
Still going <---- this shouldn't happen
Still going
Still going
Still going
Still going
Still going
Note that TaskPoolScheduler is used here only for conciseness. The issue persists without any schedulers, for example if an exception is thrown in some downstream subscriber (which is how I encountered this issue in the first place).
I understand that having blocking calls in Observable.Create is not ideal, but I also don't see any reason for the cancellation token behavior to depend on this implementation detail.
Another remark: adding a single Task.Yield at the beginning of subscribeAsync solves the problem. Unfortunately, I don't yet understand the intricate details of how Sinks and the other internals work, so I'm not sure why this is happening.
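For reference, here is a minimal sketch of the Task.Yield workaround described above, adapted from the repro. The cancelledSeen flag and the final 500 ms wait are illustrative additions that are not part of the original example, and SubscribeOn is left out for brevity. Presumably the yield helps because the delegate otherwise runs synchronously inside Subscribe and never hands its Task back, but that explanation is an assumption and has not been checked against the Rx source here.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Illustrative flag (not in the original repro): records whether the loop
// observed the cancellation request.
var cancelledSeen = false;

var o = Observable.Create<int>(async (observer, ct) =>
{
    // Workaround from the report: yield once before any blocking work, so
    // subscribeAsync returns its Task to the caller before the loop starts.
    await Task.Yield();

    var i = 0;
    while (!ct.IsCancellationRequested)
    {
        observer.OnNext(i++);
        Thread.Sleep(250); // stands in for a blocking call such as SerialPort.ReadByte
    }

    cancelledSeen = ct.IsCancellationRequested;
    Console.WriteLine($"Loop exited (IsCancellationRequested = {ct.IsCancellationRequested})");
});

var subscription = o.Subscribe(Console.WriteLine);
await Task.Delay(1000);

Console.WriteLine("Disposing subscription");
subscription.Dispose();

// Give the blocking loop time to wake up and check the token.
await Task.Delay(500);
```

With the yield in place, Subscribe returns right away and the loop is expected to exit shortly after Dispose is called, which matches the behavior reported for the Task.Yield variant.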
Bug