dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License
6.71k stars 750 forks source link

Bug in Observable.FromAsync #1664

Open tzehetner opened 2 years ago

tzehetner commented 2 years ago

Bug

Which library version?

System.Reactive 5.0.0

What are the platform(s), environment(s) and related component version(s)?

Windows, WPF, net5.0-windows10.0.19041

What is the use case or problem?

async Task DoSomethingOnBackgroundThreadAsync()
{
    await Task.Run(() => Thread.Sleep(1000));
}

//The following line is executed on the UI-thread, but unexpectedly the Handler is executed on
//a background thread, which led to hard to track down race conditions in our application.

Observable.FromAsync(() => DoSomethingOnBackgroundThreadAsync()).Subscribe(x => Handler());

A workaround for the problem is to pass DispatcherScheduler.Current to FromAsync, but it strongly feels like a bug, that should be fixed.

theodorzoulias commented 2 years ago

I think that you just need to add .ObserveOn(SynchronizationContext.Current) before subscribing to the sequence.

tzehetner commented 2 years ago

@theodorzoulias Thanks! Yes, that works. But it's very counterintuitive that I have to use ObserveOn, when the async method's continuation would actually run on the UI thread and the subscription is also done on the UI thread. This could lead to many bugs in many applications. Or is there perhaps a valid reason for this behavior?

theodorzoulias commented 2 years ago

@tzehetner I don't know why the FromAsync was designed this way. My guess is that it was introduced before the advent of async/await in C# (circa 2012), and so it was not influenced by the default behavior of await, which is to capture the current SynchronizationContext. Maybe some seasoned Rx aficionado could shed some historic light on this subject!

kmgallahan commented 2 years ago

@tzehetner Observable.FromAsync doesn't await the task, thus the continuation is executed on any available dispatcher.

fedeAlterio commented 3 months ago

This is a simple portable solution. For sure its possible to implement it more efficently, but at least the behavior is close to how awaiting tasks work.

public static IObservable<T> FromAsyncUsingAsyncContext<T>(Func<CancellationToken, Task<T>> asyncAction)
{
    if (asyncAction is null)
        throw new ArgumentNullException(nameof(asyncAction));

    return Observable.Defer(() =>
    {
        var synchronizationContext = SynchronizationContext.Current;
        if (synchronizationContext is null) 
        {
            return Observable.FromAsync(asyncAction);
        }

        return Observable.FromAsync(asyncAction, new SynchronizationContextScheduler(synchronizationContext));
    });
}