apache / pekko

Build highly concurrent, distributed, and resilient message-driven applications using Java/Scala
https://pekko.apache.org/
Apache License 2.0

ForkJoinPoolStarvationSpec #661

Open He-Pin opened 11 months ago

He-Pin commented 11 months ago
- must not starve tasks arriving from external dispatchers under high internal traffic *** FAILED *** (6 seconds, 14 milliseconds)
[09-20 10:18:16.027] [info]   java.lang.AssertionError: assertion failed: timeout (6 seconds) during expectMsg while waiting for All fine
[09-20 10:18:16.027] [info]   at scala.Predef$.assert(Predef.scala:223)
[09-20 10:18:16.027] [info]   at org.apache.pekko.testkit.TestKitBase.expectMsg_internal(TestKit.scala:472)
[09-20 10:18:16.028] [info]   at org.apache.pekko.testkit.TestKitBase.expectMsg(TestKit.scala:449)
[09-20 10:18:16.028] [info]   at org.apache.pekko.testkit.TestKitBase.expectMsg$(TestKit.scala:449)
[09-20 10:18:16.028] [info]   at org.apache.pekko.testkit.TestKit.expectMsg(TestKit.scala:982)
[09-20 10:18:16.028] [info]   at org.apache.pekko.dispatch.ForkJoinPoolStarvationSpec.$anonfun$new$6(ForkJoinPoolStarvationSpec.scala:78)
[09-20 10:18:16.028] [info]   at org.apache.pekko.dispatch.ForkJoinPoolStarvationSpec.$anonfun$new$6$adapted(ForkJoinPoolStarvationSpec.scala:75)
[09-20 10:18:16.028] [info]   at scala.collection.immutable.Range.foreach(Range.scala:158)
[09-20 10:18:16.028] [info]   at org.apache.pekko.dispatch.ForkJoinPoolStarvationSpec.$anonfun$new$2(ForkJoinPoolStarvationSpec.scala:75)
[09-20 10:18:16.029] [info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[09-20 10:18:16.029] [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[09-20 10:18:16.029] [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[09-20 10:18:16.029] [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[09-20 10:18:16.029] [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[09-20 10:18:16.029] [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[09-20 10:18:16.029] [info]   at org.scalatest.wordspec.AnyWordSpecLike$$anon$3.apply(AnyWordSpecLike.scala:1239)
[09-20 10:18:16.029] [info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[09-20 10:18:16.030] [info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[09-20 10:18:16.030] [info]   at org.apache.pekko.testkit.PekkoSpec.withFixture(PekkoSpec.scala:80)
[09-20 10:18:16.030] [info]   at org.scalatest.wordspec.AnyWordSpecLike.invokeWithFixture$1(AnyWordSpecLike.scala:1237)
[09-20 10:18:16.030] [info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTest$1(AnyWordSpecLike.scala:1249)
[09-20 10:18:16.030] [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[09-20 10:18:16.030] [info]   at org.scalatest.wordspec.AnyWordSpecLike.runTest(AnyWordSpecLike.scala:1249)
[09-20 10:18:16.030] [info]   at org.scalatest.wordspec.AnyWordSpecLike.runTest$(AnyWordSpecLike.scala:1231)
[09-20 10:18:16.031] [info]   at org.apache.pekko.testkit.PekkoSpec.runTest(PekkoSpec.scala:80)
[09-20 10:18:16.031] [info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTests$1(AnyWordSpecLike.scala:1308)
[09-20 10:18:16.031] [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[09-20 10:18:16.031] [info]   at scala.collection.immutable.List.foreach(List.scala:431)
[09-20 10:18:16.031] [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[09-20 10:18:16.031] [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
[09-20 10:18:16.031] [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
[09-20 10:18:16.031] [info]   at scala.collection.immutable.List.foreach(List.scala:431)
[09-20 10:18:16.032] [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[09-20 10:18:16.032] [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[09-20 10:18:16.032] [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[09-20 10:18:16.032] [info]   at org.scalatest.wordspec.AnyWordSpecLike.runTests(AnyWordSpecLike.scala:1308)
[09-20 10:18:16.032] [info]   at org.scalatest.wordspec.AnyWordSpecLike.runTests$(AnyWordSpecLike.scala:1307)
[09-20 10:18:16.032] [info]   at org.apache.pekko.testkit.PekkoSpec.runTests(PekkoSpec.scala:80)
[09-20 10:18:16.032] [info]   at org.scalatest.Suite.run(Suite.scala:1114)
[09-20 10:18:16.032] [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[09-20 10:18:16.033] [info]   at org.apache.pekko.testkit.PekkoSpec.org$scalatest$wordspec$AnyWordSpecLike$$super$run(PekkoSpec.scala:80)
[09-20 10:18:16.033] [info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$run$1(AnyWordSpecLike.scala:1353)
[09-20 10:18:16.033] [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[09-20 10:18:16.033] [info]   at org.scalatest.wordspec.AnyWordSpecLike.run(AnyWordSpecLike.scala:1353)
[09-20 10:18:16.033] [info]   at org.scalatest.wordspec.AnyWordSpecLike.run$(AnyWordSpecLike.scala:1351)
[09-20 10:18:16.033] [info]   at org.apache.pekko.testkit.PekkoSpec.org$scalatest$BeforeAndAfterAll$$super$run(PekkoSpec.scala:80)
[09-20 10:18:16.033] [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[09-20 10:18:16.033] [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[09-20 10:18:16.034] [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[09-20 10:18:16.034] [info]   at org.apache.pekko.testkit.PekkoSpec.run(PekkoSpec.scala:80)
[09-20 10:18:16.034] [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[09-20 10:18:16.034] [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[09-20 10:18:16.034] [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[09-20 10:18:16.034] [info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
[09-20 10:18:16.034] [info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
[09-20 10:18:16.034] [info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
[09-20 10:18:16.035] [info]   at java.base/java.lang.Thread.run(Thread.java:1583)

refs: https://github.com/apache/incubator-pekko/actions/runs/6247109120/job/16959068675#step:6:3238

If an actor has exhausted its throughput, should its mailbox then be rescheduled via an external submit? The relevant method is `externalPush` on JDK 8 and `externalSubmit` on JDK 21.

Current JDK 21 usage, in the virtual thread yield path:

        } else if (s == YIELDING) {   // Thread.yield
            setState(RUNNABLE);

            // notify JVMTI that unmount has completed, thread is runnable
            notifyJvmtiUnmount(/*hide*/false);

            // external submit if there are no tasks in the local task queue
            if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
                externalSubmitRunContinuation(ct.getPool()); 
            } else {
                submitRunContinuation();
            }
        }
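
To make the question above concrete, here is a minimal sketch of the "process at most `throughput` messages, then reschedule" pattern. It is not Pekko's `Mailbox` or dispatcher code: `ThroughputSketch`, `Mailbox`, `tell`, and `trySchedule` are hypothetical names, and the sketch uses only the plain `ForkJoinPool.execute(Runnable)` call. It does not reproduce the starvation; it only marks the resubmission point where an external submit could be used instead.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; all names here are assumptions.
final class ThroughputSketch {

    /** A tiny mailbox that handles at most `throughput` messages per run. */
    static final class Mailbox implements Runnable {
        private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        private final ForkJoinPool pool;
        private final int throughput;
        final AtomicInteger batches = new AtomicInteger();

        Mailbox(ForkJoinPool pool, int throughput) {
            this.pool = pool;
            this.throughput = throughput;
        }

        void tell(Runnable message) {
            queue.add(message);
            trySchedule();
        }

        private void trySchedule() {
            // At most one pending or running execution of this mailbox at a time.
            if (!queue.isEmpty() && scheduled.compareAndSet(false, true)) {
                // Resubmission point: when reached from run() on a pool worker,
                // this is where the issue asks whether an external submit
                // should be used instead of a plain execute.
                pool.execute(this);
            }
        }

        @Override
        public void run() {
            batches.incrementAndGet();
            try {
                for (int i = 0; i < throughput; i++) {
                    Runnable message = queue.poll();
                    if (message == null) break;
                    message.run();
                }
            } finally {
                // Throughput exhausted or queue drained: give up the thread,
                // then reschedule if messages are still waiting.
                scheduled.set(false);
                trySchedule();
            }
        }
    }

    /** Returns {messages handled, batches run, 1 if the external task ran else 0}. */
    static int[] run(int messages, int throughput) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Mailbox mailbox = new Mailbox(pool, throughput);
            AtomicInteger handled = new AtomicInteger();
            AtomicBoolean externalRan = new AtomicBoolean(false);
            CountDownLatch done = new CountDownLatch(messages + 1);
            for (int i = 0; i < messages; i++) {
                mailbox.tell(() -> { handled.incrementAndGet(); done.countDown(); });
            }
            // A task arriving from outside the pool (submitted by a non-worker thread).
            pool.execute(() -> { externalRan.set(true); done.countDown(); });
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for tasks");
            }
            return new int[] { handled.get(), mailbox.batches.get(), externalRan.get() ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = run(1000, 5);
        System.out.println("handled=" + result[0] + " batches=" + result[1]
            + " externalRan=" + (result[2] == 1));
    }
}
```

Which queue the `pool.execute(this)` call inside `run()` lands on (the worker's local queue or the pool's external submission queue) depends on the JDK version, which is what the `externalPush` / `externalSubmit` distinction above is about. The sketch does not depend on either behaviour.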

https://endoflife.date/java
https://javaalmanac.io

He-Pin commented 11 months ago

https://github.com/apache/incubator-pekko/actions/runs/6247109120/job/16959069348#step:6:3153

He-Pin commented 11 months ago

refs: https://github.com/openjdk/jdk/pull/11319

He-Pin commented 11 months ago

@jrudolph You were doing some design work on this. Would you like to pick this up, e.g. calling externalSubmit (or something similar) after some number of iterations?

pjfanning commented 6 months ago

This seems to fail most of the time on the 1.0.x branch but not on the main branch (or at least it is rare there).