Open seanreid1201 opened 1 year ago
The inner call to Subscribe here catches any exceptions that occur during the attempt to create the subscription, and passes them on to the observer by calling its OnError method.
However, in this case, that error is being dropped because the inner subscription is getting cancelled before the error reaches it.
Why is that inner subscription getting cancelled? Well, when your SingleValueSubject.Subscribe calls Subscribe on the IObservable<int> returned by Return(1), that ends up wrapping its observer in a 'safe' observer, the main purpose of which is to automatically unsubscribe if the real observer's OnNext throws an exception. So the sequence of events is this:
1. The Return operator produces its value
2. Your observer's OnNext throws an exception
3. The safe observer unsubscribes
4. The exception reaches the Subscribe method (the 2nd one called by your code)
5. That Subscribe method catches the exception and reports it to the observer's OnError
6. OnError now has nowhere to go

The Subscribe method thinks it has handled the exception by reporting it to the observer. But the observer is no longer listening: it unsubscribed because of the exception.
So I think there's a bit of a disconnect here between two different parts of the system. The safe subscription mechanism presumes that by delivering the error to its observer, it has done its work. But the safe observer system shuts everything down automatically in the case of an error, so the error that it allows out of its OnNext ends up having nowhere to go.
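To make that disconnect concrete, here is a toy model in plain C#. It is not the System.Reactive implementation: MutingObserver, RecordingObserver, and SafeSubscribe are hypothetical names invented for this sketch, and they only mimic the two behaviours described above (an observer wrapper that shuts itself down when OnNext throws, and a subscribe helper that reports failures through OnError).

```csharp
using System;

// Hypothetical stand-in for the 'safe' observer; NOT the real Rx type.
// It mutes itself as soon as the wrapped observer's OnNext throws.
sealed class MutingObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private bool _muted;

    public MutingObserver(IObserver<T> inner) => _inner = inner;

    public void OnNext(T value)
    {
        if (_muted) return;
        try { _inner.OnNext(value); }
        catch { _muted = true; throw; } // shut down, then let the exception continue
    }

    public void OnError(Exception error)
    {
        if (_muted) return;             // already shut down: the error is dropped here
        _muted = true;
        _inner.OnError(error);
    }

    public void OnCompleted()
    {
        if (_muted) return;
        _muted = true;
        _inner.OnCompleted();
    }
}

// Application-style observer whose OnNext always throws.
sealed class RecordingObserver : IObserver<int>
{
    public bool OnNextThrew, OnErrorCalled;

    public void OnNext(int value)
    {
        OnNextThrew = true;
        throw new InvalidOperationException("thrown from OnNext");
    }

    public void OnError(Exception error) => OnErrorCalled = true;
    public void OnCompleted() { }
}

static class Demo
{
    // Hypothetical 'safe subscribe': any failure while subscribing is
    // reported to the observer's OnError instead of being rethrown.
    public static void SafeSubscribe(Action<IObserver<int>> source, IObserver<int> observer)
    {
        var safe = new MutingObserver<int>(observer);
        try { source(safe); }
        catch (Exception ex) { safe.OnError(ex); } // believes the failure is now handled
    }

    // The source emits its single value synchronously, inside the subscribe call.
    public static (bool threw, bool errorSeen) Run()
    {
        var observer = new RecordingObserver();
        SafeSubscribe(o => { o.OnNext(1); o.OnCompleted(); }, observer);
        return (observer.OnNextThrew, observer.OnErrorCalled);
    }
}
```

Calling Demo.Run() in this model yields (true, false): OnNext ran and threw, but OnError was never delivered because the wrapper had already muted itself, and nothing was rethrown to the caller either.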
And I think what makes this an odd case is that the subscription failure is happening inside the OnNext of the very observer that was going to be notified of the subscription failure. So you've actually got two things failing at once: the subscription, and the observer that was supposed to observe it.
It's this combination that's problematic here, because it was the job of the (failed) observer to observe the failed subscription. (In a sense, it's reminiscent of throwing an exception in your exception handler: you lose the original exception if you do that. I know it's not quite the same, but the basic principle is that your error handler has failed, and that's why error handling isn't working.)
I'm going to look at some of the other issues that people have reported around exception handling, e.g. #1256, because I think this needs a coherent approach rather than a piecemeal response. Having only recently taken the reins of this project, I don't think I fully understand the original design thinking around this sort of error handling, and I want to make sure I've grokked that before I either change anything or offer advice on how to deal with this sort of problem.
The basic idea of these safe mechanisms is to ensure that if exceptions occur from unexpected cases, subscriptions get shut down correctly, rather than being left in indeterminate states.
So I'll come back to this once I've reviewed some related issues.
OK, I've looked into this in more detail. First, note that the issue you've described—an exception being thrown but not observed—can be reproduced with a slightly simpler example:
var wrapped = new SingleValueSubject();
var threw = false;
wrapped.AsObservable().Subscribe(
    _ =>
    {
        threw = true;
        throw new Exception();
    });
Console.WriteLine(threw);
What's interesting is that when I tried out some alternative implementations that seem like they should do the same thing, they each delivered one of two different behaviours. Your SingleValueSubject is an IObservable<int> that produces a single value (1) and then completes. If we replace it with this, which is also a source that produces a single value then completes:
var wrapped = new BehaviorSubject<int>(1).Take(1); // Option 2
then we still get the same behaviour: the exception appears to be swallowed. (So this shows that we don't even need nested subscription: with those two changes there's now only a single call to Subscribe, but we still see this exception swallowing.) However, if we replace the source with this:
var wrapped = Observable.Return(1); // Option 3
then the exception emerges like you'd expect. That's particularly surprising because that version seems very close to what your SingleValueSubject does internally:
public IDisposable Subscribe(IObserver<int> observer) =>
    Observable.Return(1).Subscribe(observer);
But the reason Option 3 works despite looking very similar to the code in SingleValueSubject is that Rx takes a different code path when it recognizes that the source for the AsObservable is an Rx-supplied implementation. (Specifically, it's looking for an internal interface, IProducer<T>, which the Return operator implements but which your SingleValueSubject can't because IProducer<T> is internal to System.Reactive.)
So why, then, does Option 2 not also work as you expect? The Take operator also implements IProducer<T>. However, it turns out that BehaviorSubject<T> does not. So although subjects are all implemented by System.Reactive, it turns out that the subscribe logic doesn't recognize BehaviorSubject<T> as being a special Rx type, so it effectively gets treated in the same way as your SingleValueSubject: both are effectively treated as non-internal implementations of IObservable<T>.
So here's the combination of factors that results in the behaviour you have observed:
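1. The source produces a value before the Subscribe call returns
2. The source is a non-internal IObservable<T> implementation (one that Rx does not recognize as its own)
3. An Rx operator that wraps its observer in the 'safe' mechanism sits between the source and the application's observer
4. An exception is thrown from the OnNext caused by 1

For example, in Option 2 above, the BehaviorSubject<int> satisfies both 1 and 2, the use of AsObservable() satisfies 3, and the callback you supply to Subscribe satisfies 4.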
In fact, the use of Take(1) also satisfies 3, so if we're using Option 2, we can also drop the call to AsObservable():
var wrapped = new BehaviorSubject<int>(1).Take(1);
wrapped.Subscribe( ... as before
That will still swallow the exception.
If we have neither the Take nor AsObservable:
var wrapped = new BehaviorSubject<int>(1);
wrapped.Subscribe( ... as before
we are no longer satisfying 3, so exceptions are no longer swallowed.
The reason Option 3 above also doesn't swallow exceptions is that although we satisfy 1, 3, and 4, we no longer satisfy 2.
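The source variants discussed so far can be compared side by side with a small probe. This is only a sketch: it assumes the System.Reactive NuGet package is referenced, and SwallowProbe, ExceptionEscapes, and RunAll are hypothetical names introduced here. Going by the descriptions in this thread, the two Take(1) variants would be expected to report that the exception does not escape, while the bare BehaviorSubject<int> and the Observable.Return(1) variants would report that it does, but the output may differ between Rx versions.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class SwallowProbe
{
    // Subscribes with an OnNext callback that always throws, and reports
    // whether that exception escaped from the Subscribe call.
    public static bool ExceptionEscapes(IObservable<int> source)
    {
        try
        {
            source.Subscribe(_ => { throw new InvalidOperationException("thrown from OnNext"); });
            return false; // Subscribe returned normally: nothing reached the caller
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Prints one line per source variant discussed in this thread.
    public static void RunAll()
    {
        Print("BehaviorSubject + Take(1) + AsObservable()", new BehaviorSubject<int>(1).Take(1).AsObservable());
        Print("BehaviorSubject + Take(1)", new BehaviorSubject<int>(1).Take(1));
        Print("BehaviorSubject alone", new BehaviorSubject<int>(1));
        Print("Observable.Return(1) + AsObservable()", Observable.Return(1).AsObservable());
    }

    private static void Print(string label, IObservable<int> source) =>
        Console.WriteLine($"{label}: exception escapes = {ExceptionEscapes(source)}");
}
```

Calling SwallowProbe.RunAll() from a test or a console entry point prints one line per variant, which makes it easy to see which combination of the four conditions a given source hits.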
So this provides an answer of sorts to your question:
is there a rule we should follow to avoid this happening in our codebase in future?
Yes: avoid creating all four of the conditions listed above. Condition 3 probably requires a little more explanation, although you could summarize it as "most Rx operators". The majority of Rx operators detect when their observers are not some known Rx-defined type, in which case they add exception handling to ensure that all resources are freed up when exceptions emerge unexpectedly from application code such as OnNext/Error/Completed. Subscriptions are typically torn down automatically when that happens. There are a handful that don't do this because they don't need to. (E.g., the simple form of Return doesn't, which is why, if you remove both the Take and the AsObservable from the equation, you no longer see the exception being swallowed: you've stopped satisfying condition 3.)
In practice, it's hard to avoid condition 3 most of the time, so you would need to look at 1, 2, or 4.
For example, if you modify your SingleValueSubject like this:
class SingleValueSubject : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer) =>
        Observable.Return(1, CurrentThreadScheduler.Instance).Subscribe(observer);
}
to use the form of Return that takes a scheduler, this will end up running the work of generating the single item through a scheduler, which in this example happens to defer it long enough that we no longer satisfy condition 1.
Arguably the best bet is always to avoid satisfying condition 4. There are many scenarios where Rx does just swallow exceptions that emerge from application-supplied code such as the IObserver<T> methods or various callbacks. It can certainly be argued (and in https://github.com/dotnet/reactive/issues/1256 it has been argued) that this is inconsistent. However, it's an established fact at this point. Changing Rx at this point always to report exceptions that emerge from user code (even in the awkward cases where there's no apparent place to send them any more because all relevant observers have unsubscribed) would be a breaking change.
This leads to your condition:
If this is intended behaviour
and your assertion:
An exception which is thrown should not be caught and suppressed by Rx.
Well, there are certainly known cases in which exceptions are caught and suppressed by Rx. For example, if the callback for a Select operator happens to be in progress when someone calls Dispose on the relevant subscription, and then that callback throws an exception after the call to Subscribe has returned, there isn't anywhere obvious for us to send the exception. Arguably Rx should cause the process to crash at this point, but it hasn't done that for the last 10 years, and it's too late to change.
So in the general case, it is not true to say that Rx should never catch and suppress exceptions. It has done so by design in some scenarios for years, and even though we might consider that to be in some sense 'wrong', it would be a breaking change to change that.
By the way, it's worth noting that Rx does actually allow this exception to pass through your code. The exception emerges from the Subscribe call you make inside SingleValueSubject. If you modified the code like this you'd be able to see that happening:
class SingleValueSubject : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        bool noError = false;
        try
        {
            IDisposable result = Observable.Return(1).Subscribe(observer);
            noError = true;
            return result;
        }
        finally
        {
            if (!noError)
            {
                Console.WriteLine("Observable.Return(1).Subscribe threw exception; observer has been muted");
            }
        }
    }
}
So if you had put exception handling in the right place, you would have seen it.
But what's slightly weird is that if you just let the exception out unhandled (like your code does), then it won't then emerge further up the stack.
The big question now is: is that a bug? I've identified the 4 conditions which, if met, will cause this behaviour to be observed, but is that the intended behaviour?
Although I've argued that in general we can't say that all exceptions must go unswallowed because there are specific cases where we swallow them by design, it's not clear to me that this is one of those "by design" scenarios (such as when application code has already unsubscribed all observers, and exceptions are emerging in a place where there's nothing above us on the stack to catch it).
This particular combination strikes me as a bit weird, and it's not obvious to me that it is what was intended.
It occurs because the 'safe' wrapping results in the application-supplied observer essentially being disabled. It gets wrapped in something called a 'sink', and failures that occur in an OnNext call cause that sink to 'mute' itself, so that it won't forward on any more notifications to application code. But in the case where the OnNext that fails happens inside a call to Subscribe, it defeats the attempts of the Subscribe logic to report the failure to the application-supplied OnError.
That feels like an accident to me. In cases where you don't have this particular alignment of conditions, the error will be visible to the application.
But my question to you would be: given the description of how to avoid this, how important would changing the behaviour here be to you? I don't see an obvious straightforward fix here, so changing this is likely to change other things in unintended ways possibly leading to a backwards compatibility failure.
I could argue that when you throw from OnNext, there are few guarantees because you're not really meant to do that, so although the behaviour here is weird, there isn't really a clearly-defined correct behaviour in any case.
And although that might not be a terribly satisfying answer, it might be less bad than causing other unintended behaviour changes in an attempt to change the behaviour when these particular conditions arise.
So unless people see this as a major issue, I'm inclined to leave the behaviour as it is. What do you think?
Thanks a lot for the detailed reply. It has given us a much better understanding of what's going on and what we can do to avoid this in our codebase in future.
I'll go through your list of four points and mention how they pertain to our specific use case.
In our application, we are mostly using observables to model things that could be described as "a value that can change", which is the first item on the list of things to avoid. Trying to change our design to avoid that would be impractical.
Skipping to the third point, as you say it too is impractical to avoid.
For the fourth point, our software is mostly written such that exceptions are not thrown in the first place, or where they are used, they are handled as locally as possible. In general, we do not expect exceptions to be thrown in observables at all. In other words, we already (try to) do point 4.
However, pretty much by definition all software has bugs, and so there will be times that we do throw an exception in a subscription to an observable and don't catch it. What we expect in that scenario is that, because nothing catches the exception, it bubbles up to the application level handler and terminates the process. This is where we first observed this behaviour: from a bug that threw an exception that we did not expect to be thrown. In fact, we would likely classify the failure to crash as more severe than the original bug, because the application has now been left in an indeterminate state.
So the only thing we can do is to avoid point 2.
We took a look through the Rx source to see what it does regarding the special IProducer<T> type. We saw that where that type is handled as a special case, so is ObservableBase<T>. Going back to the original code I posted, if I change the SingleValueSubject to inherit from ObservableBase<int>, then the test fails as expected:
private class SingleValueSubject
    : ObservableBase<int>,
      IObservable<int>
{
    protected override IDisposable SubscribeCore(IObserver<int> observer) =>
        Observable.Return(1).Subscribe(observer);
}
So if we can simply say as a rule in our codebase, "All implementations of IObservable<T> must do so by inheriting from ObservableBase<T>", then that is easy to understand, easy to enforce, and prevents us encountering point 2 on the list, and we can avoid seeing exceptions disappearing in future.
Regarding your final questions, personally I'd like it if all exceptions not caught by my code just made the application crash, but clearly there are other users of the library and a lot of code out in the wild which that would break. So given that we have a workable solution I wouldn't complain if you were to close this as "as designed" or "won't fix" or similar.
Thanks again for taking the time to look into this.
I think it would be a stretch to describe this as being "as designed" because, having re-read what I wrote above, I don't think anyone intended this to be the behaviour.
On reflection, although it would be a breaking change to start propagating exceptions in places we currently swallow them, I think there's an argument that data loss is worse than breaking backwards compatibility. As you said:
we would likely classify the failure to crash as more severe than the original bug, because the application has now been left in an indeterminate state.
The fact that Rx conceals exceptions could lead to an application continuing to run after its internal state has become compromised. That does seem a serious enough problem to warrant a breaking change.
So I think I should see if this can be fixed in a way that doesn't look likely to disrupt Rx's behaviour in non-exception scenarios. But if we do introduce this fix, I think it would make sense to do so in v7.0, because although I do think the current behaviour wasn't intended (and is therefore a bug of sorts), I think it's too big a change in behaviour to introduce without a major version bump. So I've provisionally put this on the v7.0 milestone.
(I'm not sure yet how we'd handle the cases where there's no obvious place for an unhandled exception to emerge. If I'm going to deal with this problem comprehensively, that also needs addressing, but I think it would entail something like a global event that provides a way to discover such problems.)
Since raising this bug we've added static analysis to our codebase to ensure that all types implementing IObservable<T> do so by inheriting from ObservableBase<T>. This has (we thought) eliminated all the cases where Rx swallows exceptions.
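For readers who want something similar without a dedicated analyzer, a rule like this can be approximated at test time with reflection. The sketch below is an assumption-laden illustration, not the static analysis described above: ObservableRule, FindViolations, GoodSource, and BadSource are hypothetical names, and it assumes the System.Reactive package is referenced so that ObservableBase<T> is available.

```csharp
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reflection;

// Hypothetical example types, present only so the check has something to find.
sealed class GoodSource : ObservableBase<int>
{
    protected override IDisposable SubscribeCore(IObserver<int> observer) => Disposable.Empty;
}

sealed class BadSource : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer) => Disposable.Empty;
}

static class ObservableRule
{
    // True if the type implements IObservable<T> for some T.
    private static bool ImplementsIObservable(Type type) =>
        type.GetInterfaces().Any(i =>
            i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IObservable<>));

    // True if ObservableBase<T> appears somewhere in the type's base-class chain.
    private static bool DerivesFromObservableBase(Type type)
    {
        for (var t = type.BaseType; t != null; t = t.BaseType)
        {
            if (t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ObservableBase<>))
            {
                return true;
            }
        }
        return false;
    }

    // Returns every concrete class in the assembly that implements
    // IObservable<T> without inheriting from ObservableBase<T>.
    public static Type[] FindViolations(Assembly assembly) =>
        assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract)
            .Where(ImplementsIObservable)
            .Where(t => !DerivesFromObservableBase(t))
            .ToArray();
}
```

A unit test could call ObservableRule.FindViolations(typeof(BadSource).Assembly) and fail if the result is non-empty; in this sketch BadSource would be reported and GoodSource would not. Note that a check like this only covers your own assemblies, so it cannot catch the Rx-internal cases described next.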
However we have recently found a new place where it happens, using only Rx methods.
As mentioned in the thread above, the Subject types in Rx are neither ObservableBase nor IProducer, and are therefore prone to swallowing exceptions. At first this doesn't seem like a problem because they are not intended for use in production code (and are documented as such). However, it turns out that they are used to implement various reactive operators. And this means that some of the commonly used operators will just swallow exceptions. For example:
var observable = Observable.Create<int>(
    observer =>
    {
        throw new Exception();
        return Disposable.Empty;
    });
observable.Subscribe(); // throws, because the exception bubbles out
var replay = observable.Replay();
replay.Connect();
replay.Subscribe(); // doesn't throw, because the ReplaySubject swallows it.
It seems pretty strange and unintuitive that just adding a Replay() removes exception propagation.
There appear to be various methods in addition to Replay that would exhibit this behaviour due to the use of Subject, including Publish, PublishLast, and FromEvent.
If this bug is to be fixed in v7, would it be possible to make SubjectBase<T> inherit from ObservableBase<T> in that version as well? This would fix the code snippet above such that the exception bubbles out in both cases, and would also fix the numerous other extension methods in Rx that internally use Subjects.
Bug
System.Reactive version 5.0.0
Windows 10, .NET 6, Visual Studio 2022
We have a large codebase which was originally full of observable-like things. In the process of converting these to be observables, we came across a scenario where an exception was thrown which should have propagated out as nobody caught it, but instead it was suppressed silently by the Rx library. I've reduced the problem to a minimal example below.
An exception which is thrown should not be caught and suppressed by Rx.
The exception is suppressed.
Here's a unit test which should fail but instead passes:
I've tried searching for this but haven't come across anything. If this is intended behaviour, is there a rule we should follow to avoid this happening in our codebase in future?