ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

NullPointerException when exception occurs in map() on Flowable created with Flowable.fromPublisher #5639

Closed mofleury closed 6 years ago

mofleury commented 6 years ago

Hello,

Disclaimer: I am pretty new to RxJava, so this error may be due to misuse on my side. Still, I did not find any documentation about this, and after a little research I now think there may be something wrong in the library.

Some details

Here is my use case :

What I expect: exceptions thrown in the transformation phase are received by the error handler provided at subscription.

What I see: in case of error, a NullPointerException is thrown from BasicFuseableSubscriber.

I looked a little at the source code, and it appears that BasicFuseableSubscriber expects to have an active subscription in its fail method, but this subscription is assigned in the onSubscribe method, which is never called in my case. Here is a code snippet to reproduce the case:

    @Test
    public void demonstrateBasicFuseableSubscriberNullPointerException() {

        Flowable.fromPublisher((Publisher<String>) subscriber -> subscriber.onNext("hello"))
                .map(m -> {
                    throw new IllegalArgumentException("don't know what to do");
                })
                .subscribe(System.out::println, Throwable::printStackTrace);

    }

and the associated stack trace:

java.lang.NullPointerException
    at io.reactivex.internal.subscribers.BasicFuseableSubscriber.fail(BasicFuseableSubscriber.java:110)
    at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:66)
    at com.swissquote.findata.apps.feed.ArchiveFileReader.lambda$readArchive$0(ArchiveFileReader.java:26)
    at io.reactivex.internal.operators.flowable.FlowableFromPublisher.subscribeActual(FlowableFromPublisher.java:29)
    at io.reactivex.Flowable.subscribe(Flowable.java:12995)
    at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
    at io.reactivex.Flowable.subscribe(Flowable.java:12995)
    at io.reactivex.Flowable.subscribe(Flowable.java:12931)
    at io.reactivex.Flowable.subscribe(Flowable.java:12852)
    at .... Test.demonstrateBasicFuseableSubscriberNullPointerException

It is possible that my Publisher is not correct, i.e. that it should call onSubscribe somehow, but I don't have a subscription object to pass to it, so I suppose that onSubscribe is supposed to be called automatically when I subscribe to the Flowable. In any case, if this is due to incorrect usage, the library should probably provide a more helpful error message.
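The failure mode described here can be sketched without RxJava at all. The example below is only an illustration under stated assumptions: it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams ones, and MapSubscriber, BROKEN, COMPLIANT and run are hypothetical names, not RxJava code. It shows a map-style subscriber that stores its upstream in onSubscribe and cancels it when the mapper throws, so a publisher that skips onSubscribe triggers a NullPointerException instead of delivering the original error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

final class MissingOnSubscribeSketch {

    /** Hypothetical, simplified stand-in for a map-style subscriber. */
    static final class MapSubscriber<T, R> implements Flow.Subscriber<T> {
        final Function<T, R> mapper;
        final List<Object> events;   // records what would reach downstream
        Flow.Subscription upstream;  // stays null until onSubscribe runs

        MapSubscriber(Function<T, R> mapper, List<Object> events) {
            this.mapper = mapper;
            this.events = events;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(Long.MAX_VALUE);
        }

        @Override public void onNext(T item) {
            R result;
            try {
                result = mapper.apply(item);
            } catch (Throwable t) {
                fail(t);
                return;
            }
            events.add(result);
        }

        void fail(Throwable t) {
            upstream.cancel();       // NullPointerException if onSubscribe was skipped
            onError(t);
        }

        @Override public void onError(Throwable t) { events.add(t); }
        @Override public void onComplete() { events.add("complete"); }
    }

    /** Emits without calling onSubscribe first, like the publisher in the report. */
    static final Flow.Publisher<String> BROKEN = subscriber -> subscriber.onNext("hello");

    /** Calls onSubscribe first and emits only once demand has been signalled. */
    static final Flow.Publisher<String> COMPLIANT = subscriber ->
            subscriber.onSubscribe(new Flow.Subscription() {
                boolean emitted;
                boolean cancelled;

                @Override public void request(long n) {
                    if (emitted || cancelled || n <= 0) {
                        return;
                    }
                    emitted = true;
                    subscriber.onNext("hello");
                    if (!cancelled) {
                        subscriber.onComplete();
                    }
                }

                @Override public void cancel() { cancelled = true; }
            });

    /** Subscribes a always-failing mapper and returns the recorded events. */
    static List<Object> run(Flow.Publisher<String> source) {
        List<Object> events = new ArrayList<>();
        Function<String, Integer> mapper = s -> {
            throw new IllegalArgumentException("don't know what to do");
        };
        try {
            source.subscribe(new MapSubscriber<>(mapper, events));
        } catch (NullPointerException e) {
            events.add(e);           // the crash escapes instead of reaching onError
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(BROKEN).get(0).getClass().getSimpleName());
        System.out.println(run(COMPLIANT).get(0).getClass().getSimpleName());
    }
}
```

With the broken publisher the only recorded event is the NullPointerException; with the compliant one the IllegalArgumentException reaches onError as expected.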

I hope that this report contains enough information, I will do my best to provide more if I can, thank you!

akarnokd commented 6 years ago

Flowable.fromPublisher is meant to interoperate with other libraries, not to create Flowables from scratch. When you use it, you are responsible for following the Reactive-Streams specification. The crash happens because you didn't call subscriber.onSubscribe. You should use Flowable.create() instead.
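As a rough sketch of that suggestion, the reproduction could be rewritten with Flowable.create along these lines. It assumes RxJava 2.x on the classpath (the io.reactivex packages seen in the stack trace); the class name FlowableCreateSketch, the run helper and the choice of BackpressureStrategy.BUFFER are assumptions made for illustration, not part of the original report.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

final class FlowableCreateSketch {

    /** Runs the pipeline and returns the errors seen by the error handler. */
    static List<Throwable> run() {
        List<Throwable> errors = new ArrayList<>();

        // Flowable.create wires up onSubscribe and the subscription itself,
        // so the emitter only has to signal items and completion.
        Flowable.<String>create(emitter -> {
                    emitter.onNext("hello");
                    emitter.onComplete();
                }, BackpressureStrategy.BUFFER)
                .<String>map(m -> {
                    throw new IllegalArgumentException("don't know what to do");
                })
                .subscribe(System.out::println, errors::add);

        return errors;
    }

    public static void main(String[] args) {
        // The exception from map() should arrive here rather than a NullPointerException.
        run().forEach(Throwable::printStackTrace);
    }
}
```

Because the source is synchronous, the error handler has already run by the time subscribe returns, which is why the list can be inspected immediately.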

mofleury commented 6 years ago

OK, it makes more sense now.

I tried to think of a way to help newcomers avoid this kind of mistake, but I have no real idea, as it looks like detecting it would introduce a lot of complexity... Maybe just a hint in the javadoc that points to Observable.create?

Thanks for the quick answer!

artem-zinnatullin commented 6 years ago

See #5640 (cross-linking for discoverability)

mofleury commented 6 years ago

Wow, thanks for taking this feedback into account!