reactive-streams / reactive-streams-jvm

Reactive Streams Specification for the JVM
http://www.reactive-streams.org/
MIT No Attribution

onError(Throwable) vs cancel() #271

Closed gregw closed 8 years ago

gregw commented 9 years ago

@viktorklang Bringing this discussion back here from http://cs.oswego.edu/pipermail/concurrency-interest/2015-June/014275.html

I understand your comments about the expense of bidirectional communication and general acknowledgements. However, I also think that leaving the establishment of a back flow for such things as recycling outside the standard is leaving a bit of a hole for interoperability. But let me raise that separately.

For now, I want to understand more about the error handling, specifically in a chain of Processors. For any particular processor, it is possible that something can go wrong, and when it does both ends of the RS need to be informed. Currently the API allows for errors to be propagated by calling onError(Throwable) downstream and cancel() upstream.

This is sufficient for shutting down the stream, but my question is: why is the exception passed down and not up? Essentially the stream can be shut down without passing the exception in either direction, and the throwable is not needed:

Subscriber{
  ...
  void onError();
}

Subscription{
  ...
  void cancel();
}

So given that as a conceptual starting point, can the usability of the API be improved by telling one end or the other the details of a problem?

I'm struggling to think of an exception that the ultimate subscriber can use to improve its implementation. All it really needs to know is that the stream is not completing normally.

On the other hand, I can think of several examples where passing the exception upstream to the origin publisher will help the usability of the API. If a publisher is delivering an unsuitable object (it is incomplete, in an illegal state, can't be serialized, contains types that have no JSON converter, doesn't match the DB schema, etc.), then telling that publisher what the problem is will be valuable in helping them correct their Publisher. Sure, the exception can be logged by either the Processor or the ultimate Subscriber, but that might be on a system remote from the Publisher and they may not see the log.

So if a Publisher is sending an unsuitable item, all they will see is a cancel call in return and no explanation why.

So if the exception is to be propagated, sending it upstream seems to me more appropriate than sending it downstream. But I am not opposed to sending it both ways:

Subscriber{
  ...
  void onError(Throwable);
}

Subscription{
  ...
  void onError(Throwable);
}

With the spec having suitable protections against looping errors.

viktorklang commented 9 years ago

@sbordet This is already possible by either using Publisher-Subscriber pairs or a Processor.

With respect to the current length of this thread and everyone's time, let's continue the, in my opinion, more interesting discussion on how to model using RS in #288

Cheers, √

sbordet commented 9 years ago

@viktorklang

A Processor won't fit this case because its semantics are typically to maintain the stream direction: it takes from the left and emits to the right.

The feedback approach above, instead, has a Subscriber that takes from the left, but the feedback Publisher emits to the left, not to the right like a Processor would.

In any case, if you can outline how a Publisher/Subscriber pair or Processor would work in #288 that would be awesome.

Thanks!

maxwindiff commented 9 years ago

@gregw

Taking the REST API as an example again: say my application uses a REST client to talk to some service. If the client fails, there are actually a good number of things it can do to facilitate troubleshooting: it can throw an exception, log the reason for the error, or record an error metric, any of which may trigger some programmatic handling or human intervention. Even if there is no way to transmit an error back within the REST call, that does not prevent me from building an out-of-band error notification channel. I think the same can be said for an RS Subscriber.

Sidetrack: for your logging concerns, I think incompatible logging mechanisms aren't a real issue in modern web applications. Applications are usually configured with logging facades / adapters / aggregators to output all logs in a consistent way.

As for whether there is value in specifying a minimal error reporting mechanism to provide interoperability – maybe there is, but why does it have to be part of reactive-streams?

benjchristensen commented 9 years ago

Finally catching up with this thread and want to weigh in. Hopefully my late contributions are of value to clarify and help resolve this discussion.

tl;dr – I agree with @viktorklang on this matter.

Purpose of Cancel

Cancel is not for signaling errors upstream. It is for cancelling. Most common use is stopping a data stream and releasing resources. Think stream.take(10). Cancelling on an error is just a consumer canceling interest, regardless of whether that reason is due to an error or not. The Publisher should not care why the cancel occurred. It's just a signal the Subscriber can send to the Publisher to terminate if not already terminated. If the Publisher is already terminated, then the cancel signal is unnecessary (since the Publisher already terminated itself and thus could have performed its own cleanup).
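The take(10) case can be sketched with the JDK 9 java.util.concurrent.Flow interfaces. This is only a minimal illustration under stated assumptions: TakeDemo, range and take are hypothetical names, and the publisher is a simplified single-threaded source rather than a fully spec-compliant one. The point it shows is that the subscriber cancels simply because it has what it wanted, and the publisher never learns (or needs) a reason.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class TakeDemo {

    // Hypothetical synchronous source emitting 1..count (simplified, single-threaded).
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private boolean finished;

            @Override public void request(long n) {
                // Emit at most n items, stopping early if the subscriber cancelled.
                for (long i = 0; i < n && !finished && next <= count; i++) {
                    subscriber.onNext(next++);
                }
                if (!finished && next > count) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() {
                // No reason is passed: the publisher only stops and could release resources here.
                finished = true;
            }
        });
    }

    // Collects the first `limit` items, then cancels, like stream.take(limit).
    static List<Integer> take(Flow.Publisher<Integer> source, int limit) {
        List<Integer> received = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(limit);
            }

            @Override public void onNext(Integer item) {
                received.add(item);
                if (received.size() == limit) {
                    subscription.cancel(); // loss of interest, not an error
                }
            }

            @Override public void onError(Throwable t) { }

            @Override public void onComplete() { }
        });
        return received;
    }
}
```

Calling TakeDemo.take(TakeDemo.range(100), 3) collects the first three items and then cancels; the source stops without ever signalling onComplete.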

Use of onError

Error propagation down has many uses, same as try/catch with conditional handling of different Throwable types, including branching logic, retries and default data.

In RxJava apps we do error handling in our stream composition all the time (see https://github.com/ReactiveX/RxJava/wiki/Error-Handling for more concrete examples). The onError signal allows equivalent functionality of catch statements in synchronous imperative code.

For example, resuming with default data:

stream.map(item -> doStuff(item)).onErrorResumeNext(exception -> defaultData).subscribe(…)

Or retrying with backoff (http://reactivex.io/RxJava/javadoc/rx/Observable.html#retryWhen(rx.functions.Func1)):

stream.map(item -> doStuff(item)).retryWhen(attempts -> {
    return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
        System.out.println("delay retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
    });
})
.subscribe(…)

Either of those can do conditional error handling by inspecting the Throwable it receives using instanceof or even looking at the Throwable.getMessage() or getCause() data. Here is an example of retryWhen with conditional logic: https://gist.github.com/benjchristensen/fde2e7d2dad2c746a449#file-retrywhentestsconditional-java-L20
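The same kind of branching can be sketched without RxJava, directly against the JDK 9 java.util.concurrent.Flow interfaces. This is an illustrative sketch only: ErrorBranchDemo, failingAfterTwo and consume are hypothetical names, and treating IOException as the "recoverable" case is an assumption made for the example.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class ErrorBranchDemo {

    // Hypothetical source: emits two items, then signals the given error downstream.
    static Flow.Publisher<String> failingAfterTwo(Throwable error) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int emitted;
            private boolean done;

            @Override public void request(long n) {
                for (long i = 0; i < n && !done && emitted < 2; i++) {
                    subscriber.onNext("item-" + (++emitted));
                }
                if (!done && emitted == 2) {
                    done = true;
                    subscriber.onError(error); // terminal signal, travels downstream
                }
            }

            @Override public void cancel() {
                done = true;
            }
        });
    }

    // The subscriber decides what to do based on the Throwable it receives.
    static List<String> consume(Flow.Publisher<String> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override public void onNext(String item) {
                log.add(item);
            }

            @Override public void onError(Throwable t) {
                if (t instanceof IOException) {
                    log.add("default");                 // assumed recoverable: fall back to default data
                } else {
                    log.add("failed: " + t.getMessage()); // anything else: record the failure
                }
            }

            @Override public void onComplete() {
                log.add("done");
            }
        });
        return log;
    }
}
```

With an IOException the log ends in the default value; with any other Throwable it ends in a failure entry carrying the message. That decision is only possible because the Throwable itself is delivered to the consumer.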

Years of Practical Evidence

There are over 5 years of precedent for variants of this type signature (onError down, cancel/unsubscribe/dispose up) being used in production systems with Rx.Net and RxJS, and I personally have used it for 3+ years with RxJava. In all that time, sending an error message upstream has never been desirable, or even wished for. Add to that the past year of many people using the Reactive Streams API as it approached and hit v1.0. That doesn't mean duplex error signals aren't sometimes needed – they indeed sometimes are – but it does demonstrate that those use cases have been addressed without strong reasoning to change the Publisher (Reactive Streams & JDK 9 Flow) /Observable (Rx) type signature.

I consider it important to have stable and proven API usage by many implementations before attempting a standardization, which is why the Rx (Reactive Extensions) and subsequent Reactive Streams efforts and many implementations with years of use are an important signal in deciding that this API does work without sharp edges.

Errors Don't Propagate Upstream in the Iterable Dual

A comment by @abersnaze above brought up the comparison with an Iterable and also InputStream. The Reactive Streams Publisher can be considered the dual of an Iterable (ignoring the batching of request(n) for now). This is a good way of explaining why cancel doesn’t pass a Throwable.

Consider a consumer pulling data from an Iterable, that has an exception while processing data. Is it going to pass that exception “up” to the Iterable? Even if the Iterable exposed such an API, what is the Iterable going to do about it?

List<Integer> myIterable = Arrays.asList(1, 2, 3);
Iterator<Integer> myIterator = myIterable.iterator();
while(myIterator.hasNext()) {
    try {
        doStuff(myIterator.next());
    } catch(Exception e) {
        // Choose to continue ... or re-throw with a detailed message.
        // Either way the error goes "down", not "up" to the Iterable:
        // even a re-thrown exception travels up the call stack,
        // not up to the Iterable behind the Iterator.
    }
}

Similarly, errors are sent downstream because the consumer can choose what to do with them. The upstream Publisher can’t do anything with them – it certainly can't resume sending onNext data down, as it has already emitted an error. As some have stated above, I agree that errors propagating upstream are either (a) a “bug” to be dealt with out-of-band after the Subscriber receives an error it can't do anything about, or (b) a use case for a full bidirectional channel which could be modeled with 2 unidirectional Reactive Streams (which I have successfully used to model a full duplex channel).

Summary

I recognize that some use cases exist where bi-directional error handling is useful, but Reactive Streams is not trying to be a full-duplex channel. It is bi-directional for control-flow reasons, but unidirectional for data – and errors via onError are just another form of data. Both cancel and request(n) flowing upstream are for control flow (cancellation and rate limiting).

If a full duplex channel is needed, then either a completely different type is needed, or two Reactive Stream Publisher instances going both directions can be used.
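One way the two-publisher option might look, sketched with the JDK 9 java.util.concurrent.Flow interfaces: data flows down one stream, and rejection reasons flow back on a second, independent stream that the producing side subscribes to. Everything here is an assumption made for illustration. Mailbox and DuplexDemo are hypothetical names, the relay is single-threaded with a single subscriber, and it is not a fully spec-compliant Publisher.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical single-subscriber relay that buffers items until they are requested.
final class Mailbox<T> implements Flow.Publisher<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private Flow.Subscriber<? super T> subscriber;
    private long demand;
    private boolean cancelled;

    @Override public void subscribe(Flow.Subscriber<? super T> s) {
        subscriber = s;
        s.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                if (n <= 0) return;                                   // sketch: ignore invalid demand
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                drain();
            }
            @Override public void cancel() { cancelled = true; }
        });
    }

    void offer(T item) {
        buffer.add(item);
        drain();
    }

    private void drain() {
        while (subscriber != null && !cancelled && demand > 0 && !buffer.isEmpty()) {
            demand--;
            subscriber.onNext(buffer.poll());
        }
    }
}

final class DuplexDemo {
    static List<String> run() {
        Mailbox<String> data = new Mailbox<>();        // producer -> consumer
        Mailbox<Throwable> feedback = new Mailbox<>(); // consumer -> producer
        List<String> producerSaw = new ArrayList<>();

        // Producer side: an ordinary Subscriber on the feedback stream.
        feedback.subscribe(new Flow.Subscriber<Throwable>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Throwable reason) { producerSaw.add(reason.getMessage()); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });

        // Consumer side: rejects empty items, reports why, then cancels the data stream.
        data.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) {
                if (item.isEmpty()) {
                    feedback.offer(new IllegalArgumentException("empty item rejected"));
                    subscription.cancel();
                }
            }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });

        data.offer("ok");
        data.offer("");                // triggers feedback and cancel
        data.offer("never delivered"); // stays buffered: the stream was cancelled
        return producerSaw;
    }
}
```

Here cancel() still carries no reason; the explanation reaches the producing side as ordinary data on the second stream, which is why neither Publisher nor Subscription needs a new signature.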

In short, I am not convinced that the Reactive Streams Publisher should be changed to support both cancel() and cancel(Throwable) semantics.

gregw commented 9 years ago

@benjchristensen I remain unconvinced this is generally the case for an API proposed to be included in Java9.... but I have to accept that I've been unable to convince anybody with experience of the API to even have a smidge of doubt that the abstraction is correct.

So unless the final points (below) convince somebody (anybody???), please close this issue and I'll use what time I have available (which is not very much at the moment) to look at how the existing API can be used for servlets and still handle errors without needing a built-in logging mechanism.

But to take the iterator example: to me, the parallel would be if the contract for the iterator interface did not allow any runtime exceptions to be thrown. Instead, if an implementation of an iterator suffered from an internal exception, it would be required to just return false on the hasNext() call. In order for a user to work out what the problem is, they would first have to notice that it should not have returned false and then go look for a log message, hoping the reason was logged somewhere.

The parallel for the proposed solution to handle such issues is that Collections must create pairs of iterators, one providing the items and the other iterating over any exceptions that were created by bugs detected by the other iterator implementation. Of course this is never going to happen, because nobody expects bugs and they are not going to setup another iterator just to look at exceptions that are not expected.

I think I would be happier if the contract for onNext allowed it to throw IllegalStateException if it was called in request(0) state and to throw IllegalArgumentException if it believes the publisher has published something that it should not have (wrong type, bad validation, etc.). That way, third-party and utility processors could be developed without the need to pick a logging mechanism (just as I can implement Iterator and communicate out bugs without resorting to logging).

Finally, it is one thing for an API to be obvious to its developers and those who have used it for 5 years. It is another thing entirely for it to be part of the standard language library, where it needs to be obvious how to use it correctly.

I keep coming back to my experience as a pair of fresh eyes developing my first Subscriber. The very first onNext method I wrote started with a decrement of the request count, which I tested to make sure it did not go negative. I immediately got stuck because if it went negative I wanted to throw an IllegalStateException, but that is not allowed in the RS spec. OK, I thought, this is like an async completion handler, I'll be able to pass the exception out with a callback... Nope, all I have is cancel(). So what do I do with it? Nothing? Log it? It was a real WTF moment and has stalled my adoption of the API... for an exception that should never happen!
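The situation described above can be made concrete with a small sketch against the JDK 9 java.util.concurrent.Flow interfaces. It is only one possible shape, under stated assumptions: CountingSubscriber, DemandDemo and the bugSink callback are hypothetical names, and handing the exception to a locally injected Consumer is an out-of-band workaround, not something the spec provides. The subscriber counts outstanding demand, and on a violation it cancels and reports instead of throwing from onNext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical subscriber that checks the publisher never exceeds requested demand.
final class CountingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new ArrayList<>();
    private final long demand;
    private final Consumer<Throwable> bugSink; // assumed out-of-band channel (log, metric, callback)
    private Flow.Subscription subscription;
    private long outstanding;
    private boolean done;

    CountingSubscriber(long demand, Consumer<Throwable> bugSink) {
        this.demand = demand;
        this.bugSink = bugSink;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        outstanding = demand; // record demand before requesting: a synchronous source may emit at once
        s.request(demand);
    }

    @Override public void onNext(T item) {
        if (done) return;
        if (--outstanding < 0) {
            // Protocol violation: cancel upstream and report locally rather than throw.
            done = true;
            subscription.cancel();
            bugSink.accept(new IllegalStateException("onNext called without demand"));
            return;
        }
        received.add(item);
    }

    @Override public void onError(Throwable t) { done = true; }

    @Override public void onComplete() { done = true; }
}

final class DemandDemo {
    // Deliberately buggy hypothetical publisher: emits three items whatever n is.
    static Flow.Publisher<String> overEager() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean cancelled;

            @Override public void request(long n) {
                for (String item : List.of("a", "b", "c")) {
                    if (cancelled) return;
                    subscriber.onNext(item); // ignores n
                }
            }

            @Override public void cancel() { cancelled = true; }
        });
    }
}
```

Subscribing a CountingSubscriber with a demand of 2 to DemandDemo.overEager() accepts the first two items, then cancels on the third and passes an IllegalStateException to bugSink. The publisher itself sees only the cancel, which is exactly the gap this comment is complaining about.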

I can now see that there are constructs that can be made to handle such exceptions within the paradigm, e.g. creating an RS that is like a completion handler that observes the RS passing the real data. I think such solutions can even be neat and cool, but they are not immediately obvious and, for most implementations, developers are just not going to bother.

I'm betting that once this API is in Java 9, most onNext implementations will just throw runtime exceptions and be damned... thus breaking the spec from day one... but it is bug handling, so nobody will care. In fact it is nigh impossible to write a compliant onNext() because it will always be possible for it to throw an OOME or similar. Bugs will manifest as NPEs and the publishers are just going to have to deal with runtime exceptions coming out of onNext().

Perhaps I'm trying too hard to respect the spec and I should just give up and also throw runtime exceptions for such cases.

Anyway, enough of my bitching about this... I'll see if I can live with the API as is in the other issue about servlet usage... apologies for slow/no progress there, as I'm time poor at the moment.