Closed paul-scharnofske-AP closed 7 months ago
Rx makes very few guarantees about the context in which notifications will be delivered. So in most cases, you should make no assumptions about the context in which your subscriber callbacks (including callbacks passed to `Do`) will run.
There are exceptions, of course: `ObserveOnDispatcher` would be pretty pointless if you couldn't rely on it delivering notifications to its direct subscribers via the dispatcher.
However, it only makes that guarantee for its direct subscribers, mainly because it can't do anything more than that. All it sees is an `IObserver<T>`. It has no control over what the observer does next. So if that observer chooses to, say, start a new thread, and then deliver notifications to a downstream subscriber on that thread, the `ObserveOnDispatcher` step is powerless to do anything about that, because this is something that happened downstream.
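A minimal sketch of that limitation, assuming an `EventLoopScheduler` as a stand-in for the dispatcher so that it can run in a plain console app (in a WPF app you would use `ObserveOnDispatcher` instead). The second `ObserveOn` plays the role of an observer that chooses to deliver downstream notifications somewhere else:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Assumption: a single-threaded scheduler standing in for the dispatcher.
using var loop = new EventLoopScheduler();

Observable
    .Return(1)
    .ObserveOn(loop)
    // Direct subscriber of ObserveOn: this callback runs on the event loop thread.
    .Do(_ => Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) direct subscriber"))
    // This operator moves delivery to the task pool. The earlier ObserveOn
    // cannot prevent that, because it happens downstream of it.
    .ObserveOn(TaskPoolScheduler.Default)
    .Do(_ => Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) further downstream"))
    // Block the main thread until the sequence completes.
    .Wait();
```

The two callbacks would normally report different thread IDs, since only the first is a direct subscriber of the `ObserveOn(loop)` step.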
With that in mind, I'm going to add some annotations to your code to clarify what you can or can't rely on at each stage:
Observable
.Return(1)
// If we subscribe here, we've got no control over what thread notifications happen on.
.ObserveOnDispatcher()
// If we subscribe here, we're guaranteed that notifications occur on the dispatcher thread.
// The `Do` callback is effectively a subscription, so this callback runs on the dispatcher.
.Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) before 'FromAsync'"))
// If we subscribe here, then I don't think the documentation guarantees anything at all,
// but as it happens, the `Do` operator always uses whatever thread the incoming notification
// arrived on, so in fact, subscribers will receive notifications on the dispatcher thread here too.
// Even though that's not a documented promise, I can't see us ever changing it, because too
// many apps likely rely on it.
.Select(_ =>
// Select invokes its projection callback on the same thread as it received the
// notification, so this next bit of code will run on the dispatcher thread (and to be
// more precise, within the dispatcher's SynchronizationContext).
// However it's basically irrelevant which thread we call the `ToObservable` method
// on. In general in Rx, the context from which you subscribe to an observable source
// has no bearing on the context in which that source will deliver events.
// (That's precisely why we have both `SubscribeOn` and `ObserveOn`.)
// So you should be aware that there's absolutely no reason to think the `IObservable<T>`
// returned by `ToObservable` here is going to remember anything about the context
// from which it was called. If you want an IObservable that's going to deliver events
// on a particular context, you'll need to use one of the ObserveOnXxx methods.
Example().ToObservable())
// If we subscribe here, we're *still* going to get notifications on the dispatcher thread
// because `Select` just uses whatever thread it received a notification on to deliver the projected
// value downstream. (Again, I don't think that's documented, but I'm confident it will
// never change.)
.Concat()
// So this is where it gets interesting. I believe the specific overload of Concat here will be
// the one that takes an input of type IObservable<IObservable<TSource>>, and produces
// a flattened IObservable<TSource> as a result. It happens to wait for each input source
// to complete before it moves onto the next, but that's irrelevant here because your source
// only ever produces a single observable, which itself only ever produces a single value.
// I'd call Concat a somewhat idiosyncratic way to achieve this, because it suggests you're
// stitching together items in order from a bunch of streams, when in fact you're just collecting
// a single item.
// It would be more idiomatic to replace the Select with a SelectMany and drop the Concat.
// (The 'Many' is still a bit misleading, but SelectMany is the more common
// way to flatten nested observables.)
//
// The reason this is interesting from a scheduler perspective is that this operator is actually
// the end of the line for the original notification that started from the Return operator.
// This Concat operator isn't going to forward the notification it received from its upstream.
// Instead, it *subscribes* to the observable source it receives from its upstream, and then
// returns.
// Only when one of those newly set up subscriptions receives a notification will Concat have
// anything to forward. (And in this specific instance, there will be exactly one such subscription,
// and it will only ever produce one item.)
// So at that point, the context in which Concat runs is *not* the one established by your
// earlier call to ObserveOnDispatcher. It's now the one established by the Select callback:
// Example().ToObservable()
// And since you didn't tell Rx you wanted this observable to deliver its notifications through
// any particular scheduler, you can't safely make any assumptions about the context in
// which notifications will be delivered. If you wanted the observable produced by your
// Select callback to deliver through the dispatcher you should have said so.
// That's why this does not run on the dispatcher thread:
.Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) after 'Concat'"))
.Subscribe();
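The `SelectMany` suggestion from the annotations could look roughly like this. It is a sketch only: the `Example` method below is a hypothetical stand-in for the one in the original code, whose signature isn't shown here.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical stand-in for the Example() method in the original code.
static async Task<int> Example()
{
    await Task.Delay(100);
    return 42;
}

var result = Observable
    .Return(1)
    // SelectMany subscribes to each inner observable and forwards its items,
    // replacing the Select + Concat pair.
    .SelectMany(_ => Example().ToObservable())
    // Block until the sequence completes and return its last item.
    .Wait();

Console.WriteLine(result);
```

With a single inner source the two forms behave the same. With several inner sources, `SelectMany` merges them as items arrive, whereas `Concat` waits for each source to complete before moving on to the next. The scheduling caveat is unchanged: the inner observable still decides where its notifications are delivered.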
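The key observation here is that the chain breaks at the point where the `Select` callback introduces a new source: `Concat` delivers that source's notifications, not its upstream's. More generally, anything that delivers events not directly from its upstream, but by subscribing to other observables, will break the causality chain.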
If you changed that `Select` callback to use this:
Example().ToObservable().ObserveOnDispatcher()
you would then explicitly be telling Rx that you do want it to capture the dispatcher you're on at the point where that `Select` callback runs, and use that to deliver notifications to any subscribers to that stream.
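Here is a console-friendly sketch of that change. It assumes an `EventLoopScheduler` in place of the dispatcher (so `ObserveOn(loop)` stands in for `ObserveOnDispatcher()`) and a hypothetical `Example` method:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical stand-in for the Example() method in the original code.
static async Task<int> Example()
{
    await Task.Delay(100);
    return 42;
}

// Assumption: a single-threaded scheduler standing in for the dispatcher.
using var loop = new EventLoopScheduler();

Observable
    .Return(1)
    .ObserveOn(loop)
    .Do(_ => Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) before 'Concat'"))
    // The inner source is told explicitly where to deliver its notifications.
    .Select(_ => Example().ToObservable().ObserveOn(loop))
    .Concat()
    .Do(_ => Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) after 'Concat'"))
    // Block the main thread until the sequence completes.
    .Wait();
```

Because both the outer and the inner source deliver through the same scheduler, both `Do` callbacks should report the same thread.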
Basically, any time you introduce completely new observable sources (which is what `ToObservable` is doing here), Rx won't capture any sort of context, because in general we don't. You have to tell it you want that.
And the reason we don't capture context automatically in all cases is that a) it's fairly expensive, and b) in cases where you really didn't need work to be run on the UI thread, it can ruin the responsiveness of the UI if that work does in fact happen on the UI thread.
So observable sources that generate new events out of thin air either use the most efficient, naturally available context (so the current task scheduler in this case), or they will accept a scheduler, and will pick a default scheduler if you don't supply one (e.g. `Return` works that way, as do the various timer operators).
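As a rough illustration, these are a few sources that accept a scheduler argument. The `EventLoopScheduler` is just an assumed example of a scheduler you might supply:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Assumption: any IScheduler would do; an event loop makes the thread easy to spot.
using var loop = new EventLoopScheduler();

// Return delivers its single value through the scheduler you supply.
Observable.Return(1, loop)
    .Do(_ => Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) Return"))
    .Wait();

// The timer operators accept a scheduler too.
Observable.Timer(TimeSpan.FromMilliseconds(50), loop)
    .Do(_ => Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) Timer"))
    .Wait();

// So does the task-to-observable conversion.
Task.FromResult(3).ToObservable(loop)
    .Do(_ => Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) ToObservable"))
    .Wait();
```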
So we don't consider this behaviour to be a bug. If you want to capture the current dispatcher and have all subscribers receive notifications through that, use `ObserveOnDispatcher`. If for some reason you need to be more general and want to capture the current synchronization context, use the `SynchronizationContextScheduler` in conjunction with `ObserveOn`.
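One way to package the more general option is a small helper. This is a sketch: the class name `ObservableContextExtensions` and the method name `ObserveOnCurrentContext` are made up for illustration and are not part of Rx.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical helper, not part of System.Reactive.
public static class ObservableContextExtensions
{
    // Captures the SynchronizationContext of the calling thread and delivers
    // notifications to subscribers through it.
    public static IObservable<T> ObserveOnCurrentContext<T>(this IObservable<T> source)
    {
        var context = SynchronizationContext.Current
            ?? throw new InvalidOperationException(
                "No SynchronizationContext is installed on this thread.");

        return source.ObserveOn(new SynchronizationContextScheduler(context));
    }
}
```

Called from a UI thread, `Example().ToObservable().ObserveOnCurrentContext()` would capture that thread's context at the point of the call. In a console app there is normally no synchronization context, which is why the helper throws rather than silently doing nothing.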
Thanks for the elaborate response.
Bug
If you use `Task.ToObservable()` without specifying a scheduler, then the following code will run:
https://github.com/dotnet/reactive/blob/57e4c425f16712c00818373b2246c95821f2aafe/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs#L244-L252
https://github.com/dotnet/reactive/blob/57e4c425f16712c00818373b2246c95821f2aafe/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs#L307-L322
https://github.com/dotnet/reactive/blob/57e4c425f16712c00818373b2246c95821f2aafe/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs#L79-L111
The following snippet concerns me:
This means that after the task completes, it will continue on the task scheduler instead of where you came from.
I find this behavior very unexpected, especially since this is used internally in `StartAsync`, which is used by `FromAsync`:
Actual Output
Expected Output
What makes this worse is the line `scheduler ??= ImmediateScheduler.Instance;`, which means that there's a race condition: if the underlying task has already finished, it won't switch threads, but otherwise it does:
Details
5.0.0 or 6.0.0
Target Framework: net6.0-windows10.0.19041 (with net6.0 too, but then `ObserveOnDispatcher` is missing)
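The race described in the report could be probed with something like the following. It is a sketch, not the original repro: the `Slow` method is hypothetical, and the actual thread IDs depend on the Rx version and the environment.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical task that is still running when we subscribe.
static async Task<int> Slow()
{
    await Task.Delay(100);
    return 2;
}

Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) subscribing");

// Case 1: the task has already completed at subscription time.
Task.FromResult(1)
    .ToObservable()
    .Do(_ => Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) already-completed task"))
    .Wait();

// Case 2: the task is still pending at subscription time.
Slow()
    .ToObservable()
    .Do(_ => Console.WriteLine($"(thread: {Environment.CurrentManagedThreadId}) pending task"))
    .Wait();
```

If the report is right, the first callback would run on the subscribing thread and the second on whichever thread completes the task. Passing an explicit scheduler to `ToObservable`, or adding an `ObserveOn` step, removes the dependence on timing.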