reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Flaky test - FluxBlackboxProcessorVerification #3742

Open chemicL opened 8 months ago

chemicL commented 8 months ago

The following test failed during CI on 3.4.x:

Gradle suite > Gradle test > reactor.core.publisher.tck.FluxBlackboxProcessorVerification > required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling FAILED
    java.lang.AssertionError: Publisher signalled [16] elements, which is more than the signalled demand: 15 expected [true] but found [false]
        at org.testng.Assert.fail(Assert.java:110)
        at org.testng.Assert.failNotEquals(Assert.java:1413)
        at org.testng.Assert.assertTrue(Assert.java:56)
        at org.reactivestreams.tck.PublisherVerification$25.run(PublisherVerification.java:978)
        at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1135)
        at org.reactivestreams.tck.PublisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling(PublisherVerification.java:936)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:136)
        at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:658)
        at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:219)
        at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
        at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:923)
        at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:192)
        at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
        at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at org.testng.TestRunner.privateRun(TestRunner.java:808)
        at org.testng.TestRunner.run(TestRunner.java:603)
        at org.testng.SuiteRunner.runTest(SuiteRunner.java:429)
        at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:423)
        at org.testng.SuiteRunner.privateRun(SuiteRunner.java:383)
        at org.testng.SuiteRunner.run(SuiteRunner.java:326)
        at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
        at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
        at org.testng.TestNG.runSuitesSequentially(TestNG.java:1249)
        at org.testng.TestNG.runSuitesLocally(TestNG.java:1169)
        at org.testng.TestNG.runSuites(TestNG.java:1092)
        at org.testng.TestNG.run(TestNG.java:1060)
        at org.gradle.api.internal.tasks.testing.testng.TestNGTestClassProcessor.runTests(TestNGTestClassProcessor.java:146)
        at org.gradle.api.internal.tasks.testing.testng.TestNGTestClassProcessor.stop(TestNGTestClassProcessor.java:91)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
        at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
        at com.sun.proxy.$Proxy2.stop(Unknown Source)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
        at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
        at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
        at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
        at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
        at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Test class reactor.core.publisher.tck.FluxGenerateVerification
Test class reactor.core.publisher.tck.FluxSwitchOnFirstVerification

injae-kim commented 8 months ago

May I handle this issue?

chemicL commented 8 months ago

Be my guest @injae-kim :)

injae-kim commented 8 months ago

Thank you! I'll investigate the cause of the failed test and create a fix PR soon :)

injae-kim commented 8 months ago

  @Override @Test
  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
    // the publisher is able to signal more elements than the subscriber will be requesting in total
    final int publisherElements = 20;

    final int demand1 = 10;
    final int demand2 = 5;
    final int totalDemand = demand1 + demand2;

    activePublisherTest(publisherElements, false, new PublisherTestRun<T>() {
      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
      public void run(Publisher<T> pub) throws Throwable {
        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);

        sub.request(demand1);
        sub.request(demand2);
        sub.nextElement();
        sub.cancel();
...
          // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw
          assertTrue(onNextsSignalled <= totalDemand,
                     String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d",
                                   onNextsSignalled, totalDemand));

I checked the TCK test required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling: it requests 15 elements in total from upstream (10 + 5), consumes one element, cancels, and finally asserts that onNextsSignalled <= 15.

image

https://github.com/reactor/reactor-core/blob/e6d6aebe6eb9d7b6257f112a070163cbc60a3b1b/reactor-core/src/tckTest/java/reactor/core/publisher/tck/FluxBlackboxProcessorVerification.java#L45-L48

I'm not sure, but in our FluxBlackboxProcessorVerification pipeline, which uses parallel, I think the publisher can produce more than 15 elements, as you can see in the screenshot above.

So I think the publisher can produce 16 signals in total (e.g. 1 -> 3 -> 6 -> .. -> 20 (15th) -> Complete (16th)), and the TCK test can fail!

So I suggest fixing our FluxBlackboxProcessorVerification#transformFlux so that it always produces exactly 15 items. WDYT?

chemicL commented 8 months ago

It looks like it's not the test configuration that is broken; rather, some operator along the way doesn't honour the demand. If the TCK's Subscriber demands 15 items, no configuration we come up with should deliver more than that. So at least one operator in the pipeline is broken in that regard. Instead of changing the pipeline to avoid this problematic outcome, it's probably best to identify which operator delivers more than was demanded. Some race occurs that fails to synchronise delivery with demand.
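
One way to narrow down the offending stage could be to put a counting probe between stages and compare delivered items against recorded demand. The following is only a minimal sketch of that idea, not Reactor code: it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams ones, and the names DemandAudit, AuditingSubscriber, source and violationsFor are hypothetical. The deliberately faulty source exists only to show the probe firing.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

public class DemandAudit {

    /** Forwards every signal unchanged while counting demand and deliveries. */
    static final class AuditingSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        final AtomicLong requested = new AtomicLong();
        final AtomicLong delivered = new AtomicLong();
        final AtomicLong violations = new AtomicLong();

        AuditingSubscriber(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onSubscribe(Flow.Subscription upstream) {
            downstream.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    // Record demand BEFORE forwarding it, so a synchronous emission
                    // is never compared against stale demand.
                    // Simplification: unbounded demand (Long.MAX_VALUE) is not handled.
                    requested.addAndGet(n);
                    upstream.request(n);
                }

                @Override
                public void cancel() {
                    upstream.cancel();
                }
            });
        }

        @Override
        public void onNext(T item) {
            // More items delivered than ever requested: the upstream stage over-delivered.
            if (delivered.incrementAndGet() > requested.get()) {
                violations.incrementAndGet();
            }
            downstream.onNext(item);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }
    }

    /** Hypothetical source for illustration: answers request(n) with n + extra items. */
    static Flow.Publisher<Integer> source(int extra) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 0;

            @Override
            public void request(long n) {
                for (long i = 0; i < n + extra; i++) {
                    subscriber.onNext(next++);
                }
            }

            @Override
            public void cancel() {
                // nothing to release in this sketch
            }
        });
    }

    /** Subscribes with a single request(demand) and returns the number of over-deliveries seen. */
    static long violationsFor(int extra, long demand) {
        AuditingSubscriber<Integer> audit = new AuditingSubscriber<>(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        source(extra).subscribe(audit);
        return audit.violations.get();
    }

    public static void main(String[] args) {
        System.out.println(violationsFor(0, 15)); // compliant source
        System.out.println(violationsFor(1, 15)); // 16 items for a demand of 15
    }
}
```

In the real pipeline, a comparable probe would presumably sit after each operator in turn; the first position where the violation counter becomes non-zero points at the stage that ignores demand. Since the failure is a race, it would likely need many repeated runs to show up.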

injae-kim commented 8 months ago

> It's best identifying which operator delivers more than was demanded. Some race occurs which doesn't properly synchronise delivery with demand.

I agree, good point! I think I found some clues in the investigation above, so I'll dig into it further and share the root cause with you soon.

Thank you for your guidance!