smallrye / smallrye-reactive-streams-operators

Implementation of the MicroProfile Reactive Streams Operators specification
https://www.smallrye.io/smallrye-reactive-streams-operators/
Apache License 2.0

Race exposed by RS TCK: PublisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(PublisherVerification.java:477) #63

Closed hutchig closed 5 years ago

hutchig commented 5 years ago

We run a LOT of batch tests on Liberty which includes smallrye reactive streams operators 1.0.0.

One (only one) of these many test runs threw up:

runAllTckTests:
      java.lang.NullPointerException: Actually not but can't throw other exceptions due to RS
at io.reactivex.Flowable.subscribe(Flowable.java:14645)
at io.reactivex.internal.operators.flowable.FlowableSkipWhile.subscribeActual(FlowableSkipWhile.java:32)
at io.reactivex.Flowable.subscribe(Flowable.java:14636)
at io.reactivex.Flowable.subscribe(Flowable.java:14586)
at io.smallrye.reactive.streams.utils.WrappedProcessor.subscribe(WrappedProcessor.java:26)
at org.reactivestreams.tck.PublisherVerification$11.run(PublisherVerification.java:483)
at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1138)
at org.reactivestreams.tck.PublisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(PublisherVerification.java:477)
at org.reactivestreams.tck.IdentityProcessorVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(IdentityProcessorVerification.java:311)

The test is required_spec109_mustIssueOnSubscribeForNonNullSubscriber in https://github.com/reactive-streams/reactive-streams-jvm/blob/master/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java
It ends up calling RxJava's Flowable.subscribe:

    @BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final void subscribe(FlowableSubscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

            ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(z);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); // <-- This is the line that throws. The real failure is the cause 'e' attached by npe.initCause(e) below, so let's chase that.
            npe.initCause(e);
            throw npe;
        }
    }

Looking at the cause 'e' attached to that exception, we see a failure in a Processor's subscribe method:

[err] java.lang.IllegalStateException: Illegal transition - subscribe happened in the COMPLETE state
[err]   at io.smallrye.reactive.streams.utils.ConnectableProcessor.subscribe(ConnectableProcessor.java:61)
[err]   at [internal classes]
[err]   at org.reactivestreams.tck.PublisherVerification$11.run(PublisherVerification.java:483)
[err]   at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1138)
[err]   at org.reactivestreams.tck.PublisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(PublisherVerification.java:477)
[err]   at org.reactivestreams.tck.IdentityProcessorVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(IdentityProcessorVerification.java:311)
[err]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
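
For anyone chasing the same wrapped NPE, here is a minimal sketch of how the underlying cause can be pulled out with Throwable.getCause(). The class CauseUnwrapSketch and its helpers wrap and rootCause are hypothetical names made up for illustration; wrap only mimics the initCause wrapping quoted from Flowable.subscribe above and is not RxJava code.

```java
public class CauseUnwrapSketch {

    // Hypothetical helper: mimics the wrapping quoted above, where the
    // real failure is attached to a NullPointerException as its cause.
    static NullPointerException wrap(Throwable e) {
        NullPointerException npe =
                new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        return npe;
    }

    // Hypothetical helper: follows getCause() until there is no further cause.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable original = new IllegalStateException(
                "Illegal transition - subscribe happened in the COMPLETE state");
        Throwable wrapped = wrap(original);
        // Prints the innermost exception rather than the wrapping NPE.
        System.out.println(rootCause(wrapped));
    }
}
```

In a test harness, logging rootCause(e) next to the NPE would surface the IllegalStateException without having to dig through the [err] output.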

This is doubly interesting because, if the code were single-threaded, it would be logically impossible:

/*[0]*/ if (!state.compareAndSet(State.IDLE, State.HAS_SUBSCRIBER)) {
            // We were not in the idle state, the behavior depends on our current state
            // For failure and completed, we just create an empty subscription and immediately
            // report the error or completion
            if (state.get() == State.FAILED) {
                manageSubscribeInFailedState(subscriber);
/*[1]*/     } else if (state.get() == State.COMPLETE) { // <-- the state.get().name() in the message below comes out as this State.COMPLETE value
                manageSubscribeInCompleteState(subscriber);
            } else if (state.get() == State.HAS_SUBSCRIPTION) {
                manageSubscribeInTheHasSubscriptionState(subscriber);
            } else {
/*[2]*/         throw new IllegalStateException("Illegal transition - subscribe happened in the " // <-- message from here
                        + state.get().name() + " state");
            }
        }

So another thread appears to change the state to one that subscribe would tolerate, but this thread only observes the change at [2]. The change was certainly NOT observable at point [1]: the check for State.COMPLETE had already been made and had failed. Yet the message built a few lines below prints the state as State.COMPLETE, so the same check would succeed if it were repeated.
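
One way to make the branch taken and the reported state agree is to read the state once into a local variable and decide on that snapshot. The sketch below is only an illustration under assumptions: SubscribeRaceSketch and subscribeOutcome are hypothetical names, the State enum is a stand-in that includes a PROCESSING value not shown in the snippet above, and the returned strings replace the real manageSubscribeIn... calls. It does not settle what subscribe should do when it lands in an intermediate state; it only removes the mismatch between the check at [1] and the message at [2].

```java
import java.util.concurrent.atomic.AtomicReference;

public class SubscribeRaceSketch {

    // Stand-in for the processor's state enum (assumed values).
    enum State { IDLE, HAS_SUBSCRIBER, HAS_SUBSCRIPTION, PROCESSING, FAILED, COMPLETE }

    // Hypothetical simplification of the subscribe branch: returns a label
    // instead of calling the manageSubscribeIn... methods.
    static String subscribeOutcome(AtomicReference<State> state) {
        if (state.compareAndSet(State.IDLE, State.HAS_SUBSCRIBER)) {
            return "subscribed";
        }
        // Single read: every check below, and the error message, use this
        // snapshot, so a concurrent state change cannot make them disagree.
        State current = state.get();
        switch (current) {
            case FAILED:
                return "failed";
            case COMPLETE:
                return "complete";
            case HAS_SUBSCRIPTION:
                return "has-subscription";
            default:
                throw new IllegalStateException(
                        "Illegal transition - subscribe happened in the "
                                + current.name() + " state");
        }
    }

    public static void main(String[] args) {
        AtomicReference<State> state = new AtomicReference<>(State.COMPLETE);
        System.out.println(subscribeOutcome(state));
    }
}
```

With this shape, an exception raised from the default branch always names the state that actually caused it, which would have made the original trace less puzzling.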

I understand this is just testing a TCK corner case where a completed Subscriber is re-subscribed once it has finished, and the 'correct' behaviour does this:

   private void manageSubscribeInCompleteState(Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new EmptySubscription());
        subscriber.onComplete();
    }

public class EmptySubscription implements Subscription {
    @Override
    public void request(long n) {
        // Ignored.
    }

    @Override
    public void cancel() {
        // Ignored.
    }
}

So this is not a problem that will impact customers as far as I can tell - just really picky TCKs!

I will have a rummage about in the code to see if I can see a good fix but not today most likely.

hutchig commented 5 years ago

This one failure occurred 1 out of 3 times on Windows 8 for this build instance and passed on the other 179 platforms that this build instance's level of code was tested against. Also it has not occurred on any of the previous levels of code on any platform.