apache / pekko-connectors

Apache Pekko Connectors is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Apache Pekko.
https://pekko.apache.org/
Apache License 2.0

MqttSessionSpec: MQTT server connector should produce a duplicate publish on the server given two client connections #454

Closed raboof closed 9 months ago

raboof commented 9 months ago

Appears to be flaky. Seen before in #148 and #46.

[info] MQTT server connector
[info] - should flow through a server session (68 milliseconds)
[info] - should receive two subscriptions for the same topic (38 milliseconds)
[info] - should unsubscribe a server session (131 milliseconds)
[info] - should reply to a ping request (38 milliseconds)
[info] - should close when no ping request received (1 second, 542 milliseconds)
[info] - should notify on disconnect (248 milliseconds)
[info] - should re-connect given connect, subscribe, disconnect, connect, publish (35 milliseconds)
[info] - should re-connect given connect, subscribe, connect again, publish (41 milliseconds)
[info] - should consume a duplicate publish on the server (27 milliseconds)
[info] - should produce a duplicate publish on the server given two client connections *** FAILED *** (15 seconds, 469 milliseconds)
[info]   java.lang.AssertionError: assertion failed: expected no StreamSupervisor children, but got [Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-320-0-ignoreSink#600515596], Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-338-0-ignoreSink#392179906], Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-327-0-unnamed#-375044653], Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-328-0-unnamed#-28601930], Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-337-0-unnamed#-269928412], Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-329-0-unnamed#1296919811], Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-319-0-unnamed#363979158], Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-311-0-unnamed#-1039162337], Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-309-0-unnamed#-702717299], Actor[pekko://mqtt-spec/system/Materializers/StreamSupervisor-4/flow-310-0-unnamed#233619764]]
[info]   at scala.Predef$.assert(Predef.scala:223)
[info]   at org.apache.pekko.stream.testkit.scaladsl.StreamTestKit$.$anonfun$assertNoChildren$2(StreamTestKit.scala:64)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.apache.pekko.testkit.TestKitBase.poll$2(TestKit.scala:342)
[info]   at org.apache.pekko.testkit.TestKitBase.awaitAssert(TestKit.scala:359)
[info]   at org.apache.pekko.testkit.TestKitBase.awaitAssert$(TestKit.scala:331)
[info]   at org.apache.pekko.testkit.TestKit.awaitAssert(TestKit.scala:982)
[info]   at org.apache.pekko.stream.testkit.scaladsl.StreamTestKit$.$anonfun$assertNoChildren$1(StreamTestKit.scala:61)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.apache.pekko.testkit.TestKitBase.within(TestKit.scala:429)
[info]   at org.apache.pekko.testkit.TestKitBase.within$(TestKit.scala:416)
[info]   at org.apache.pekko.testkit.TestKit.within(TestKit.scala:982)
[info]   at org.apache.pekko.testkit.TestKitBase.within(TestKit.scala:444)
[info]   at org.apache.pekko.testkit.TestKitBase.within$(TestKit.scala:444)
[info]   at org.apache.pekko.testkit.TestKit.within(TestKit.scala:982)
[info]   at org.apache.pekko.stream.testkit.scaladsl.StreamTestKit$.assertNoChildren(StreamTestKit.scala:61)
[info]   at org.apache.pekko.stream.testkit.scaladsl.StreamTestKit$.assertAllStagesStopped(StreamTestKit.scala:43)
[info]   at docs.scaladsl.MqttSessionSpec.$anonfun$new$155(MqttSessionSpec.scala:1921)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.wordspec.AnyWordSpecLike$$anon$3.apply(AnyWordSpecLike.scala:1240)
[info]   at org.apache.pekko.stream.connectors.testkit.scaladsl.LogCapturing.withFixture(LogCapturing.scala:70)
[info]   at org.apache.pekko.stream.connectors.testkit.scaladsl.LogCapturing.withFixture$(LogCapturing.scala:66)
[info]   at docs.scaladsl.MqttSessionSpec.withFixture(MqttSessionSpec.scala:45)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.invokeWithFixture$1(AnyWordSpecLike.scala:1238)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTest$1(AnyWordSpecLike.scala:1250)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTest(AnyWordSpecLike.scala:1250)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTest$(AnyWordSpecLike.scala:1232)
[info]   at docs.scaladsl.MqttSessionSpec.runTest(MqttSessionSpec.scala:45)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTests$1(AnyWordSpecLike.scala:1309)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTests(AnyWordSpecLike.scala:1309)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTests$(AnyWordSpecLike.scala:1308)
[info]   at docs.scaladsl.MqttSessionSpec.runTests(MqttSessionSpec.scala:45)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at docs.scaladsl.MqttSessionSpec.org$scalatest$wordspec$AnyWordSpecLike$$super$run(MqttSessionSpec.scala:45)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$run$1(AnyWordSpecLike.scala:1354)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.run(AnyWordSpecLike.scala:1354)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.run$(AnyWordSpecLike.scala:1352)
[info]   at docs.scaladsl.MqttSessionSpec.org$scalatest$BeforeAndAfterAll$$super$run(MqttSessionSpec.scala:45)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at docs.scaladsl.MqttSessionSpec.run(MqttSessionSpec.scala:45)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.TestRunner.runTest$1(TestFramework.scala:153)
[info]   at sbt.TestRunner.run(TestFramework.scala:168)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:336)
[info]   at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:296)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info]   at sbt.TestFunction.apply(TestFramework.scala:348)
[info]   at sbt.Tests$.processRunnable$1(Tests.scala:475)
[info]   at sbt.Tests$.$anonfun$makeSerial$1(Tests.scala:481)
[info]   at sbt.std.Transform$$anon$3.$anonfun$apply$2(Transform.scala:47)
[info]   at sbt.std.Transform$$anon$4.work(Transform.scala:69)
[info]   at sbt.Execute.$anonfun$submit$2(Execute.scala:283)
[info]   at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:24)
[info]   at sbt.Execute.work(Execute.scala:292)
[info]   at sbt.Execute.$anonfun$submit$1(Execute.scala:283)
[info]   at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[info]   at sbt.CompletionService$$anon$2.call(CompletionService.scala:65)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)

pjfanning commented 9 months ago

We've reached a point where this test hardly ever passes in CI builds.

mdedetrich commented 9 months ago

> We've reached a point where this test hardly ever passes in CI build

Indeed, it used to be that ~1 in 30 would pass, but I think it's now closer to 1 in 100 (if that).