rsocket / rsocket-cpp

C++ implementation of RSocket
http://rsocket.io
Apache License 2.0
253 stars 99 forks source link

Handle exceptions thrown from map operators #768

Closed alexmalyshev closed 7 years ago

alexmalyshev commented 7 years ago

If the function passed to a map operator throws, then we should cancel the subscription, and pass the exception as the onError to the downstream (unless this is a Single, where we shouldn't cancel the subscription).

Improves the SingleOperator termination semantics, which haven't kept up with ObservableOperator and FlowableOperator.

alexmalyshev commented 7 years ago

I believe we should move the try/catch to the yarpl::flowable::Subscriber.

If we receive any exception from the user code, it should be caught and onError should be called, instead of propagating the error up.

On top of calling onError on the downstream, we also want to cancel the upstream when this happens because we're an operator. This can't really be done from the flowable::Subscriber interface.

EDIT: Thinking more about it, we totally can call cancel() on the Subscriber's subscription. Might just work...

alexmalyshev commented 7 years ago

I think we should base the general throwing from Subscriber::onNext() case in the SafeSubscriber/SubscriberBase implementation that @dymk is working on. I think it'd look something like:

void SafeSubscriber::onNext(T item) {
  if (auto const sub = subscription_.load()) {
    SUBSCRIBER_KEEP_SELF();
    try {
      onNextImpl(std::move(item));
    } catch (const std::exception& exn) {
      if (auto const sub = subscription_.exchange(nullptr)) {
        folly::exception_wrapper ew{std::current_exception(), exn};
        sub->cancel();
        onErrorImpl(std::move(ew));
      }
    }
  }
}

We can add this case for map() in the meantime, or wait for SafeSubscriber.