ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

concatMap throws an exception if applied after a custom operator (Rx 2.x) #5557

Closed jcornaz closed 7 years ago

jcornaz commented 7 years ago

Hello,

We're having trouble using concatMap when a custom operator is used upstream.

I think a code example is worth more than a thousand words, so let's define a very simple custom operator that just passes the items downstream.

import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.observers.DisposableObserver;

public class MyOperator implements ObservableOperator<Integer, Integer> {

  public static void main(String[] args) {
    Observable.range(0, 10)
        .lift(new MyOperator())
        .concatMap(Observable::just)
        .subscribe(System.out::println);
  }

  @Override
  public Observer<? super Integer> apply(Observer<? super Integer> observer) throws Exception {
    return new DisposableObserver<Integer>() {

      @Override
      public void onNext(Integer s) {
        observer.onNext(s);
      }

      @Override
      public void onError(Throwable e) {
        observer.onError(e);
      }

      @Override
      public void onComplete() {
        observer.onComplete();
      }
    };
  }
}

If you run the main function, you will get a stack trace like this:

Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:128)
    at MyOperator$1.onNext(MyOperator.java:21)
    at MyOperator$1.onNext(MyOperator.java:17)
    at io.reactivex.internal.operators.observable.ObservableRange$RangeDisposable.run(ObservableRange.java:64)
    at io.reactivex.internal.operators.observable.ObservableRange.subscribeActual(ObservableRange.java:35)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at io.reactivex.internal.operators.observable.ObservableLift.subscribeActual(ObservableLift.java:57)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at io.reactivex.internal.operators.observable.ObservableConcatMap.subscribeActual(ObservableConcatMap.java:52)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at io.reactivex.Observable.subscribe(Observable.java:10824)
    at io.reactivex.Observable.subscribe(Observable.java:10727)
    at MyOperator.main(MyOperator.java:12)

Note: the RxJava version is 2.1.1.

akarnokd commented 7 years ago

You have to follow the Observable protocol:

onSubscribe onNext* (onError|onComplete)?
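
As a rough illustration of that rule, the sketch below models the protocol as a small state machine over signal names. `ProtocolCheck` and `follows` are hypothetical names made up for this example and are not part of RxJava. The point is only that a sequence starting with onNext, which is what the downstream of the MyOperator above receives, is rejected.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of RxJava: checks a list of signal names
// against the grammar  onSubscribe onNext* (onError|onComplete)?
class ProtocolCheck {

  // Returns true if every signal seen so far was legal at its position.
  static boolean follows(List<String> signals) {
    boolean subscribed = false;
    boolean terminated = false;
    for (String signal : signals) {
      if (terminated) {
        return false; // nothing may follow a terminal signal
      }
      switch (signal) {
        case "onSubscribe":
          if (subscribed) {
            return false; // onSubscribe must come first, and only once
          }
          subscribed = true;
          break;
        case "onNext":
          if (!subscribed) {
            return false; // an item before onSubscribe breaks the protocol
          }
          break;
        case "onError":
        case "onComplete":
          if (!subscribed) {
            return false; // terminal signal before onSubscribe
          }
          terminated = true;
          break;
        default:
          return false; // unknown signal name
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // prints "true": onSubscribe arrives before the items
    System.out.println(follows(Arrays.asList("onSubscribe", "onNext", "onComplete")));
    // prints "false": items arrive without a preceding onSubscribe
    System.out.println(follows(Arrays.asList("onNext", "onComplete")));
  }
}
```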

jcornaz commented 7 years ago

I think I do.

The example above only delegates the calls from the downstream observers to upstream. As long as the downstream does follow the observable protocol, the custom operator does it as well.

akarnokd commented 7 years ago

> The example above only delegates the calls from the downstream observers to upstream.

The source is called upstream and the consumer is the downstream direction.

> I think I do.

Show me where you call observer.onSubscribe(....) in MyOperator?

jcornaz commented 7 years ago

Actually, my operator inherits from DisposableObserver, and I thought that was the purpose of this class, because I no longer have to (and cannot) implement "onSubscribe", and also because the example given in the "Learning RxJava book" by Thomas Nield was written the same way.

But you are right: overriding DisposableObserver.onStart() to call observer.onSubscribe() solves the problem.

May I suggest to make the method DisposableObserver.onStart() abstract to enforce its implementation? (It seems to be necessary.)

Thank you for your help.

akarnokd commented 7 years ago

> "Learning RxJava book" by Thomas Nield

Yep, I noticed this error after the book was released; the review version looked different, and there is only one round of review per chapter, so it slipped through.

> May I suggest to make the method DisposableObserver.onStart() abstract

The class is not meant to be the basis for an operator implementation; it is meant as a base for end consumers. Operator writers are expected to implement Observer & Disposable, where onSubscribe is still overridable.

jcornaz commented 7 years ago

However, we still don't understand why concatMap throws an exception when the custom operator's upstream and downstream types differ.

Example:

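import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class MyOperator implements ObservableOperator<String, Integer> {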

  public static void main(String[] args) {
    Observable.range(0, 10)
        .lift(new MyOperator())
        .concatMap(Observable::just)
        .blockingSubscribe(System.out::println);
  }

  @Override
  public Observer<? super Integer> apply(Observer<? super String> observer) throws Exception {
    return new Observer<Integer>() {

      @Override
      public void onSubscribe(Disposable d) {
        observer.onSubscribe(d);
      }

      @Override
      public void onNext(Integer i) {
        observer.onNext(Integer.toString(i));
      }

      @Override
      public void onError(Throwable e) {
        observer.onError(e);
      }

      @Override
      public void onComplete() {
        observer.onComplete();
      }
    };
  }
}

This throws:

io.reactivex.exceptions.OnErrorNotImplementedException: java.lang.Integer cannot be cast to java.lang.String
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
    at io.reactivex.internal.util.NotificationLite.acceptFull(NotificationLite.java:291)
    at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:63)
    at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:103)
    at io.reactivex.Observable.blockingSubscribe(Observable.java:5057)
    at MyOperator.main(MyOperator.java:13)
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.drain(ObservableConcatMap.java:213)
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onSubscribe(ObservableConcatMap.java:103)
    at MyOperator$1.onSubscribe(MyOperator.java:22)
[...]

In this last example, as far as we understand, the observable protocol is respected, including onSubscribe.

We've found a workaround: specifying a scheduler with "subscribeOn" in the chain. But we don't fully understand why this is happening.

akarnokd commented 7 years ago

OnErrorNotImplementedException: this means when you consume via subscribe or blockingSubscribe, you have to specify the onError Consumer.

jcornaz commented 7 years ago

Yes. But the problem is not the OnErrorNotImplementedException. The problem is the underlying cause (ClassCastException).

If you prefer, replace main with:

  public static void main(String[] args) {
    Observable.range(0, 10)
        .lift(new MyOperator())
        .concatMap(Observable::just)
        .subscribe(
            System.out::println,
            Throwable::printStackTrace
        );
  }

and you will get:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.drain(ObservableConcatMap.java:213)
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onSubscribe(ObservableConcatMap.java:103)
    at MyOperator$1.onSubscribe(MyOperator.java:25)
    at io.reactivex.internal.operators.observable.ObservableRange.subscribeActual(ObservableRange.java:34)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at io.reactivex.internal.operators.observable.ObservableLift.subscribeActual(ObservableLift.java:57)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at io.reactivex.internal.operators.observable.ObservableConcatMap.subscribeActual(ObservableConcatMap.java:52)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at io.reactivex.Observable.subscribe(Observable.java:10824)
    at io.reactivex.Observable.subscribe(Observable.java:10753)
    at MyOperator.main(MyOperator.java:13)

akarnokd commented 7 years ago

This is not allowed: observer.onSubscribe(d);
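
The stack trace hints at why: drain() runs directly inside concatMap's onSubscribe, before any item has passed through MyOperator's onNext. A plausible reading is that concatMap recognises the range source's Disposable as a queue it can poll itself (operator fusion), so the Integer values never go through Integer.toString. The sketch below models this with simplified stand-in types. MiniDisposable, MiniQueueDisposable, MiniObserver and every class name in it are hypothetical miniatures written for this example, not RxJava's classes. Handing downstream the operator's own Disposable instead of the upstream one hides the queue and forces items through onNext.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified stand-ins for illustration only; these are NOT RxJava's types.
interface MiniDisposable { void dispose(); }

interface MiniQueueDisposable<T> extends MiniDisposable { T poll(); }

interface MiniObserver<T> {
  void onSubscribe(MiniDisposable d);
  void onNext(T item);
}

// A consumer that behaves like a fusion-aware operator: if the Disposable
// it receives exposes a queue, it polls that queue directly.
class FusingConsumer implements MiniObserver<Object> {
  final List<Object> received = new ArrayList<>();

  @Override
  public void onSubscribe(MiniDisposable d) {
    if (d instanceof MiniQueueDisposable) {
      MiniQueueDisposable<?> queue = (MiniQueueDisposable<?>) d;
      Object item;
      while ((item = queue.poll()) != null) {
        received.add(item); // bypasses every onNext between source and consumer
      }
    }
  }

  @Override
  public void onNext(Object item) { received.add(item); }
}

// Emits 0..count-1; its Disposable doubles as a pollable queue.
class RangeSource {
  static void subscribe(int count, MiniObserver<? super Integer> observer) {
    final Queue<Integer> queue = new ArrayDeque<>();
    for (int i = 0; i < count; i++) {
      queue.add(i);
    }
    class RangeDisposable implements MiniQueueDisposable<Integer> {
      @Override public Integer poll() { return queue.poll(); }
      @Override public void dispose() { queue.clear(); }
    }
    observer.onSubscribe(new RangeDisposable());
    Integer item;
    while ((item = queue.poll()) != null) {
      observer.onNext(item); // only reached if nobody drained the queue already
    }
  }
}

// Integer -> String operator. forwardUpstream = true reproduces the problem.
class ToStringOperator implements MiniObserver<Integer>, MiniDisposable {
  private final MiniObserver<? super String> downstream;
  private final boolean forwardUpstream;
  private MiniDisposable upstream;

  ToStringOperator(MiniObserver<? super String> downstream, boolean forwardUpstream) {
    this.downstream = downstream;
    this.forwardUpstream = forwardUpstream;
  }

  @Override
  public void onSubscribe(MiniDisposable d) {
    upstream = d;
    // Passing `this` hides the upstream queue from the downstream consumer.
    downstream.onSubscribe(forwardUpstream ? d : this);
  }

  @Override
  public void onNext(Integer i) { downstream.onNext(Integer.toString(i)); }

  @Override
  public void dispose() { upstream.dispose(); }
}

class FusionDemo {
  static List<Object> run(boolean forwardUpstream) {
    FusingConsumer consumer = new FusingConsumer();
    RangeSource.subscribe(3, new ToStringOperator(consumer, forwardUpstream));
    return consumer.received;
  }

  public static void main(String[] args) {
    // prints "Integer": the consumer polled the raw source values
    System.out.println(run(true).get(0).getClass().getSimpleName());
    // prints "String": every item went through the operator's onNext
    System.out.println(run(false).get(0).getClass().getSimpleName());
  }
}
```

This would also be consistent with the subscribeOn workaround mentioned earlier, assuming the Disposable that subscribeOn hands downstream does not expose such a queue.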

jcornaz commented 7 years ago

Ah ok. Thank you for that enlightenment. All is clear now.