akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka-kafka/current/
Other
1.42k stars 386 forks source link

Missing default testkit config breaks simple integration tests that use Alpakka Kafka Testkit #771

Open seglo opened 5 years ago

seglo commented 5 years ago

Versions used

Akka version: 2.5.20 Alpakka Kafka version: 1.0.1

Expected Behavior

Using Alpakka Kafka Testkit with defaults should run a simple integration test.

Actual Behavior

Several Akka Testkit and Alpakka Kafka Testkit settings are required to run relatively simple integration tests. I've noticed that the following configuration is necessary to run most tests out of the box (this can be found in /tests/src/test/resources/application.conf).

akka {
  test {
    single-expect-default = 10s
  }

  kafka.consumer {
    stop-timeout = 10ms
  }
}

For example, running akka.kafka.scaladsl.IntegrationSpec, "produce to plainSink and consume from plainSource". When akka.test.single-expect-default is not defined at something greater than than the default of 3s I get the following error.

assertion failed: timeout (3 seconds) during expectMsg while waiting for OnNext(1)
java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg while waiting for OnNext(1)
    at scala.Predef$.assert(Predef.scala:223)
    at akka.testkit.TestKitBase.expectMsg_internal(TestKit.scala:402)
    at akka.testkit.TestKitBase.expectMsg(TestKit.scala:379)
    at akka.testkit.TestKitBase.expectMsg$(TestKit.scala:379)
    at akka.testkit.TestKit.expectMsg(TestKit.scala:896)
    at akka.stream.testkit.TestSubscriber$ManualProbe.$anonfun$expectNextN$1(StreamTestKit.scala:397)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at akka.stream.testkit.TestSubscriber$ManualProbe.expectNextN(StreamTestKit.scala:397)
    at akka.kafka.scaladsl.IntegrationSpec.$anonfun$new$3(IntegrationSpec.scala:55)
    at akka.stream.testkit.scaladsl.StreamTestKit$.assertAllStagesStopped(StreamTestKit.scala:30)
    at akka.kafka.scaladsl.IntegrationSpec.$anonfun$new$2(IntegrationSpec.scala:43)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at akka.kafka.scaladsl.SpecBase.withFixture(SpecBase.scala:13)
    at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1076)
    at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1088)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1088)
    at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1070)
    at akka.kafka.scaladsl.SpecBase.runTest(SpecBase.scala:13)
    at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1147)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1147)
    at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1146)
    at akka.kafka.scaladsl.SpecBase.runTests(SpecBase.scala:13)
    at org.scalatest.Suite.run(Suite.scala:1147)
    at org.scalatest.Suite.run$(Suite.scala:1129)
    at akka.kafka.testkit.scaladsl.ScalatestKafkaSpec.org$scalatest$BeforeAndAfterAll$$super$run(ScalatestKafkaSpec.scala:11)
    at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    at akka.kafka.scaladsl.SpecBase.org$scalatest$WordSpecLike$$super$run(SpecBase.scala:13)
    at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1192)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1192)
    at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1190)
    at akka.kafka.scaladsl.SpecBase.run(SpecBase.scala:13)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1346)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1340)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
    at org.scalatest.tools.Runner$.run(Runner.scala:850)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

When akka.kafka.consumer.stop-timeout is not set to something less than 5 seconds I regularly get the following exception.

Futures timed out after [5 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
    at scala.concurrent.Await$.$anonfun$result$1(package.scala:219)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
    at scala.concurrent.Await$.result(package.scala:146)
    at akka.stream.testkit.scaladsl.StreamTestKit$.printDebugDump(StreamTestKit.scala:68)
    at akka.stream.testkit.scaladsl.StreamTestKit$.$anonfun$assertNoChildren$1(StreamTestKit.scala:56)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at akka.testkit.TestKitBase.within(TestKit.scala:360)
    at akka.testkit.TestKitBase.within$(TestKit.scala:348)
    at akka.testkit.TestKit.within(TestKit.scala:896)
    at akka.testkit.TestKitBase.within(TestKit.scala:374)
    at akka.testkit.TestKitBase.within$(TestKit.scala:374)
    at akka.testkit.TestKit.within(TestKit.scala:896)
    at akka.stream.testkit.scaladsl.StreamTestKit$.assertNoChildren(StreamTestKit.scala:47)
    at akka.stream.testkit.scaladsl.StreamTestKit$.assertAllStagesStopped(StreamTestKit.scala:31)
    at akka.kafka.scaladsl.IntegrationSpec.$anonfun$new$2(IntegrationSpec.scala:43)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at akka.kafka.scaladsl.SpecBase.withFixture(SpecBase.scala:13)
    at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1076)
    at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1088)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1088)
    at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1070)
    at akka.kafka.scaladsl.SpecBase.runTest(SpecBase.scala:13)
    at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1147)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1147)
    at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1146)
    at akka.kafka.scaladsl.SpecBase.runTests(SpecBase.scala:13)
    at org.scalatest.Suite.run(Suite.scala:1147)
    at org.scalatest.Suite.run$(Suite.scala:1129)
    at akka.kafka.testkit.scaladsl.ScalatestKafkaSpec.org$scalatest$BeforeAndAfterAll$$super$run(ScalatestKafkaSpec.scala:11)
    at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    at akka.kafka.scaladsl.SpecBase.org$scalatest$WordSpecLike$$super$run(SpecBase.scala:13)
    at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1192)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1192)
    at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1190)
    at akka.kafka.scaladsl.SpecBase.run(SpecBase.scala:13)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1346)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1340)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
    at org.scalatest.tools.Runner$.run(Runner.scala:850)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

Notes

I realize such timing issues can have different results depending on an invidiual machine's resources, but given this configuration is set by default for the Alpakka Kafka tests, and that it's easy to recreate the same errors without the config in a sample project I think it might make sense to include these defaults to users using the testkit.

I suggest including this default config when users run tests using the Alpakka Kafka Testkit, or to highlight this config in the documentation.

ennru commented 5 years ago

Thank you for this suggestion. The resolution of config doesn't allow us to put an application.conf to overwrite other settings in Alpakka Kafka. I've however added some text to the documentation about stop-timeout: https://github.com/akka/alpakka-kafka/blob/master/docs/src/main/paradox/testing.md#testing-from-java-code

seglo commented 5 years ago

@ennru Right, I thought that might be a problem.

Those docs are a big improvement! But I think it's still a little vague about the two suggestions I made in this issue. I suspect most users developing new integration tests a la Alpakka Kafka tests style will run into these exact two issues every time, because there's no inheritance of configuration that works. Do you mind if I could write a PR to include more details in the testing section of the docs?

ennru commented 5 years ago

Configuration can be inherited on config level:

MyConsumerConfigInTests: ${akka.kafka.consumer} {
  stop-timeout = 2s
}

But please, go ahead and suggest improvements to the docs.